Monday, April 24, 2017

Apache Spark RDD and Java Streams

A few months ago, I was fortunate enough to participate in a few PoCs (proofs of concept) that used Apache Spark. There, I got the chance to use resilient distributed datasets (RDDs for short), transformations, and actions.

After a few days, I realized that while Apache Spark and the JDK are very different platforms, there are similarities between RDD transformations and actions, and stream intermediate and terminal operations. I think these similarities can help beginners (like me *grin*) get started with Apache Spark.

Java Stream             | Apache Spark RDD
Intermediate operation  | Transformation
Terminal operation      | Action

Java Streams

Let's start with streams. Java 8 was released in March 2014. Arguably, the most significant feature it brought is the Streams API (or simply streams).

Once a Stream is created, it provides many operations that can be grouped into two categories:

  • intermediate,
  • and terminal.

Intermediate operations return a new stream derived from the previous one. These intermediate operations can be chained together to form a pipeline. Terminal operations, on the other hand, close the stream pipeline and return a result (or produce a side effect, as forEach() does).

Here's an example.

Stream.of(1, 2, 3)
        .peek(n -> System.out.println("Peeked at: " + n))
        .map(n -> n*n)
        .forEach(System.out::println);

When the above example is run, it generates the following output:

Peeked at: 1
1
Peeked at: 2
4
Peeked at: 3
9

Intermediate operations are lazy. Execution does not start until the terminal operation, forEach() in this case, is invoked, and each element then flows through the whole pipeline before the next one is processed. That's why we do not see the following.

Peeked at: 1
Peeked at: 2
Peeked at: 3
1
4
9

Instead, the operations peek(), map(), and forEach() are joined to form a pipeline, and elements pass through it one at a time. In the first pass, the stream created by the static of() method supplies the first element; peek() prints "Peeked at: 1", map() squares the value, and forEach() prints "1". The next pass starts with the next element from of(), followed by peek() and map(), and so on.

Calling an intermediate operation such as peek() does not actually perform any peeking. Instead, it creates a new stream that, when traversed, contains the same elements as the initial stream and additionally performs the provided action on each element.
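To make the laziness concrete, here is a minimal sketch that uses only the JDK. The class and method names (LazyPeekDemo, withoutTerminalOperation(), withTerminalOperation()) are made up for this illustration. Instead of printing, peek() appends to a list, so we can inspect what actually ran.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original examples
class LazyPeekDemo {

    // Builds the pipeline but never calls a terminal operation
    static List<String> withoutTerminalOperation() {
        List<String> log = new ArrayList<>();
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .peek(n -> log.add("Peeked at: " + n))
                .map(n -> n * n);
        // The pipeline is never traversed, so peek() never ran
        return log;
    }

    // Same pipeline, this time closed by the forEach() terminal operation
    static List<String> withTerminalOperation() {
        List<String> log = new ArrayList<>();
        Stream.of(1, 2, 3)
                .peek(n -> log.add("Peeked at: " + n))
                .map(n -> n * n)
                .forEach(n -> log.add(String.valueOf(n)));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(withoutTerminalOperation()); // prints []
        System.out.println(withTerminalOperation());
        // prints [Peeked at: 1, 1, Peeked at: 2, 4, Peeked at: 3, 9]
    }
}
```

The first method returns an empty list because nothing traverses the stream. The second one records the same interleaved order as the output shown earlier.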

Apache Spark RDD

Now, let's turn to Spark's core abstraction for working with data: the resilient distributed dataset (RDD).

An RDD is simply a distributed collection of elements. In Spark, all work is expressed as creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

Once created, RDDs offer two types of operations:

  • transformations,
  • and actions.

Transformations construct a new RDD from a previous one. Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS).

Here's an example with a rough equivalent using Java Streams.

SparkConf conf = new SparkConf().setAppName(...);
JavaSparkContext sc = new JavaSparkContext(conf);

List<Integer> squares = sc.parallelize(Arrays.asList(1, 2, 3))
        .map(n -> n*n)
        .collect();

System.out.println(squares.toString());

// Rough equivalent using Java Streams
List<Integer> squares2 = Stream.of(1, 2, 3)
        .map(n -> n*n)
        .collect(Collectors.toList());

System.out.println(squares2.toString());

After setting up the Spark context, we call parallelize(), which creates an RDD from the given list of elements. map() is a transformation, and collect() is an action. Transformations, like intermediate stream operations in Java, are lazily evaluated. In this example, Spark will not begin to execute the function provided in the call to map() until it sees an action. This approach might seem unusual at first, but it makes a lot of sense when dealing with huge amounts of data (big data, in other words). Because nothing runs until an action is called, Spark can look at the whole chain of transformations, split up the work, and do it in parallel.

Word Count Example

Let's use word count as an example. Here, we have two implementations: one uses Apache Spark, and the other uses Java Streams.

Here's the Java Stream version.

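import static java.util.function.Function.identity;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

public class WordCountJava {

 private static final String REGEX = "\\s+";

 public Map<String, Long> count(URI uri) throws IOException {
  // Files.lines() keeps the file open, so close the stream when done
  try (Stream<String> lines = Files.lines(Paths.get(uri))) {
   return lines
    .map(line -> line.split(REGEX))
    .flatMap(Arrays::stream)
    .map(String::toLowerCase)
    .collect(groupingBy(identity(), TreeMap::new, counting()));
  }
 }
}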

Here, we read the source file line by line and transform each line into a sequence of words (via the map() intermediate operation). Since we have a sequence of words for each line and we have many lines, we convert them to a single sequence of words using flatMap(). In the end, we group the words by their identity() (i.e., the identity of a string is the string itself) and count them. Collecting into a TreeMap keeps the keys sorted.

When tested against a text file that contains the two lines:

The quick brown fox jumps over the lazy dog
The quick brown fox jumps over the lazy dog

it outputs the following map:

{brown=2, dog=2, fox=2, jumps=2, lazy=2, over=2, quick=2, the=4}
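For anyone who wants to reproduce this result, here is a self-contained sketch that writes the two lines to a temporary file and counts the words with the same pipeline. The class name WordCountJavaDemo, its method names, and the temp-file prefix are assumptions made for this illustration; only JDK classes are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class that mirrors the stream-based word count
class WordCountJavaDemo {

    // Same pipeline as the word count: split, flatten, lower-case, group, count
    static Map<String, Long> countWords(Path file) throws IOException {
        try (Stream<String> lines = Files.lines(file)) {
            return lines
                    .flatMap(line -> Arrays.stream(line.split("\\s+")))
                    .map(String::toLowerCase)
                    .collect(Collectors.groupingBy(
                            Function.identity(), TreeMap::new, Collectors.counting()));
        }
    }

    // Writes the two sample lines to a temp file, counts them, then cleans up
    static Map<String, Long> countSampleText() {
        try {
            Path file = Files.createTempFile("word-count", ".txt");
            try {
                Files.write(file, Arrays.asList(
                        "The quick brown fox jumps over the lazy dog",
                        "The quick brown fox jumps over the lazy dog"),
                        StandardCharsets.UTF_8);
                return countWords(file);
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(countSampleText());
        // prints {brown=2, dog=2, fox=2, jumps=2, lazy=2, over=2, quick=2, the=4}
    }
}
```

"The" and "the" fall into the same bucket because of the lower-casing step, which is why the count for "the" is 4.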

And now, here's the Spark version.

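import java.net.URI;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCountSpark {

 private static final String REGEX = "\\s+";

 public List<Tuple2<String, Long>> count(URI uri, JavaSparkContext sc) {
  // Each element of this RDD is one line of the text file
  JavaRDD<String> input = sc.textFile(Paths.get(uri).toString());
  return input.flatMap(
     line -> Arrays.asList(line.split(REGEX)).iterator())
    .map(String::toLowerCase)
    // Pair each word with a count of one
    .mapToPair(word -> new Tuple2<String, Long>(word, 1L))
    // Add up the counts of tuples that share the same word
    .reduceByKey(Long::sum)
    .sortByKey()
    .collect();
 }

}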

When run against the same two-line text file, it outputs the following:

[(brown,2), (dog,2), (fox,2), (jumps,2), (lazy,2), (over,2), (quick,2), (the,4)]

The initial configuration of a JavaSparkContext has been excluded for brevity. We create a JavaRDD from a text file. It's worth mentioning that this initial RDD operates line by line on the text file. That's why we split each line into a sequence of words and flatMap() them. Then we transform each word into a key-value tuple with a count of one (1) for incremental counting. Once we have done that, reduceByKey() merges the tuples that share the same word by adding up their counts, and in the end sortByKey() sorts them by key in natural order.
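To see how close the two models are, the pair-and-reduce steps can also be imitated with plain Java Streams. This is only a rough analogy that runs in a single JVM, not how Spark implements reduceByKey(). The class name PairReduceDemo is made up, and java.util.AbstractMap.SimpleEntry stands in for Spark's Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical demo class: a stream-only imitation of mapToPair + reduceByKey + sortByKey
class PairReduceDemo {

    static Map<String, Long> count(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .map(String::toLowerCase)
                // Like mapToPair(): pair each word with a count of one
                .map(word -> new SimpleEntry<>(word, 1L))
                // Like reduceByKey(): merge entries with the same key by summing,
                // and like sortByKey(): a TreeMap keeps the keys in natural order
                .collect(Collectors.toMap(
                        Map.Entry::getKey, Map.Entry::getValue, Long::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList(
                "The quick brown fox jumps over the lazy dog",
                "The quick brown fox jumps over the lazy dog")));
        // prints {brown=2, dog=2, fox=2, jumps=2, lazy=2, over=2, quick=2, the=4}
    }
}
```

The merge function passed to toMap() (Long::sum) plays the same role as the function passed to reduceByKey(): it combines two values that belong to the same key.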

In Closing

As shown, both implementations are similar. The Spark implementation requires more setup and configuration, but in return it can distribute the work across a cluster. Learning about intermediate and terminal stream operations can help a Java developer get started with understanding Apache Spark.

Thanks to Krischelle, RB, and Juno, for letting me participate in the PoCs that used Apache Spark.