Assignment 2: Counting in Spark (due 4:00 pm, Oct. 9)

In this assignment you will do two things:

  1. "Port" the MapReduce implementations of the bigram frequency count program from Bespin over to Spark (in Scala).
  2. "Port" the MapReduce implementations of assignment 1 over to Spark (in Scala).

Bigram Relative Frequency

Your starting points are ComputeBigramRelativeFrequencyPairs and ComputeBigramRelativeFrequencyStripes in package io.bespin.java.mapreduce.bigram (in Java). You are welcome to build on the BigramCount (Scala) implementation here for tokenization and "boilerplate" code like command-line argument parsing. To be consistent in tokenization, you should use (i.e., import) the Tokenizer trait here.
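
For illustration, mixing in the trait might look like the following sketch. This is hedged: the import path io.bespin.scala.util.Tokenizer and a tokenize(line) method returning a List[String] are assumed to match Bespin's Scala BigramCount; check the actual source before relying on either.

// Sketch only: the Tokenizer import path and tokenize() signature are assumptions.
package ca.uwaterloo.cs451.a2

import io.bespin.scala.util.Tokenizer

object ComputeBigramRelativeFrequencyPairs extends Tokenizer {
  // tokenize(line) is provided by the mixed-in trait.
  def bigramsOf(line: String): List[(String, String)] = {
    val tokens = tokenize(line)
    if (tokens.length < 2) List() else tokens.sliding(2).map(p => (p.head, p.last)).toList
  }
}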

Put your code in the package ca.uwaterloo.cs451.a2. Since you'll be writing Scala code, your source files should go into src/main/scala/ca/uwaterloo/cs451/a2/. The repository is designed so that Scala/Spark code will also compile with the same Maven build command:

$ mvn clean package

Following the Java implementations, you will write both a "pairs" and a "stripes" implementation in Spark. Note that although Spark has a different API than MapReduce, the algorithmic concepts are still very much applicable. Your pairs and stripes implementations should follow the same logic as the MapReduce implementations. In particular, your program should take only one pass through the input data.
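
As a rough illustration of the one-pass structure, the two flavors might be shaped as follows. This is a sketch only, with illustrative names: sc, tokenize, and args come from your own boilerplate, and the "*" marginal marker mirrors the Java pairs implementation.

// Pairs sketch: one pass emits both the bigram count and the left-word marginal.
val pairs = sc.textFile(args.input())
  .flatMap(line => {
    val tokens = tokenize(line)
    if (tokens.length < 2) List()
    else tokens.sliding(2).flatMap(p => List(((p.head, p.last), 1.0f), ((p.head, "*"), 1.0f))).toList
  })
  .reduceByKey(_ + _, args.reducers())
// Turning counts into relative frequencies (dividing by the "*" marginal) is still
// up to you, e.g., by keeping all keys with the same left word in the same partition.

// Stripes sketch: one pass builds per-word maps of right-neighbor counts,
// merges them, and normalizes each map by its total.
val stripes = sc.textFile(args.input())
  .flatMap(line => {
    val tokens = tokenize(line)
    if (tokens.length < 2) List()
    else tokens.sliding(2).map(p => (p.head, Map(p.last -> 1.0f))).toList
  })
  .reduceByKey((a, b) => b.foldLeft(a) { case (acc, (k, v)) => acc + (k -> (acc.getOrElse(k, 0.0f) + v)) }, args.reducers())
  .mapValues(m => { val total = m.values.sum; m.map { case (k, v) => (k, v / total) } })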

Make sure your implementation runs in the Linux student CS environment on the Shakespeare collection, and also on the sample Wikipedia file /data/cs451/enwiki-20180901-sentences-0.1sample.txt on HDFS in the Datasci cluster. See the software page for more details on accessing the Datasci cluster.

You can verify the correctness of your algorithm by comparing the output of the MapReduce implementation with your Spark implementation.

Clarification on terminology: informally, we often refer to "mappers" and "reducers" in the context of Spark. That's a shorthand way of saying map-like transformations (map, flatMap, filter, mapPartitions, etc.) and reduce-like transformations (e.g., reduceByKey, groupByKey, aggregateByKey, etc.). Hopefully it's clear from lecture that while Spark represents a generalization of MapReduce, the notions of per-record processing (i.e., map-like transformation) and grouping/shuffling (i.e., reduce-like transformations) are shared across both frameworks.
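
For example, in a simple word count (a sketch; sc and tokenize come from your own boilerplate), flatMap plays the role of the "mapper" and reduceByKey the role of the "reducer":

val counts = sc.textFile(args.input())
  .flatMap(line => tokenize(line))   // map-like: per-record processing
  .map(word => (word, 1))            // map-like: emit (key, value) pairs
  .reduceByKey(_ + _)                // reduce-like: group by key and aggregate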

We are going to run your code in the Linux student CS environment as follows (we will make sure the collection is there):

$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyPairs \
   target/assignments-1.0.jar --input data/Shakespeare.txt \
   --output cs451-lintool-a2-shakespeare-bigrams-pairs --reducers 5

$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyStripes \
   target/assignments-1.0.jar --input data/Shakespeare.txt \
   --output cs451-lintool-a2-shakespeare-bigrams-stripes --reducers 5

We are going to run your code on the Datasci cluster as follows (note the addition of the --num-executors and --executor-cores options):

$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyPairs \
   --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
   --input /data/cs451/enwiki-20180901-sentences-0.1sample.txt \
   --output cs451-lintool-a2-wiki-bigram-pairs --reducers 8

$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyStripes \
   --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
   --input /data/cs451/enwiki-20180901-sentences-0.1sample.txt \
   --output cs451-lintool-a2-wiki-bigram-stripes --reducers 8

Sample output for pairs on Datasci

((dream,*),13743.0)
((dream,a),0.013388635)
((dream,abby),7.276431E-5)
((dream,abdul),7.276431E-5)
((dream,abe),7.276431E-5)
((dream,about),0.007931311)
...

Sample output for stripes on Datasci

(dream,Map(follow -> 7.276431E-5, intersperses -> 7.276431E-5, sister -> 7.276431E-5, former -> 7.276431E-5, ...

Important: Make sure that your code accepts the command-line parameters above!
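
One way to wire up these flags is sketched below, assuming the Scallop library (which Bespin's Scala examples use) is on your classpath; the PMI programs later in this assignment additionally need a --threshold option. The option names must match the flags used in the grading commands.

import org.rogach.scallop._

// Sketch of command-line parsing with Scallop (--input, --output, --reducers).
class Conf(args: Seq[String]) extends ScallopConf(args) {
  val input = opt[String](descr = "input path", required = true)
  val output = opt[String](descr = "output path", required = true)
  val reducers = opt[Int](descr = "number of reducers", required = false, default = Some(1))
  verify()
}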

When you run a Spark job (in distributed mode), you need to specify how much of the cluster's resources to request. The option --num-executors specifies the number of executors, each with the number of cores specified by --executor-cores. So, in the above commands, we request a total of 8 workers (2 executors with 4 cores each).

The --reducers flag sets the degree of parallelism in your program's reduce stage. If the total number of workers is larger than --reducers, some of the workers will sit idle, since you've allocated more workers for the job than the parallelism you've specified in your program. If --reducers is larger than the number of workers, on the other hand, your reduce tasks will queue up at the workers, i.e., a worker will be assigned more than one reduce task. In the above example we set the two equal.

Note that the setting of these two parameters should not affect the correctness of your program. The setting above is a reasonable middle ground between having your jobs finish in a reasonable amount of time and not monopolizing cluster resources.

A related but still orthogonal concept is partitions, which describe the physical division of records across workers during execution. When reading from HDFS, the number of HDFS blocks determines the number of partitions in your RDD. When you apply a reduce-like transformation, you can optionally specify the number of partitions (otherwise Spark applies a default); in this case, the number of partitions is equal to the number of reducers.
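
Concretely, one common way to honor --reducers is to pass its value as the numPartitions argument of a reduce-like transformation. A sketch, where args is your parsed command line and tokenize comes from the Tokenizer trait:

val counts = sc.textFile(args.input())
  .flatMap(line => tokenize(line).map(w => (w, 1)))
  .reduceByKey(_ + _, args.reducers())   // numPartitions = value of --reducers
// counts.getNumPartitions now equals args.reducers()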

PMI

(Note: the caution about "one pass" applies to the previous question, not this one. As in assignment 1, you should use two passes to compute the PMI values.)

Your starting points for PMI computations in Spark should be your solutions to assignment 1. Write two programs, PairsPMI and StripesPMI that go in package ca.uwaterloo.cs451.a2, in src/main/scala/ca/uwaterloo/cs451/a2/.

There are obviously going to be differences between the MapReduce and Spark implementations, but we want you to preserve the "spirit" of the "pairs" vs. "stripes" approach in your respective implementations. That is, the pairs implementation keeps track of the count of each co-occurring pair independently, while the stripes implementation groups all co-occurring terms with respect to a term. If you have questions, please ask.
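
To make the two-pass structure concrete, a hedged sketch of the pairs flavor might look like the following. The co-occurrence definition, probability estimates, and log base here are assumptions; they should match whatever you did in assignment 1.

// Pass 1: count lines and per-word occurrences (at most once per line), then broadcast.
val lines = sc.textFile(args.input())
val numLines = lines.count()
val wordCounts = sc.broadcast(
  lines.flatMap(line => tokenize(line).distinct.map(w => (w, 1L)))
       .reduceByKey(_ + _)
       .collectAsMap())

// Pass 2: count co-occurring pairs (at most once per line), apply --threshold,
// and compute PMI(x, y) = log10(N * count(x, y) / (count(x) * count(y))).
val pmi = lines
  .flatMap(line => {
    val tokens = tokenize(line).distinct
    for (x <- tokens; y <- tokens if x != y) yield ((x, y), 1L)
  })
  .reduceByKey(_ + _, args.reducers())
  .filter { case (_, count) => count >= args.threshold() }
  .map { case ((x, y), count) =>
    val c = wordCounts.value
    ((x, y), (math.log10(count.toDouble * numLines / (c(x) * c(y))), count))
  }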

We are going to run your code in the Linux student CS environment as follows (we will make sure the collection is there):

$ spark-submit --class ca.uwaterloo.cs451.a2.PairsPMI \
   target/assignments-1.0.jar --input data/Shakespeare.txt \
   --output cs451-lintool-a2-shakespeare-pmi-pairs --reducers 5 --threshold 10

$ spark-submit --class ca.uwaterloo.cs451.a2.StripesPMI \
   target/assignments-1.0.jar --input data/Shakespeare.txt \
   --output cs451-lintool-a2-shakespeare-pmi-stripes --reducers 5 --threshold 10

We are going to run your code on the Datasci cluster as follows (we'll use the same simple Wikipedia collection at /data/cs451/simplewiki-20180901-sentences.txt from assignment 1):

$ spark-submit --class ca.uwaterloo.cs451.a2.PairsPMI \
   --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
   --input /data/cs451/simplewiki-20180901-sentences.txt \
   --output cs451-lintool-a2-wiki-pmi-pairs --reducers 8 --threshold 10

$ spark-submit --class ca.uwaterloo.cs451.a2.StripesPMI \
   --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
   --input /data/cs451/simplewiki-20180901-sentences.txt \
   --output cs451-lintool-a2-wiki-pmi-stripes --reducers 8 --threshold 10

Sample output for PMI pairs on Datasci

((dream,a),(0.22365512762434872,519))
((dream,about),(0.22470161302138905,43))
((dream,adventure),(1.0951593166120552,10))
((dream,after),(0.12074540777419508,39))
((dream,again),(0.3853604924790266,11))
((dream,album),(0.720192667964914,56))
((dream,albums),(0.7919393661940846,15))
...

Sample output for PMI stripes on Datasci

(dream,Map(requiem -> (2.0444317238839496,14), famous -> (0.45165986383394874,20), their -> (0.19268500888254783,46), ...


Turning in the Assignment

Please follow these instructions carefully!

All implementations should be in package ca.uwaterloo.cs451.a2; your Scala code should be in src/main/scala/ca/uwaterloo/cs451/a2/.

When grading, we will pull your repo and build your code:

$ mvn clean package

We will then run it using exactly the commands above. Make sure that your code runs in the Linux Student CS environment (even if you do development on your own machine), since that is where we will be doing the grading. "But it runs on my laptop!" will not be accepted as an excuse if we can't get your code to run.

Specifically, we will clone your repo and use the below check scripts:

When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we would recommend that you verify everything above works by performing a clean clone of your repo and going through the steps above.

That's it!

Grading

This assignment is worth a total of 40 points, broken down as follows:

There are no points explicitly allocated to hidden test cases; those points are folded into the distribution above.

Reference Running Times

To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine-dependent and can vary with server/cluster load.

Class name                              Running time (Linux)    Running time (Datasci)
ComputeBigramRelativeFrequencyPairs     30 seconds              6 minutes 30 seconds
ComputeBigramRelativeFrequencyStripes   25 seconds              2 minutes 30 seconds
PairsPMI                                45 seconds              10 minutes
StripesPMI                              25 seconds              3 minutes
