In this assignment you will do two things:
Your starting points are ComputeBigramRelativeFrequencyPairs and ComputeBigramRelativeFrequencyStripes in package io.bespin.java.mapreduce.bigram (in Java).
You are welcome to build on the BigramCount (Scala) implementation here for tokenization and "boilerplate" code like command-line argument parsing. To be consistent in tokenization, you should use (i.e., import) the Tokenizer trait here.
Put your code in the package ca.uwaterloo.cs451.a2. Since you'll be writing Scala code, your source files should go into src/main/scala/ca/uwaterloo/cs451/a2/.
The repository is designed so that Scala/Spark code will also
compile with the same Maven build command:
$ mvn clean package
Following the Java implementations, you will write both a "pairs" and a "stripes" implementation in Spark. Note that although Spark has a different API than MapReduce, the algorithmic concepts are still very much applicable. Your pairs and stripes implementations should follow the same logic as the MapReduce implementations. In particular, each program should take only one pass through the input data.
Make sure your implementation runs in the Linux student CS environment on the Shakespeare collection and also on the sample Wikipedia file /data/cs451/enwiki-20180901-sentences-0.1sample.txt on HDFS in the Datasci cluster. See the software page for more details on accessing the Datasci cluster.
You can verify the correctness of your algorithm by comparing the output of the MapReduce implementation with your Spark implementation.
Clarification on terminology: informally, we often refer to "mappers" and "reducers" in the context of Spark. That's a shorthand way of saying map-like transformations (map, flatMap, filter, mapPartitions, etc.) and reduce-like transformations (e.g., reduceByKey, groupByKey, aggregateByKey, etc.). Hopefully it's clear from lecture that while Spark represents a generalization of MapReduce, the notions of per-record processing (i.e., map-like transformations) and grouping/shuffling (i.e., reduce-like transformations) are shared across both frameworks.
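The distinction can be illustrated without a cluster. The sketch below uses plain Scala collections as a stand-in for an RDD, with word counting as the example task; the object and method names are hypothetical, and it is only meant to show the shape of a map-like step followed by a reduce-like step, not how the assignment should be solved.

```scala
object MapReduceShapes {
  // Map-like step: each input line is processed on its own and emits
  // zero or more (word, 1) records, much as flatMap would on an RDD.
  def mapLike(lines: Seq[String]): Seq[(String, Int)] =
    lines.flatMap { line =>
      line.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toSeq
    }

  // Reduce-like step: records are grouped by key and the values of each
  // key are combined, which is the role reduceByKey plays on an RDD.
  def reduceLike(records: Seq[(String, Int)]): Map[String, Int] =
    records.groupBy(_._1).map { case (word, group) =>
      (word, group.map(_._2).sum)
    }

  def main(args: Array[String]): Unit = {
    val lines = Seq("a rose is a rose", "is it")
    reduceLike(mapLike(lines)).toSeq.sortBy(_._1).foreach(println)
  }
}
```

In Spark the grouping happens across machines through a shuffle, which is why the reduce-like transformations are the ones that accept a parallelism argument.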
We are going to run your code in the Linux student CS environment as follows (we will make sure the collection is there):
$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyPairs \
    target/assignments-1.0.jar --input data/Shakespeare.txt \
    --output cs451-lintool-a2-shakespeare-bigrams-pairs --reducers 5

$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyStripes \
    target/assignments-1.0.jar --input data/Shakespeare.txt \
    --output cs451-lintool-a2-shakespeare-bigrams-stripes --reducers 5
We are going to run your code on the Datasci cluster as follows (note the addition of the --num-executors and --executor-cores options):
$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyPairs \
    --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
    --input /data/cs451/enwiki-20180901-sentences-0.1sample.txt \
    --output cs451-lintool-a2-wiki-bigram-pairs --reducers 8

$ spark-submit --class ca.uwaterloo.cs451.a2.ComputeBigramRelativeFrequencyStripes \
    --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
    --input /data/cs451/enwiki-20180901-sentences-0.1sample.txt \
    --output cs451-lintool-a2-wiki-bigram-stripes --reducers 8
Sample output for pairs on datasci
((dream,*),13743.0)
((dream,a),0.013388635)
((dream,abby),7.276431E-5)
((dream,abdul),7.276431E-5)
((dream,abe),7.276431E-5)
((dream,about),0.007931311)
...
Sample output for stripes on datasci
(dream,Map(follow -> 7.276431E-5, intersperses -> 7.276431E-5, sister -> 7.276431E-5, former -> 7.276431E-5, ...
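The pairs sample can be read as follows, assuming (as in the Java starting points) that the `*` entry holds the marginal count of the left word and every other entry is a count divided by that marginal. The smallest value shown, 7.276431E-5, is then consistent with a bigram that occurs once after "dream"; the count of 1 is an inference from the arithmetic, not something the sample states.

```latex
f(w_2 \mid w_1) = \frac{c(w_1, w_2)}{c(w_1, *)},
\qquad
f(\text{abby} \mid \text{dream}) = \frac{1}{13743} \approx 7.2764 \times 10^{-5}
```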
Important: Make sure that your code accepts the command-line parameters above!
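As a rough illustration of what accepting these parameters involves, here is a minimal hand-rolled sketch in plain Scala. The `ArgsSketch` object, the `Config` case class, and the default of one reducer are all hypothetical; the BigramCount implementation mentioned earlier has its own argument parsing, which this sketch does not reproduce.

```scala
object ArgsSketch {
  // Hypothetical container for the flags used in the commands above.
  case class Config(input: String, output: String, reducers: Int)

  // Reads "--name value" pairs from the argument list.
  // The default of 1 reducer is an assumption made for this sketch.
  def parse(args: Array[String]): Config = {
    val opts = args.grouped(2).collect {
      case Array(flag, value) if flag.startsWith("--") => flag.drop(2) -> value
    }.toMap
    Config(
      input = opts.getOrElse("input", sys.error("missing --input")),
      output = opts.getOrElse("output", sys.error("missing --output")),
      reducers = opts.get("reducers").map(_.toInt).getOrElse(1)
    )
  }
}
```

The PMI programs later in the assignment take an additional --threshold flag, which could be handled the same way.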
When you run a Spark job (in distributed mode), you need to specify how many cluster resources to request. The option --num-executors specifies the number of executors, each with a certain number of cores specified by --executor-cores. So, in the above commands, we request a total of 8 workers (2 executors, 4 cores each).
The --reducers flag sets the amount of parallelism in the reduce stage of your program. If the total number of workers is larger than --reducers, some of the workers will sit idle, since you've allocated more workers for the job than the parallelism you've specified in your program. If --reducers is larger than the number of workers, on the other hand, then your reduce tasks will queue up at the workers, i.e., a worker will be assigned more than one reduce task. In the above example we set the two equal.
Note that the setting of these two parameters should not affect the correctness of your program. The setting above is a reasonable middle ground between having your jobs finish in a reasonable amount of time and not monopolizing cluster resources.
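The relationship between workers and reducers is simple arithmetic, sketched below in plain Scala. The function names are hypothetical, and the "waves" estimate assumes reduce tasks of roughly equal duration, which real jobs do not guarantee.

```scala
object ParallelismSketch {
  // Total workers requested: executors times cores per executor.
  def workers(numExecutors: Int, executorCores: Int): Int =
    numExecutors * executorCores

  // Minimum number of rounds needed to run all reduce tasks
  // (ceiling division), assuming tasks of similar duration.
  def minWaves(reducers: Int, workers: Int): Int =
    (reducers + workers - 1) / workers

  // Workers left without a reduce task when reducers < workers.
  def idleWorkers(reducers: Int, workers: Int): Int =
    math.max(0, workers - reducers)

  def main(args: Array[String]): Unit = {
    val w = workers(numExecutors = 2, executorCores = 4)
    println(s"workers=$w waves=${minWaves(8, w)} idle=${idleWorkers(8, w)}")
  }
}
```

With the Datasci settings above (2 executors, 4 cores each, 8 reducers), no worker is idle and every reduce task can run in a single round.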
A related but still orthogonal concept is partitions. Partitions describe the physical division of records across workers during execution. When reading from HDFS, the number of HDFS blocks determines the number of partitions in your RDD. When you apply a reduce-like transformation, you can optionally specify the number of partitions (or Spark applies a default); in this case, the number of partitions is equal to the number of reducers.
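To make the idea of partitions concrete, the following plain Scala sketch assigns key-value records to partitions by hashing the key, assuming hash partitioning (which is what Spark's HashPartitioner does for reduce-like transformations by default). The names are hypothetical and the code is a simplified model, not Spark's implementation.

```scala
object PartitionSketch {
  // Non-negative modulus of the key's hash code; hashCode may be
  // negative, so the raw remainder is shifted into [0, numPartitions).
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Groups records by the partition their key maps to. All records
  // sharing a key end up in the same partition.
  def assign[K, V](records: Seq[(K, V)], numPartitions: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, numPartitions) }
}
```

Because every record with a given key lands in the same partition, a single reduce task sees all the values it needs for that key.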
(Note: the caution about "one pass" was for the previous question, not this one. As with A1, you should use two passes to compute PMI values.)
Your starting points for PMI computations in Spark should be your solutions to assignment 1. Write two programs, PairsPMI and StripesPMI, that go in package ca.uwaterloo.cs451.a2, in src/main/scala/ca/uwaterloo/cs451/a2/.
There are obviously going to be differences between the MapReduce and Spark implementations, but we want you to preserve the "spirit" of the "pairs" vs. "stripes" approach in your respective implementations. That is, the pairs implementation keeps track of each co-occurrence count independently, while the stripes implementation groups all co-occurring terms with respect to a term. If you have questions, please ask.
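As a reminder of why two passes are needed, here is the standard definition of PMI rewritten in terms of counts. This assumes probabilities are estimated as counts over a total N, with c(x) and N coming from the first pass and c(x, y) from the second; the exact counting unit and the base of the logarithm are not restated in this section and should follow assignment 1.

```latex
\mathrm{PMI}(x, y)
  = \log \frac{p(x, y)}{p(x)\, p(y)}
  = \log \frac{c(x, y) / N}{\bigl(c(x) / N\bigr)\bigl(c(y) / N\bigr)}
  = \log \frac{N \cdot c(x, y)}{c(x)\, c(y)}
```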
We are going to run your code in the Linux student CS environment as follows (we will make sure the collection is there):
$ spark-submit --class ca.uwaterloo.cs451.a2.PairsPMI \
    target/assignments-1.0.jar --input data/Shakespeare.txt \
    --output cs451-lintool-a2-shakespeare-pmi-pairs --reducers 5 --threshold 10

$ spark-submit --class ca.uwaterloo.cs451.a2.StripesPMI \
    target/assignments-1.0.jar --input data/Shakespeare.txt \
    --output cs451-lintool-a2-shakespeare-pmi-stripes --reducers 5 --threshold 10
We are going to run your code on the Datasci cluster as follows (we'll use the same simple Wikipedia collection at /data/cs451/simplewiki-20180901-sentences.txt from assignment 1):
$ spark-submit --class ca.uwaterloo.cs451.a2.PairsPMI \
    --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
    --input /data/cs451/simplewiki-20180901-sentences.txt \
    --output cs451-lintool-a2-wiki-pmi-pairs --reducers 8 --threshold 10

$ spark-submit --class ca.uwaterloo.cs451.a2.StripesPMI \
    --num-executors 2 --executor-cores 4 --executor-memory 24G target/assignments-1.0.jar \
    --input /data/cs451/simplewiki-20180901-sentences.txt \
    --output cs451-lintool-a2-wiki-pmi-stripes --reducers 8 --threshold 10
Sample output for PMI pairs on datasci
((dream,a),(0.22365512762434872,519))
((dream,about),(0.22470161302138905,43))
((dream,adventure),(1.0951593166120552,10))
((dream,after),(0.12074540777419508,39))
((dream,again),(0.3853604924790266,11))
((dream,album),(0.720192667964914,56))
((dream,albums),(0.7919393661940846,15))
...
Sample output for PMI stripes on datasci
(dream,Map(requiem -> (2.0444317238839496,14), famous -> (0.45165986383394874,20), their -> (0.19268500888254783,46), ...
Hints:
Please follow these instructions carefully!
All implementations should be in package ca.uwaterloo.cs451.a2; your Scala code should be in src/main/scala/ca/uwaterloo/cs451/a2/.
When grading, we will pull your repo and build your code:
$ mvn clean package
And run using exactly the commands above. Make sure that your code runs in the Linux Student CS environment (even if you do development on your own machine), which is where we will be doing the grading. "But it runs on my laptop!" will not be accepted as an excuse if we can't get your code to run.
Specifically, we will clone your repo and use the below check scripts:
check_assignment2_public_linux.py in the Linux Student CS environment.
check_assignment2_public_datasci.py on the Datasci cluster.
When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we would recommend that you verify everything above works by performing a clean clone of your repo and going through the steps above.
That's it!
This assignment is worth a total of 40 points, broken down as follows:
There are no points explicitly for hidden test cases: the points are folded into the distribution above.
To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.
Class name | Running time Linux | Running time Datasci |
---|---|---|
ComputeBigramRelativeFrequencyPairs | 30 seconds | 6 minutes 30 seconds |
ComputeBigramRelativeFrequencyStripes | 25 seconds | 2 minutes 30 seconds |
PairsPMI | 45 seconds | 10 minutes |
StripesPMI | 25 seconds | 3 minutes |