Assignment 3: Inverted Indexing due 4:00 pm Feb. 25

This assignment is to be completed in Spark using Scala. You will be working in the same repo as before, except that everything should go into the package namespace ca.uwaterloo.cs451.a3.

You've been provided with some inverted indexing and boolean retrieval code.

Make sure you understand the code. Starting from the inverted indexing baseline BuildInvertedIndexBasic, modify the indexer code in the following ways:

1. Index Compression. The index should be compressed using VInts / VLongs: see org.apache.hadoop.io.WritableUtils. You should also use gap-compression (i.e., delta-compression) techniques as appropriate.

Note: The value type must be BytesWritable and, for each hit, you must store the gap first, followed by the term frequency.

2. Document partitioning. The baseline indexer shuffles everything into a single partition. It would be trivial to change the number of partitions and produce an index partitioned by term; instead, however, we will partition the documents by document ID. We will accomplish this by adding a partitions command-line argument and loading the collection into that many partitions. If we load the collection text file and continue to use zipWithIndex, the result is almost ideal: each partition is sorted and there are no gaps in the document IDs. However, we would have no control over the number of partitions, and if we used coalesce to change it, that would break the ordering. So, instead, we will use repartitionAndSortWithinPartitions. It's important that this is done after zipWithIndex so that each line of text gets the correct index. It's also important to use a RangePartitioner to ensure that each partition contains a contiguous range of document IDs rather than an arbitrary sample (this reduces the mean gap size and makes gap encoding more effective). The addition of this shuffle doesn't substantially alter the runtime. In other words, you must transform the textFile RDD in the following order:

  1. Add document IDs using zipWithIndex
  2. repartitionAndSortWithinPartitions using a RangePartitioner, with the partitions command-line argument as the number of partitions. (Don't forget that zipWithIndex puts the docID as the value, so you'll need a map to flip keys and values around; otherwise you would be sorting and partitioning by the text, and we want to do this by document ID.)
You'll then need to rewrite the inverted index generation using mapPartitions. (A sketch of this pipeline appears after point #3 below.)

3. Perform term-at-a-time matching on the workers, not on the driver. The baseline retrieval implementation currently collects the postings to the driver to perform term-at-a-time matching. This is not scalable, as some terms may have very large postings lists. In addition to modifying the Boolean Retrieval program to read compressed postings, you must also modify it so that matching happens in parallel on the workers. You should then bring each partition's hits to the driver for display to the user. Ensure that the reference behaviour is unaltered: for each query, it should print how many hits there were in total, and then print up to ten of them. (A sketch of this driver/worker split appears below, after the discussion of the postings format.)
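
To make the ordering in point #2 concrete, here is a minimal sketch, assuming textFile is the RDD returned by sc.textFile(...) and numPartitions holds the value of the new partitions argument (these names are illustrative, not taken from the provided code):

    import org.apache.spark.RangePartitioner

    // Assign document IDs first, then repartition and sort by those IDs.
    val docs = textFile
      .zipWithIndex()                                // (line, docId): IDs assigned before any repartitioning
      .map { case (line, docId) => (docId, line) }   // flip so we partition and sort by docId, not by the text
    val partitioned = docs
      .repartitionAndSortWithinPartitions(
        new RangePartitioner(numPartitions, docs))   // each partition covers a contiguous range of doc IDs

    // The postings are then built with mapPartitions over `partitioned`.

The RangePartitioner samples the keys to choose its split points, which is what gives each partition a contiguous, roughly balanced range of document IDs.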

Note: Do not unnecessarily change code not directly related to points #1-#3 above. In particular, do not change how the documents are tokenized and do not change how the document IDs are generated (see the note above that you MUST generate them before attempting to repartition).

To go into a bit more detail about the new format for postings: in the reference implementation, the final value type is PairOfWritables<IntWritable, ArrayListWritable<PairOfInts>>. The most obvious idea is to change that into something like PairOfWritables<VIntWritable, ArrayListWritable<PairOfVInts>>. This does not work! The reason is that you would still be materializing every posting, i.e., all the PairOfVInts objects, in memory. This translates into a Java object for every posting, which is wasteful in terms of memory usage and will exhaust memory pretty quickly as you scale. In other words, you're still buffering objects, just inside the ArrayListWritable. Instead, you should create an org.apache.hadoop.io.DataOutputBuffer and use WritableUtils to write into it. The final step is to create a new BytesWritable from the data in your DataOutputBuffer (note that the backing array may be oversized, so use the getLength() method to grab only the valid bytes from the buffer).
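
As a concrete illustration of the buffer-based approach, here is a minimal sketch for serializing one term's postings. The encodePostings helper is hypothetical (not part of the provided code) and assumes its input is already sorted by document ID:

    import org.apache.hadoop.io.{BytesWritable, DataOutputBuffer, WritableUtils}

    // E.g., doc IDs 3, 7, 10 with term frequencies 1, 2, 1 are written as the
    // (gap, tf) pairs (3, 1), (4, 2), (3, 1), each value VInt/VLong-encoded.
    def encodePostings(postings: Seq[(Long, Int)]): BytesWritable = {
      val buffer = new DataOutputBuffer()
      var prev = 0L
      for ((docId, tf) <- postings) {
        WritableUtils.writeVLong(buffer, docId - prev)  // gap first...
        WritableUtils.writeVInt(buffer, tf)             // ...then the term frequency
        prev = docId
      }
      // The backing array may be oversized: copy only the getLength() valid bytes.
      new BytesWritable(java.util.Arrays.copyOfRange(buffer.getData, 0, buffer.getLength))
    }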

This new indexer should be named BuildInvertedIndexCompressed. This new class should be in the package ca.uwaterloo.cs451.a3. Make sure it works on the Shakespeare collection.

Modify BooleanRetrieval so that it works with the new compressed indexes. Name this new class BooleanRetrievalCompressed. This new class should be in the same package as above and give the same output as the old version.
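
To sketch what the worker-side matching in point #3 might look like, suppose the compressed index has been loaded as an RDD of (Text, BytesWritable) pairs named index and the query has been tokenized into postfix form as query. The helpers below are illustrative stand-ins under those assumptions, not the reference solution:

    import org.apache.hadoop.io.{BytesWritable, DataInputBuffer, Text, WritableUtils}
    import scala.collection.mutable

    // Decode one compressed postings value: each hit is a VLong gap followed by
    // a VInt term frequency; absolute doc IDs are recovered by summing the gaps.
    def decodePostings(value: BytesWritable): Set[Long] = {
      val in = new DataInputBuffer()
      in.reset(value.getBytes, value.getLength)
      val docIds = mutable.Set[Long]()
      var docId = 0L
      while (in.getPosition < value.getLength) {
        docId += WritableUtils.readVLong(in)   // gap -> absolute doc ID
        WritableUtils.readVInt(in)             // term frequency (unused for boolean matching)
        docIds += docId
      }
      docIds.toSet
    }

    // Stack-based evaluation of a postfix boolean query, as in the baseline.
    def evalPostfixQuery(query: Seq[String], postings: Map[String, Set[Long]]): Set[Long] = {
      val stack = mutable.Stack[Set[Long]]()
      for (token <- query) token match {
        case "AND" => stack.push(stack.pop() intersect stack.pop())
        case "OR"  => stack.push(stack.pop() union stack.pop())
        case term  => stack.push(postings.getOrElse(term, Set.empty))
      }
      stack.pop()
    }

    // Each partition matches against its own slice of the index (in practice you
    // would filter to the query's terms before decoding everything), and only the
    // doc IDs of the hits travel back to the driver.
    val hits = index
      .mapPartitions { iter =>
        val postings = iter.map { case (t, v) => (t.toString, decodePostings(v)) }.toMap
        evalPostfixQuery(query, postings).iterator
      }
      .collect()
      .sorted

    println(s"${hits.length} hits")
    hits.take(10).foreach(println)   // the reference output also shows the matching line of text

This works because the index is document-partitioned: every document's postings live in exactly one partition, so per-partition evaluation followed by concatenation yields the same hit set as evaluating the whole index at once.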

NOTE: On datasci.cs you will have eight sequence files. However, since they are larger than 128 MB, HDFS will have split them into multiple blocks, and by default Spark loads a sequence file into one partition per HDFS block. We do not want this, since we have partitioned by document ID! It is vital that each sequence file occupy exactly one partition when we load the inverted index. To do this, we'll need a custom class:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.JobContext
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

    class NonSplittableSequenceFileInputFormat[K, V] extends SequenceFileInputFormat[K, V] {
      override def isSplitable(context: JobContext, file: Path): Boolean = false
    }
    
Instead of using sc.sequenceFile, you'll need to use sc.newAPIHadoopFile and supply the above class as one of the type parameters.
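
For example, the load might look something like the following sketch, assuming the index's key and value types are Text and BytesWritable (adjust to whatever your indexer actually writes) and indexPath is the value of the --index argument:

    import org.apache.hadoop.io.{BytesWritable, Text}

    // One RDD partition per part file, because the input format refuses to split files.
    val index = sc.newAPIHadoopFile[Text, BytesWritable,
      NonSplittableSequenceFileInputFormat[Text, BytesWritable]](indexPath)

Also remember that Hadoop record readers reuse Writable objects, so convert or copy keys and values (for example, via toString or by copying the valid bytes) before caching or collecting them.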

In more detail, make sure that you can build the inverted index with the following command (make sure your implementation runs in the Linux student CS environment, as that is where we will be doing the marking):

$ spark-submit --class ca.uwaterloo.cs451.a3.BuildInvertedIndexCompressed target/assignments-1.0.jar \
   --input data/Shakespeare.txt --output cs451-a3-index-shakespeare --partitions 4

We should be able to control the number of partitions with the --partitions option. That is, the code should give the correct results no matter what we set the value to.

Once we build the index, we should then be able to run a boolean query as follows (in exactly the same manner as BooleanRetrieval in Bespin):

$ spark-submit --class ca.uwaterloo.cs451.a3.BooleanRetrievalCompressed target/assignments-1.0.jar  \
   --index cs451-a3-index-shakespeare --collection data/Shakespeare.txt 

This will prompt you for queries. Try the following queries:
  outrageous fortune AND
  white red OR rose AND pluck AND
Hint: The first query should only match one line - "25335 The slings and arrows of outrageous fortune". The second should match five lines.

Of course, we will try your program with additional queries to verify its correctness.

HINT: The size of the compressed index for the Shakespeare collection is 3.3 MB using our sample solution. You can use du -h to check the size of the index. If the size of your index is different, your code probably has a bug and you will lose the correctness mark. NOTE: If you use hadoop fs -du -h on student.cs, you will see four files of roughly 700-900 KB each; the HDFS du command reports sizes slightly differently.

Running on the Datasci cluster

Now let's try running your implementation on the Datasci cluster, on the sample Wikipedia file /data/cs451/enwiki-20180901-sentences-0.1sample.txt on HDFS:

  $ spark-submit --class  ca.uwaterloo.cs451.a3.BuildInvertedIndexCompressed \
    --num-executors 2 --executor-cores 4 --executor-memory 24G \
    target/assignments-1.0.jar \
    --input /data/cs451/enwiki-20180901-sentences-0.1sample.txt \
    --output cs451-a3-index-wiki --partitions 8

The Wikipedia sample contains a sentence on each line, so each "document" is actually a sentence. Each sentence begins with the article title and the sentence id, e.g., "Anarchism.0004" is sentence 4 from the article "Anarchism".

And let's try running a query:

  $ spark-submit --class ca.uwaterloo.cs451.a3.BooleanRetrievalCompressed \
    --num-executors 2 --executor-cores 4 --executor-memory 24G \
    target/assignments-1.0.jar \
    --index cs451-a3-index-wiki \
    --collection /data/cs451/enwiki-20180901-sentences-0.1sample.txt 

Try the following two queries:
  waterloo stanford OR cheriton AND
  big data AND hadoop spark OR AND
You will get 3 and 9 hits, respectively. Be sure to visually inspect the results to make sure that the hits actually match the query. If not, you either decoded the document IDs incorrectly or did not generate the document IDs consistently. HINT: The size of the compressed index for the sample Wikipedia collection is:
$ hdfs dfs -du -h cs451-a3-index-wiki/part*
86.8 M  260.3 M  cs451-a3-index-wiki/part-00000
87.6 M  262.7 M  cs451-a3-index-wiki/part-00001
94.6 M  283.7 M  cs451-a3-index-wiki/part-00002
91.1 M  273.2 M  cs451-a3-index-wiki/part-00003
92.4 M  277.1 M  cs451-a3-index-wiki/part-00004
95.3 M  285.9 M  cs451-a3-index-wiki/part-00005
87.1 M  261.4 M  cs451-a3-index-wiki/part-00006
91.8 M  275.5 M  cs451-a3-index-wiki/part-00007
If the size of your index file is different, your code probably has a bug and you will (probably) lose the correctness mark.

TODO: Provide a quick check program that can confirm posting format.

Turning in the Assignment

Please follow these instructions carefully!

Make sure your repo has the following items:

Make sure your implementation runs in the Linux student CS environment on the Shakespeare collection, and also on the sample Wikipedia file /data/cs451/enwiki-20180901-sentences-0.1sample.txt on HDFS on the Datasci cluster, per above.

Specifically, we will clone your repo and use the below check scripts (plus additional scripts to check the results):

UNDER CONSTRUCTION - THESE SCRIPTS ARE NOT WRITTEN YET

When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we would recommend that you verify everything above works by performing a clean clone of your repo and running the public check scripts.

That's it!

Grading

This assignment is worth a total of 50 points, broken down as follows:

Reference Running Times

To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.

Class name                                    Running time (Linux)    Running time (Datasci)
BuildInvertedIndexCompressed                  12 seconds              2 minutes
BooleanRetrievalCompressed (first query)      < 1 second              < 30 seconds
BooleanRetrievalCompressed (second+ query)    < 200 ms / query        < 500 ms / query
