## CS431/631 Data Intensive Distributed Computing
### Winter 2025 - Assignment 4
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:** _replace this with your name_
* **ID:** _replace this with your UW student ID number_

Spark is not installed in Colab, so we have to install it ourselves. This will take a minute to finish. If you're running this notebook on your own machine, the following commands might not work, and you will have to install Spark yourself.

In [None]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!curl -Os https://student.cs.uwaterloo.ca/~cs451/spark/spark-3.4.3-bin-hadoop3.tgz
!tar xzf spark-3.4.3-bin-hadoop3.tgz
!pip install -q findspark

Now that you have installed Spark and Java in Colab, it is time to set the environment variables that let you run PySpark in your Colab environment. On your own system you won't need to run the next cell, but on Colab you must.

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.3-bin-hadoop3"

Now you will be able to create the SparkContext object needed to run Spark code:

In [None]:

import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext(appName="YourTest", master="local[*]")

If you are running on Colab, you can run the next code cell to create a clickable link that opens the Spark UI. (If you are running on your own machine, the previous cell should have given you a link that works locally; if not, try localhost:4040.)

In [None]:
from google.colab import output
output.serve_kernel_port_as_window(4040, path='/jobs/index.html')
# This will create a link below. You must click the link, do not copy & paste the URL as that's the "local" URL and won't work on your machine

---
#### Overview
For this assignment, you will be using Python and Spark to perform spam detection.   You will need to perform two tasks.   The first is to build spam prediction models, using training data sets and stochastic gradient descent (SGD).   The second is to use these models to predict whether the documents in a test data set are spam.
The stochastic gradient descent technique that you will be using is based on [a paper](http://arxiv.org/abs/1004.5168) by Cormack, Smucker and Clarke.

#### Training Spam Classification Models
To build a spam classification model, you will start with a training data set.   Each instance in the training set represents a single document, and is labeled to indicate whether that document should be considered to be spam or ham.
An instance looks like this:
```
clueweb09-en0094-20-13546 spam 387908 697162 426572 161118 688171 ...
```
The first field, `clueweb09-en0094-20-13546`, is the (unique) document name.   The second field is the label, indicating whether the document should be considered spam (as in this example) or ham.   The remaining fields are integers representing *features* present in the document.   In this case, the features are hashed byte 4-grams, represented as integers.   Each training data set is stored as a text file, with one instance per line.   The training files  are:
* `spam.train.group_x.txt`   (25 MB)
* `spam.train.group_y.txt`   (20 MB)
* `spam.train.britney.txt`   (766 MB)
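As a concrete illustration of this instance format, the sketch below splits one line into its document name, label and integer features. `parse_instance` is a hypothetical helper introduced here for illustration only, and the sample line is the example instance above truncated to its first five features.

```python
def parse_instance(line):
    """Split one instance line into (doc_id, label, features).

    Hypothetical helper for illustration; the assumed layout is
    "<doc_id> <spam|ham> <feature> <feature> ...".
    """
    fields = line.split()
    doc_id, label = fields[0], fields[1]
    features = [int(token) for token in fields[2:]]  # hashed byte 4-grams as ints
    return doc_id, label, features


# The example instance from above, truncated to its first five features.
example = "clueweb09-en0094-20-13546 spam 387908 697162 426572 161118 688171"
doc_id, label, features = parse_instance(example)
print(doc_id, label, features)
# clueweb09-en0094-20-13546 spam [387908, 697162, 426572, 161118, 688171]
```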

Now let's download the spamminess module and the training traces we will use in this assignment. This will take a few minutes. The `ls` command at the end lists the files in the current directory; make sure all of the files are present before continuing.

In [None]:
!wget -q https://student.cs.uwaterloo.ca/~cs451/W20/content/cs431/spamminess.py
!wget -q https://www.student.cs.uwaterloo.ca/~cs451/spam/spam.train.group_x.txt.bz2
!wget -q https://www.student.cs.uwaterloo.ca/~cs451/spam/spam.train.group_y.txt.bz2
!wget -q https://www.student.cs.uwaterloo.ca/~cs451/spam/spam.train.britney.txt.bz2

!bunzip2 spam.train.group_x.txt.bz2
!bunzip2 spam.train.group_y.txt.bz2
!bunzip2 spam.train.britney.txt.bz2
!ls

---
### Important

The questions that follow ask you to implement functions whose prototypes are given to you. Do _**not**_ change the prototypes of the functions. Do _**not**_ write code outside of the functions.

You may use specific cells, identified by `# Your tests here`, for test purposes. Code in these cells will *not* be executed when marking your assignment.

---

#### Question 1 (5/20 marks)

Your first task is to write a sequential SGD model trainer in Python (no Spark).   For our purposes, a model associates a *weight* with each feature.   The model trainer decides what these weights should be, based on the training instances.  Since you are going to be writing a model trainer based on SGD, the trainer should behave like this:
```
for each training instance T
   predict whether T is spam or ham using the weights of the current model
   update the model weights by comparing T's predicted label with its actual label
```
Of course, the important part is how to update the model.

In [the paper](http://arxiv.org/abs/1004.5168), the model is used to assign a "spamminess" score to a document.   Documents with positive spamminess are predicted to be spam.   Those with negative spamminess are predicted to be ham.  The spamminess of a document $D$ is simply the sum of the weights (from the model) of each of the document's features:
\begin{equation}
\mathrm{spamminess}(D) = \sum_{f \in D} w(f)
\end{equation}
where $w(f)$ is the weight associated with feature $f$.

The Python module `spamminess.py` defines a function `spamminess(F,W)` which computes this quantity. This function takes two arguments, `F` and `W`. `F` is a list of features (integers) associated with the document whose spamminess you want to compute, and `W` is a dictionary representing the current model. `W` maps features ($f$) to their weights ($w(f)$) under the model.
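To make the formula concrete, here is a minimal sketch of the same computation. It is a hypothetical re-implementation named `spamminess_sketch`, not the code in `spamminess.py`, and it assumes that features missing from the model contribute a weight of 0.

```python
def spamminess_sketch(F, W):
    """Sum the model weights of the features in F.

    Hypothetical re-implementation for illustration only. Assumption:
    features missing from W have weight 0. Every entry of F is counted,
    so a feature listed twice contributes its weight twice.
    """
    return sum(W.get(f, 0.0) for f in F)


W = {387908: 0.5, 697162: -0.25}
print(spamminess_sketch([387908, 697162, 426572], W))  # 0.25 (426572 is not in W)
```

A positive result like this one would be predicted as spam.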

In the cell below, you will find partial pseudo-code that shows how to implement the SGD model trainer defined by Cormack, Smucker, and Clarke. It reads the training instances one at a time from one of the training files and uses them to adjust the model weights. Your job is to turn this pseudo-code into runnable Python code that can be used to learn a model from any one of the training files. Implement the function `sequential_SGD()`, which takes as input a model (the `model` parameter, called `w` in the pseudo-code), the path to a training dataset, and a value for the update parameter `delta`, and returns the trained model.
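For reference, the update performed by the pseudo-code in the cell below can be written compactly. The label indicator $y(D)$ is our own notation, not part of the pseudo-code: $y(D) = 1$ if $D$ is labeled spam and $y(D) = 0$ if it is labeled ham. A feature that is not yet in the model is treated as having weight $0$ before its update, and $p(D)$ is computed once per document, before any of its features are updated.
\begin{equation}
p(D) = \frac{1}{1 + e^{-\mathrm{spamminess}(D)}}
\end{equation}
\begin{equation}
w(f) \leftarrow w(f) + \delta \, \bigl(y(D) - p(D)\bigr) \qquad \text{for each } f \in D
\end{equation}
For a spam document this adds $(1 - p(D))\,\delta$ to each feature's weight, and for a ham document it subtracts $p(D)\,\delta$, matching the two branches of the pseudo-code.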

In [None]:
# A4Q1 

from spamminess import spamminess
from math import exp

def sequential_SGD(model, training_dataset='spam.train.group_x.txt', delta = 0.002):
    #### Your solution to Question 1 here
    # open one of the training files - defaults to group_x
    with open(training_dataset) as f:
        pass
    #   for line in f:
    #      each line represents a document
    #      read and parse the line
    #      Let:
    #        t represent the spam/ham tag for this document
    #        F represent the list of features for this document

    #      find the spamminess of the current document using the current model:
    #      score = spamminess(F,w)

    #      then, update the model:
    #      prob = 1.0/(1+exp(-score))
    #      for each feature f in F:
    #          if t == 'spam':
    #              increase model(f) by (1.0-prob)*delta (or set model(f) to (1.0-prob)*delta if f is not in the dict yet)
    #          elif t == 'ham':
    #              decrease model(f) by prob*delta (or set model(f) to -prob*delta if f is not in the dict yet)
    return model


In [None]:
# Your tests here
w = sequential_SGD({}) # Providing an empty model


#### Question 2 (5/20 marks)

Next, you should try implementing a Spark version of the SGD model trainer.   Your Spark implementation should read a training file, train the model, and then output the model to the `models` folder.  The model output file that you generate should list the weight associated with each feature, with one feature per line, like this:
```
(802123, 0.0009858585991850937)
(438450, 4.267897922108138e-05)
(271525, 0.0013133437007968654)
(92853, 0.0004300009932503611)
```

Use Spark's `saveAsTextFile` action to output your model.   For example, if you are training a model for the group_x training set, use `saveAsTextFile("models/group_x_model")`.   This will actually cause Spark to create a folder called `group_x_model`.   In the folder, there will be files with names like `part-00000` that contain the actual output data.  When you use `saveAsTextFile`, Spark will generate one `part-xxxxx` file for each partition of the RDD that you are writing out.   In this case, you should have only a single partition (for the reason described below), so there should be only one `part-xxxxx` file.
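The sample listing above is simply the Python string form of `(feature, weight)` tuples: as far as we know, PySpark's `saveAsTextFile` writes each non-string element using `str()`. The sketch below shows that round trip in plain Python, together with a hypothetical helper, `load_model_lines`, that turns such lines back into a dictionary. This can be handy when inspecting or comparing saved models.

```python
import ast


def load_model_lines(lines):
    """Rebuild a {feature: weight} dict from lines like "(802123, 0.00098...)".

    Hypothetical helper; ast.literal_eval safely parses each tuple literal.
    """
    model = {}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines, if any
        feature, weight = ast.literal_eval(line)
        model[feature] = weight
    return model


# str() of a (feature, weight) tuple has the same shape as the listing above.
model_items = [(802123, 0.0009858585991850937), (438450, 4.267897922108138e-05)]
lines = [str(item) for item in model_items]
print(lines[0])
print(load_model_lines(lines) == dict(model_items))  # True
```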

Training the SGD model is an inherently sequential task, since the training instances update the model one at a time, and each instance's spamminess is computed using the model produced by that instance's predecessors.   This means that the only part of the training that you can parallelize using Spark is the parsing of the input file.   Once the input is parsed, your Spark implementation will have to force all of the instances into a single partition, and then apply the training function to the entire partition.   To see whether you are getting sensible results, you can compare the model you learn with Spark to the one that you learned with your sequential Python program from Question 1.

Remember that training should occur entirely in Spark.  The training instances should never come into your driver program.

Implement the function `spark_SGD()` below that takes as input the path to the training dataset, an output path `output_model` and a value for the update parameter `delta`, and writes the trained model to `output_model` using Spark's `saveAsTextFile`. You can use it to generate models from all three of the training files, leaving the results in your models folder. For this assignment, you will be using Spark's original RDD interface, rather than the DataFrame interface.

Hint: you need to move all of the data into one partition and then use `mapPartitions` to train the model.
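A function passed to `mapPartitions` receives an iterator over every record in one partition and must return an iterable of output records. The toy sketch below shows that shape in plain Python using a hypothetical `count_labels` function, which is deliberately not a trainer. It assumes records have already been parsed into `(doc_id, label, features)` tuples.

```python
def count_labels(partition):
    """Toy mapPartitions-style function: consume the whole partition, then emit results.

    `partition` is any iterator of (doc_id, label, features) tuples (an assumed
    record layout). The function returns an iterator over the final counts.
    """
    counts = {}
    for _doc_id, label, _features in partition:
        counts[label] = counts.get(label, 0) + 1
    return iter(counts.items())


# Outside Spark, simulate one partition with a plain iterator.
records = [("d1", "spam", [1, 2]), ("d2", "ham", [3]), ("d3", "spam", [4])]
print(sorted(count_labels(iter(records))))  # [('ham', 1), ('spam', 2)]

# In Spark this would be applied as, e.g.,
#   parsed_rdd.repartition(1).mapPartitions(count_labels)
# where parsed_rdd is a hypothetical RDD of parsed records.
```

Both `repartition(1)` and `coalesce(1)` produce a single partition. However, Spark's documentation notes that a drastic `coalesce` can also reduce the parallelism of the upstream computation, while `repartition` inserts a shuffle so that earlier stages (such as parsing) can keep their original partitioning.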


In [None]:
# A4Q2 

from spamminess import spamminess
from math import exp
import shutil, os

def spark_SGD(training_dataset='spam.train.group_x.txt', output_model='models/group_x_model', delta = 0.002):
    if os.path.isdir(output_model):
        shutil.rmtree(output_model) # Remove the previous model to create a new one
    training_data = sc.textFile(training_dataset)
    #### Your Solution to Question 2 here
    pass



In [None]:
# Your tests here
spark_SGD()
spark_SGD(training_dataset='spam.train.group_y.txt', output_model='models/group_y_model')
spark_SGD(training_dataset='spam.train.britney.txt', output_model='models/britney_model')

#### Question 3 (5/20 marks)

When you train a model using SGD, the model you get depends on the order in which you handle the training instances.  To see this in action, try using the Spark SGD trainer you implemented for Question 2 to train a model from the group_x training set, but with the instances processed in a different order.  

To do this, re-implement your trainer from Question 2 so that it will randomly reorder the training instances before using them to update the model. One way to shuffle the training instances is to assign a random sort key to each training instance as you read it from the input file, and then sort the instances using the random sort key.
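The random-sort-key idea can be illustrated in plain Python. This is only a sketch of the technique: in the assignment, the shuffling itself must happen in Spark, not in the driver. `shuffle_by_random_key` and its `seed` parameter are hypothetical and were added so the result can be reproduced.

```python
import random


def shuffle_by_random_key(instances, seed=None):
    """Reorder instances by attaching a random sort key and sorting on it.

    Plain-Python illustration of the idea only; `seed` is a hypothetical
    parameter for reproducibility.
    """
    rng = random.Random(seed)
    keyed = [(rng.random(), inst) for inst in instances]  # (random key, instance)
    keyed.sort(key=lambda pair: pair[0])  # sort on the key only, never on the instance
    return [inst for _key, inst in keyed]


lines = ["doc1 spam 1 2", "doc2 ham 3", "doc3 spam 4 5", "doc4 ham 6"]
shuffled = shuffle_by_random_key(lines, seed=42)
print(shuffled)
```

In Spark, the same idea corresponds to mapping each line to a `(random key, line)` pair and sorting with `sortByKey()`, so the sort runs on the cluster rather than in the driver.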

Be sure that Spark is doing the work of shuffling the training instances.   Do not load the training instances into your driver program and sort them there.

Implement the function `spark_shuffled_SGD` below that takes as input the path to the training dataset, an output path `output_model` and a value for the update parameter `delta`, shuffles the training instances using the method described above and writes the trained model to `output_model` using Spark's `saveAsTextFile`.

Once you have implemented the shuffled trainer, train a model using the shuffled group_x training instances, and compare the resulting model with the group_x model you learned without shuffling. It is up to you how to do this comparison. At a minimum, compare the features with the highest weights in each model to see whether they are similar. You can also use the classifier from the next question to classify documents using the two models, and compare the results.
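One simple way to compare the highest-weighted features is to measure how many of the top-k features two models share. The sketch below assumes each model is available as a `{feature: weight}` dictionary (for example, one read back from a saved `part-xxxxx` file). `top_features` and `top_k_overlap` are hypothetical helpers, and top-k overlap is only one possible measure, not a prescribed one.

```python
import heapq


def top_features(model, k=10):
    """Return the k features with the largest weights, highest first."""
    return [f for f, _w in heapq.nlargest(k, model.items(), key=lambda item: item[1])]


def top_k_overlap(model_a, model_b, k=10):
    """Fraction of the top-k features (by weight) that the two models share."""
    shared = set(top_features(model_a, k)) & set(top_features(model_b, k))
    return len(shared) / k


# Tiny made-up models, just to show the calls.
model_a = {1: 0.9, 2: 0.5, 3: 0.1, 4: -0.2}
model_b = {1: 0.8, 3: 0.6, 2: 0.05, 4: 0.0}
print(top_features(model_a, 2), top_features(model_b, 2))  # [1, 2] [1, 3]
print(top_k_overlap(model_a, model_b, k=2))  # 0.5
```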


In [None]:
# A4Q3

from spamminess import spamminess
from math import exp
import shutil, os, random

def spark_shuffled_SGD(training_dataset='spam.train.group_x.txt', output_model='models/group_x_model', delta = 0.002):
    if os.path.isdir(output_model):
        shutil.rmtree(output_model) # Remove the previous model to create a new one
    training_data = sc.textFile(training_dataset)
    #### Your Solution to Question 3 here
    pass




In [None]:
# Your tests here
spark_shuffled_SGD(output_model='models/group_x_model_shuffled')


#### Question 4 (5/20 marks)

Last but not least, you should write a Spark program that can be used to classify documents as spam or ham, using the classification models you produced.

The test data, i.e., the document instances that you should classify, are located in `spam.test.qrels.txt`. Run the following cell to download this trace. This will take a few minutes.

In [None]:
!wget -q https://www.student.cs.uwaterloo.ca/~cs451/spam/spam.test.qrels.txt.bz2
!bunzip2 spam.test.qrels.txt.bz2
!ls


Each line in this file represents a document that needs to be classified as spam or ham.  The format of this file is identical to the format of the files that hold the training instances.

Implement the function `spark_classify` below that will load a model (from a specified folder under `models`), classify all of the instances in a given test data file (`spam.test.qrels.txt` by default) using that model, and then output the results in the folder `results_path` using Spark's `saveAsTextFile`.   The contents of the output file should look like this:
```
(clueweb09-en0000-00-00142,spam,2.601624279252943,spam)
(clueweb09-en0000-00-01005,ham,2.5654162439491004,spam)
(clueweb09-en0000-00-01382,ham,2.5893946346394188,spam)
```
Each line of the output represents one test instance.   The first two fields are the document ID and the test label.  These are just copied from the test data.   The third field is the spamminess score of the document, produced by the spamminess function using the model you are classifying with.   The fourth field is the spam/ham prediction made by the model.
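There is one detail worth checking about this format. If each result were saved as a Python tuple, `saveAsTextFile` would write its `str()` form, which adds quotes and spaces and therefore does not match the sample lines above. The following is a minimal sketch, assuming you build each output line as a string yourself. `format_result` is a hypothetical helper, and the score 2.5 is a made-up value.

```python
def format_result(doc_id, label, score):
    """Build one output line in the format shown above.

    Assumption: a score of exactly 0 is predicted as ham; the text only says
    positive scores mean spam and negative scores mean ham. The test label is
    copied through and never used for the prediction.
    """
    prediction = "spam" if score > 0 else "ham"
    return "({},{},{},{})".format(doc_id, label, score, prediction)


print(format_result("clueweb09-en0000-00-00142", "spam", 2.5))
# (clueweb09-en0000-00-00142,spam,2.5,spam)
print(str(("clueweb09-en0000-00-00142", "spam", 2.5, "spam")))
# ('clueweb09-en0000-00-00142', 'spam', 2.5, 'spam')  <- quotes and spaces
```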

Of course, your spam/ham classifier must **not** use the test label from the input when making its prediction.  The test labels are the "ground truth" against which your predictions are being compared.   Using them to make predictions would defeat the whole purpose of model-based classification.

Make sure that classification of the test instances is done by Spark, not by your driver program. Do ***not*** load the test instances or classification results into your driver program. You are, however, allowed to load the model weights into your driver program and distribute them as side data.
Unlike model training, classification is easily parallelizable, since each document is classified independently. 

In [None]:
# A4Q4
from spamminess import spamminess
import shutil, os

def spark_classify(input_model='models/group_x_model', test_dataset='spam.test.qrels.txt', results_path='results/test_qrels'):
    if os.path.isdir(results_path):
        shutil.rmtree(results_path) # Remove the previous results
    test_data = sc.textFile(test_dataset)
    #### Your Solution to Question 4 here
    pass

We have developed a program that can be used to evaluate your classification results.  Run the next block to download this program.

In [None]:
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/compute_spam_metrics.c
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/spam_eval.sh

Now compile this program.

In [None]:
!gcc -w -O2 -o compute_spam_metrics compute_spam_metrics.c -lm

Given your output file in the proper format, this program computes the area under the receiver operating characteristic (ROC) curve, a common way to characterize classifier error. It reports 1-ROCA%, i.e., $100 \times (1 - \text{ROC area})$, so the lower this score, the better. The evaluation program should produce one line of output, like this:
```
1-ROCA%: 17.25
```
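To build intuition for this score, the sketch below computes 1-ROCA% on a handful of documents. It uses the standard rank-based view of ROC area: the probability that a randomly chosen spam document scores higher than a randomly chosen ham document, with ties counted as one half. This is a hypothetical helper, not a port of `compute_spam_metrics.c`, which may handle ties or edge cases differently. Its running time is quadratic, so it is suitable only for small inputs.

```python
def one_minus_roca_percent(labels, scores):
    """Compute 100 * (1 - ROC area) from gold labels and spamminess scores.

    ROC area = P(random spam doc scores higher than random ham doc), ties count 1/2.
    Requires at least one spam and one ham document. Quadratic: small inputs only.
    """
    spam_scores = [s for lab, s in zip(labels, scores) if lab == "spam"]
    ham_scores = [s for lab, s in zip(labels, scores) if lab == "ham"]
    wins = 0.0
    for s in spam_scores:
        for h in ham_scores:
            if s > h:
                wins += 1.0
            elif s == h:
                wins += 0.5
    roc_area = wins / (len(spam_scores) * len(ham_scores))
    return 100.0 * (1.0 - roc_area)


labels = ["spam", "ham", "spam", "ham"]
scores = [2.0, -1.0, 0.5, 1.0]
print(one_minus_roca_percent(labels, scores))  # 25.0
```

Here 3 of the 4 (spam, ham) pairs are ranked correctly, so the ROC area is 0.75 and 1-ROCA% is 25.0.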

Use your classifier to classify the test instances with each of the three classification models that you produced, which should result in three different output folders. Then, in the cell below, use the evaluation program to evaluate your results.


In [None]:
# Your tests here
#  Run the evaluation program like this, after first replacing "output-file"
#  with the name of the folder that holds your classifier's output
!bash spam_eval.sh output-file

---
Don't forget to save your workbook!   When you are finished and you are ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.