Hadoop/MapReduce has jump started a revolution in large scale data processing, which earler was either unfeasible or uneconomical. Now, it is possible to use the power of commodity hardware to load up data on disks on a cluster and process them in parallel. MapReduce makes it possible to take the computation to where the data is resident. In Hadoop, with HDFS, the data is on disks. A Hadoop job works on a HDFS input directory/files and outputs the data to HDFS files as well. While developing a Hadoop Cluster monitoring/management product for Splunk, I was faced with a way to simulate workloads on the cluster in our Dev/QA environment. In order to divide/conquer the issue. I decided to classify the work load as follows –

 

  1. Category I – Large Input Size/Small Output Size
  2. Category II – Large Input Size/Large Output Size

I also added a processing time equivalent to it, long and short durations. So I had to write four simple MR jobs to simulate this. But before that I was able to simulate Category I loads with a Hive query and Category II with Terasort. Also, outside of these categories, the cluster can be used just as storage and I used the stock TestDFSIO test program to simulate this case. We were running quite peacefully with out testing and simulation for the product. But one thing that had me concerned was the Hive queries. Even simple queries would take a long time (vis-a-vis MySQL or Oracle – when the dataset size was small). Hive runs a sequences of more than one jobs for one query. Each job outputs to HDFS and this becomes the input for the next job in the chain. This repeated data going back to disk and back and forth takes it toll on the performance. This lead me to the search of a framework that will help me to do some processing in-memory in a distributed manner – Spark is the answer.

I thought I will give Spark a try. I cloned from the master branch but ran into issues compiling because of a repository issue (which has since been resolved). I then downloaded spark-0.6.2. On a Ubuntu 12.04 box, I had to install typesafe-stack first and then install scala (instructions on installing the typesafe stack is here) . Installing using the scala deb package caused the libjline-java incompatibility issue . I set the SCALA_HOME to /home/ubuntu/.sbt/boot/scala-2.9.2 on my Amazon ubuntu instance.

I chose to run the JavaHdfsLR example that comes with Spark to see how it scales. I did not do a major benchmarking exercise but enough to convince myself that I got a good idea of Spark. I ran Spark in standalone mode – that is Spark speak for running its own master and worker program directly on the OS, instead of using something like Mesos or YARN. I made a simple python script to generate a million rows of data and used this with the example program. I decided to run a Spark Master on a Amazon Medium size Ubuntu VM and two small worker VMs (created from my own AMI). The example program completed in 32 seconds for two workers versus 39 seconds for a single work. Though this is not linear, I think the benefits will be better for larger amounts of data.

Now lets take a look at the example to see the power of Spark when it comes to iterative computing common in machine-learning computations.

 public static void main(String[] args) {

    if (args.length < 3) {
      System.err.println("Usage: JavaHdfsLR   ");
      System.exit(1);
    }

    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR");
    JavaRDD lines = sc.textFile(args[1], 4);
    JavaRDD points = lines.map(new ParsePoint()).cache();
    int ITERATIONS = Integer.parseInt(args[2]);

    // Initialize w to a random value
    double[] w = new double[D];
    for (int i = 0; i < D; i++) {
      w[i] = 2 * rand.nextDouble() - 1;
    }

    System.out.print("Initial w: ");
    printWeights(w);

    for (int i = 1; i <= ITERATIONS; i++) {
      System.out.println("On iteration " + i);

      double[] gradient = points.map(
        new ComputeGradient(w)
      ).reduce(new VectorSum());

      for (int j = 0; j < D; j++) {
        w[j] -= gradient[j];
      }

    }

    System.out.print("Final w: ");
    printWeights(w);
    System.exit(0);
  }

Note, the data is loaded outside the for-loop and cache. This would not possible with pure MR programming. Now, if this were to be done inside the for-loop it would take about 1m34s (for a single worker). If you are using Hive, which translates a query into more than one job, each job would output to HDFS and the next job reads from HDFS, imagine, if each job output to an in-memory RDD and the other one read that as the input. Yes, that is possible with Shark, a Hive like implementation running on Spark.

I think the power of Big Data will come from mixing and matching HDFS with other frameworks like Spark, enabling machine-learning performance on scales of data which was not possible earlier. Remember, simple algorithms on large scale data is better than complex algorithms on small-set of data.