
This assignment is to be completed in Spark using Scala. You will
be working in the same repo as before, except that everything should
go into the package namespace
ca.uwaterloo.cs451.a3.
You've been provided with inverted indexing and boolean retrieval code that builds an index with Text keys and ArrayListWritable values. Starting from BuildInvertedIndexBasic, modify the indexer code in the following ways:
1. Index Compression. The index should be compressed using
VInts / VLongs:
see org.apache.hadoop.io.WritableUtils. You should also
use gap-compression (i.e., delta-compression) techniques as appropriate.
The postings for each term must be serialized into a BytesWritable, and for each hit you must store the gap first, followed by the term frequency.
2. Document partitioning. The baseline indexer shuffles into a single partition. It would be trivial to change the number of partitions and get an index partitioned by term. Instead, however, we will partition the collection by document ID. We will accomplish this by adding a partitions command-line argument and loading the collection into this many partitions. If we load the collection text file and simply use zipWithIndex, the result is ideal: each partition is sorted by document ID and there are no gaps. However, we wouldn't have control of the number of partitions. If we used coalesce, that would break the ordering. So, instead, we will use repartitionAndSortWithinPartitions. It's important that this is done after zipWithIndex so that we get the correct index for each line of text. It's also important that we use a RangePartitioner to ensure that each partition contains a contiguous range of document IDs rather than an arbitrary sampling (this reduces the mean gap size and makes gap encoding more effective). The addition of this shuffle doesn't substantially alter the runtime. In other words, you must transform the textFile RDD in the following order (a sketch of this pipeline appears after point #3 below):
- zipWithIndex
- repartitionAndSortWithinPartitions using a RangePartitioner; use the partitions command-line argument for the number of partitions. (Don't forget that zipWithIndex puts the docID as the value, so you'll need a map to flip keys and values around; otherwise it will be sorting and partitioning according to the text, and we want to do this by document ID.)
- mapPartitions
3. Perform term-at-a-time matching on the workers, not on the driver. The baseline retrieval implementation currently collects the postings to the driver to perform term-at-a-time matching. This is not scalable, as some terms may have very large postings lists. In addition to modifying the Boolean retrieval program to read compressed postings, you must also modify it so that matching happens in parallel on the workers. You should then collect each partition's hits to the driver for display to the user. Ensure that the reference behaviour is unaltered: for each query, it should print how many hits there were in total, and then print up to ten of them.
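To make the required transformation order concrete, here is a minimal sketch of the loading-and-partitioning pipeline. The variable names, and the way the input path and partition count are obtained, are illustrative assumptions; tokenization and docID assignment must stay exactly as in the baseline.

import org.apache.spark.RangePartitioner

val inputPath = "data/Shakespeare.txt"   // from the --input argument (illustrative)
val numPartitions = 4                    // from the --partitions argument (illustrative)

val docs = sc.textFile(inputPath)
  .zipWithIndex()                              // assign docIDs before any repartitioning
  .map { case (text, docId) => (docId, text) } // flip so we partition/sort by docID, not text

val partitioner = new RangePartitioner(numPartitions, docs)
val partitioned = docs
  .repartitionAndSortWithinPartitions(partitioner)
  .mapPartitions { iter =>
    // build the postings for this partition's contiguous range of docIDs here
    // (placeholder: passes the lines through unchanged)
    iter
  }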
Note: Do not unnecessarily change code not directly related to points #1-#3 above. In particular, do not change how the documents are tokenized and do not change how the document IDs are generated (see the note above that you MUST generate them before attempting to repartition).
To go into a bit more detail about the new format for postings: in the reference implementation, the
final value type is PairOfWritables<IntWritable,
ArrayListWritable<PairOfInts>>. The most obvious idea
is to change that into something
like PairOfWritables<VIntWritable,
ArrayListWritable<PairOfVInts>>. This does not work!
The reason is that you will still be materializing each posting, i.e.,
all PairOfVInts objects in memory. This translates into a
Java object for every posting, which is wasteful in terms of memory
usage and will exhaust memory pretty quickly as you scale. In other
words, you're still buffering objects—just inside
the ArrayListWritable. Instead, you should be creating an org.apache.hadoop.io.DataOutputBuffer and using WritableUtils to write into it.
The final step should be to create a new BytesWritable using the data from your DataOutputBuffer (note that the backing array may be oversized, so use the getLength() method to grab only the valid bytes from the buffer).
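As a concrete illustration, here is a minimal sketch of encoding one term's postings this way. The helper name and the choice to write the document frequency before the (gap, tf) pairs are assumptions for illustration, not a prescribed layout.

import org.apache.hadoop.io.{BytesWritable, DataOutputBuffer, WritableUtils}

// postings: (docID, term frequency) pairs, already sorted by docID
def encodePostings(postings: Seq[(Long, Int)]): BytesWritable = {
  val buffer = new DataOutputBuffer()
  WritableUtils.writeVInt(buffer, postings.size)    // document frequency (one possible layout)
  var prev = 0L
  for ((docId, tf) <- postings) {
    WritableUtils.writeVLong(buffer, docId - prev)  // gap first ...
    WritableUtils.writeVInt(buffer, tf)             // ... then the term frequency
    prev = docId
  }
  // The backing array may be oversized: keep only getLength() valid bytes.
  val bytes = new BytesWritable()
  bytes.set(buffer.getData(), 0, buffer.getLength())
  bytes
}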
This new indexer should be
named BuildInvertedIndexCompressed. This new class should
be in the package ca.uwaterloo.cs451.a3. Make
sure it works on the Shakespeare collection.
Modify BooleanRetrieval so that it works with the new
compressed indexes. Name this new
class BooleanRetrievalCompressed. This new class should
be in the same package as above and give the same
output as the old version.
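For the retrieval side you will need to reverse the encoding. Here is a minimal sketch of decoding a BytesWritable back into (docID, term frequency) pairs, assuming the illustrative layout from the encoding sketch above (document frequency, then gap/tf pairs).

import org.apache.hadoop.io.{BytesWritable, DataInputBuffer, WritableUtils}

def decodePostings(bytes: BytesWritable): Seq[(Long, Int)] = {
  val in = new DataInputBuffer()
  in.reset(bytes.getBytes, bytes.getLength)  // respect the valid length, not the backing array size
  val df = WritableUtils.readVInt(in)        // document frequency (per the layout assumed above)
  var docId = 0L
  (0 until df).map { _ =>
    docId += WritableUtils.readVLong(in)     // add the gap to recover the absolute docID
    val tf = WritableUtils.readVInt(in)
    (docId, tf)
  }
}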
NOTE: On datasci.cs you will have eight sequence files. However, since they are larger than 128 MB, HDFS will have split them into multiple blocks, and by default Spark loads a sequence file into one partition per HDFS block. We do not want this, since we have partitioned by document ID! It is vital that each sequence file occupy only one partition when we load the inverted index. To do this, we'll need a custom class:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

class NonSplittableSequenceFileInputFormat[K, V] extends SequenceFileInputFormat[K, V] {
  // Never split a sequence file, so each part file becomes exactly one partition.
  override def isSplitable(context: JobContext, filename: Path): Boolean = false
}
Instead of using sc.sequenceFile, you'll need to use sc.newAPIHadoopFile with the above class as one of the type parameters.
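A minimal sketch of what this load might look like, assuming the index was written with Text keys and BytesWritable values (adjust the types to whatever your indexer actually writes):

import org.apache.hadoop.io.{BytesWritable, Text}

val indexPath = "cs451-a3-index-shakespeare"   // from the --index argument (illustrative)
val index = sc.newAPIHadoopFile[Text, BytesWritable,
  NonSplittableSequenceFileInputFormat[Text, BytesWritable]](indexPath)
// Each part file now occupies exactly one partition of `index`.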
In more detail, make sure that you can build the inverted index with the following command (make sure your implementation runs in the Linux student CS environment, as that is where we will be doing the marking):
$ spark-submit --class ca.uwaterloo.cs451.a3.BuildInvertedIndexCompressed target/assignments-1.0.jar \
   --input data/Shakespeare.txt --output cs451-a3-index-shakespeare --partitions 4
We should be able to control the number of partitions
with the --partitions option. That is, the code should give
the correct results no matter what we set the value to.
Once we build the index, we should then be able to run a boolean
query as follows (in exactly the same manner
as BooleanRetrieval in Bespin):
$ spark-submit --class ca.uwaterloo.cs451.a3.BooleanRetrievalCompressed target/assignments-1.0.jar \
   --index cs451-a3-index-shakespeare --collection data/Shakespeare.txt

This will prompt you for queries. Try the following queries:
outrageous fortune AND
white red OR rose AND pluck AND

Hint: The first query should match only one line: "25335 The slings and arrows of outrageous fortune". The second should match five lines.
Of course, we will try your program with additional queries to verify its correctness.
HINT: The size of the compressed index for the Shakespeare collection is 3.3 MB using our sample solution. You can use du -h to read the size of the index file. If the size of your index file is different, your code probably has a bug and you will lose the correctness mark.
NOTE: If you use hadoop fs -du -h on student.cs, you will get four files of 700-900 KB each! The HDFS du command works slightly differently.
Now let's try running your implementation on the Datasci cluster,
on the sample Wikipedia
file /data/cs451/enwiki-20180901-sentences-0.1sample.txt
on HDFS:
$ spark-submit --class ca.uwaterloo.cs451.a3.BuildInvertedIndexCompressed \
--num-executors 2 --executor-cores 4 --executor-memory 24G \
target/assignments-1.0.jar \
--input /data/cs451/enwiki-20180901-sentences-0.1sample.txt \
--output cs451-a3-index-wiki --partitions 8
The Wikipedia sample contains a sentence on each line, so each "document" is actually a sentence. Each sentence begins with the article title and the sentence id, e.g., "Anarchism.0004" is sentence 4 from the article "Anarchism".
And let's try running a query:
$ spark-submit --class ca.uwaterloo.cs451.a3.BooleanRetrievalCompressed \
--num-executors 2 --executor-cores 4 --executor-memory 24G \
target/assignments-1.0.jar \
--index cs451-a3-index-wiki \
--collection /data/cs451/enwiki-20180901-sentences-0.1sample.txt
Try the following two queries:
waterloo stanford OR cheriton AND
big data AND hadoop spark OR AND

You will get 3 and 9 hits, respectively. Be sure to visually inspect the hits to make sure that they actually match the query. If not, you either decoded the document IDs incorrectly, or you did not generate the document IDs consistently.
HINT: The size of the compressed index for the sample Wikipedia collection is:
$ hdfs dfs -du -h cs451-a3-index-wiki/part*
86.8 M  260.3 M  cs451-a3-index-wiki/part-00000
87.6 M  262.7 M  cs451-a3-index-wiki/part-00001
94.6 M  283.7 M  cs451-a3-index-wiki/part-00002
91.1 M  273.2 M  cs451-a3-index-wiki/part-00003
92.4 M  277.1 M  cs451-a3-index-wiki/part-00004
95.3 M  285.9 M  cs451-a3-index-wiki/part-00005
87.1 M  261.4 M  cs451-a3-index-wiki/part-00006
91.8 M  275.5 M  cs451-a3-index-wiki/part-00007

If the size of your index file is different, your code probably has a bug and you will (probably) lose the correctness mark.
Please follow these instructions carefully!
Make sure your repo has the following items:
- Your BuildInvertedIndexCompressed and BooleanRetrievalCompressed implementations in the package ca.uwaterloo.cs451.a3.

Make sure your implementation runs in the Linux student CS environment on the Shakespeare collection and also on the sample Wikipedia file /data/cs451/enwiki-20180901-sentences-0.1sample.txt on HDFS on the Datasci cluster, per above.
Specifically, we will clone your repo and run the public check scripts (plus additional scripts to check the results).
When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we recommend that you verify everything above works by performing a clean clone of your repo and running the public check scripts.
That's it!
This assignment is worth a total of 50 points, broken down as follows:
Your index must be stored in exactly the requested number of sequence files (the partitions argument), and we will verify the exact byte content of selected postings.
To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.
| Class name | Running time (Linux student CS) | Running time (Datasci cluster) |
|---|---|---|
| BuildInvertedIndexCompressed | 12 seconds | 2 minutes |
| BooleanRetrievalCompressed (first query) | < 1 second | < 30 seconds |
| BooleanRetrievalCompressed (second+ query) | < 200 ms / query | < 500 ms / query |