## CS431/631 Data Intensive Distributed Computing
### Fall 2021 - Assignment 6
---

**Please edit this (text) cell to provide your name and UW student ID number!**
* **Name:** _replace this with your name_
* **ID:** _replace this with your UW student ID number_

Let's first install Spark. This will take a minute to finish.

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
!tar xf spark-2.4.8-bin-hadoop2.7.tgz
!pip install -q findspark

Now that you have installed Spark and Java in Colab, it is time to set the environment variables that let you run PySpark in your Colab environment. Next, we define a function that creates a SparkContext and a StreamingContext.

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.8-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import time

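# This function creates SparkContext and StreamingContext
# Do not change this function
def initStreamingContext():
    # Stop any StreamingContext left running (e.g., by a cell that failed before ssc.stop()),
    # but keep the SparkContext alive so that getOrCreate() below can reuse it.
    active_ssc = StreamingContext.getActive()
    if active_ssc is not None:
        active_ssc.stop(stopSparkContext=False)
    spark_conf = SparkConf()\
        .setAppName("YourTest")\
        .setMaster("local[*]")
    sc = SparkContext.getOrCreate(spark_conf)
    # Create a StreamingContext with a batch interval of 1 second
    ssc = StreamingContext(sc, 1)
    return ssc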

### Overview

The data we use in this assignment was collected from the sensors installed on a wall-navigating robot. The robot uses 24 ultrasound sensors arranged in a circle around its "waist". The sensors are numbered starting at the front of the robot and increasing in the clockwise direction. To make our data streaming scenario realistic, we have developed a server that streams the robot's data to your program (as if you were receiving the data live from the robot). You will use Spark Streaming to perform a few simple tasks on this data.

Each line of data transmitted by the server corresponds to one measurement taken by the robot. Here is one such line:

```
0.438,0.498,3.625,3.645,5.000,2.918,5.000,2.351,2.332,2.643,1.698,1.687,1.698,1.717,1.744,0.593,0.502,0.493,0.504,0.445,0.431,0.444,0.440,0.429,Slight-Right-Turn
```
The first 24 values are the measurements of the 24 ultrasound sensors, and the last field is the corresponding movement type, which can be one of the following:
Move-Forward, Slight-Right-Turn, Sharp-Right-Turn, and Slight-Left-Turn.
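To make the line format concrete, here is a minimal plain-Python sketch that splits one line into its 24 sensor readings and its movement type. The names `NUM_SENSORS` and `parse_measurement` are hypothetical helpers, not part of the assignment; a function like this is the kind of thing you could pass to a DStream transformation such as `map`.

```python
NUM_SENSORS = 24  # number of ultrasound sensors per measurement

def parse_measurement(line):
    """Hypothetical helper: split one streamed line into (readings, movement).

    Assumes the format shown above: 24 comma-separated floats followed by
    the movement type as the last field.
    """
    fields = line.strip().split(",")
    readings = [float(x) for x in fields[:NUM_SENSORS]]  # sensor 1 is at the front, then clockwise
    movement = fields[NUM_SENSORS]                        # e.g. "Slight-Right-Turn"
    return readings, movement

sample = ("0.438,0.498,3.625,3.645,5.000,2.918,5.000,2.351,2.332,2.643,1.698,1.687,"
          "1.698,1.717,1.744,0.593,0.502,0.493,0.504,0.445,0.431,0.444,0.440,0.429,"
          "Slight-Right-Turn")
readings, movement = parse_measurement(sample)
print(len(readings), min(readings), movement)  # 24 0.429 Slight-Right-Turn
```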

Run the following cell to watch the data stream for 5 seconds. It is normal for the data to be slightly shifted in time from one run to the next, because it depends on the network delay between the server and your program. Therefore, the number of measurements in each 1-second batch can differ, and it can vary across runs.

In [None]:
# Let's create ssc.
ssc = initStreamingContext()
# We initialize a DStream by connecting it to a TCP socket. 
# The server will start sending data which goes to the robotData DStream.
robotData = ssc.socketTextStream("datasci.cs.uwaterloo.ca", 4321)
robotData.pprint()
ssc.start()
# Just wait 5 seconds before we stop the stream.
time.sleep(5)
ssc.stop()


### Question 1 (4/10 marks)
An important factor for a navigating robot is avoiding obstacles. This is why the robot has so many sensors: they measure the distance to surrounding obstacles in all directions. Write a program that, every second, reports the smallest distance measured by any sensor in the last 3 seconds.


For example, if the robot performs the following two measurements in the last 3 seconds:
```
0.482,0.512,0.524,3.665,2.953,2.940,2.940,2.629,1.709,2.311,1.660,1.640,1.635,1.654,1.755,0.563,0.545,0.475,0.475,0.485,0.464,0.459,0.468,0.478,Slight-Right-Turn
0.484,0.514,0.525,3.667,2.954,2.938,2.941,2.957,1.707,2.310,1.658,1.638,1.633,1.652,1.753,0.682,0.535,0.475,0.475,0.544,0.465,0.457,0.469,0.483,Slight-Right-Turn
```
your program must print:
```
-------------------------------------------
Time: 2020-11-27 23:56:24
-------------------------------------------
0.457
```
Note that this is the output for one 3-second window. The program should keep printing the smallest distance for all windows as long as your program is running.
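The expected value can be checked by hand: the smallest reading in the first example line is 0.459 and in the second line 0.457, so the minimum over the window is 0.457. The plain-Python sketch below only reproduces that arithmetic on the two example lines; it is not a Spark solution, since the assignment requires the computation to run in Spark. The helper name `window_min` is hypothetical.

```python
def window_min(lines, num_sensors=24):
    """Hypothetical helper: smallest sensor reading across all lines of one window."""
    return min(
        float(value)
        for line in lines
        for value in line.split(",")[:num_sensors]  # drop the movement-type field
    )

window = [
    "0.482,0.512,0.524,3.665,2.953,2.940,2.940,2.629,1.709,2.311,1.660,1.640,"
    "1.635,1.654,1.755,0.563,0.545,0.475,0.475,0.485,0.464,0.459,0.468,0.478,"
    "Slight-Right-Turn",
    "0.484,0.514,0.525,3.667,2.954,2.938,2.941,2.957,1.707,2.310,1.658,1.638,"
    "1.633,1.652,1.753,0.682,0.535,0.475,0.475,0.544,0.465,0.457,0.469,0.483,"
    "Slight-Right-Turn",
]
print(window_min(window))  # 0.457
```

In Spark, the same idea maps onto a per-line step followed by a combine step over a 3-second window that slides every second (for example, using `DStream.window`).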

All of the calculations must be performed in Spark, not in the driver program. You must use `pprint` at the end to print the results.

You can consult [this document](https://spark.apache.org/docs/2.2.0/streaming-programming-guide.html) to find more about available transformations on DStreams.




In [None]:
ssc = initStreamingContext()
robotData = ssc.socketTextStream("datasci.cs.uwaterloo.ca", 4321)
#### Your Solution to Question 1 here


ssc.start()
# Let's wait for 10 seconds before we stop the program.
# Feel free to change this value but make sure you change it back to 10 before submission.
time.sleep(10)
ssc.stop()

### Question 2 (6/10 marks)
In this question, you will characterize the robot's movements. The last field of every line indicates the movement type. Write a program that, every second, reports which movements the robot performed in the last 3 seconds, together with the ratio of each movement. For example, if 10 out of 50 movements in the last 3 seconds are "Slight-Right-Turn", your program should print: Slight-Right-Turn 0.2. The movements should be reported in descending order of their ratios. Make sure you print "----------" to mark the end of each window.


Here is an example of the expected output:
```
Slight-Right-Turn 0.6666666666666666
Sharp-Right-Turn 0.3333333333333333
----------
Sharp-Right-Turn 0.5384615384615384
Slight-Right-Turn 0.46153846153846156
----------
Slight-Right-Turn 0.6590909090909091
Sharp-Right-Turn 0.3409090909090909
----------
Slight-Right-Turn 0.75
Sharp-Right-Turn 0.19642857142857142
Move-Forward 0.05357142857142857
----------
```
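Each ratio is a movement's count divided by the total number of measurements in the window. The plain-Python sketch below reproduces that arithmetic and the descending ordering for a single window, using the 10-out-of-50 example from the task description; `movement_ratios` is a hypothetical name, and in your solution the counting and sorting must happen in Spark.

```python
from collections import Counter

def movement_ratios(lines):
    """Hypothetical helper: (movement, ratio) pairs for one window, largest ratio first."""
    movements = [line.strip().rsplit(",", 1)[-1] for line in lines]  # last field is the movement type
    total = len(movements)
    counts = Counter(movements)
    return sorted(
        ((movement, count / total) for movement, count in counts.items()),
        key=lambda pair: pair[1],
        reverse=True,  # descending order of ratio
    )

# Hypothetical window: 10 "Slight-Right-Turn" and 40 "Move-Forward" measurements
# (sensor readings shortened, since only the last field matters here).
window = ["0.5,Slight-Right-Turn"] * 10 + ["0.5,Move-Forward"] * 40
for movement, ratio in movement_ratios(window):
    print(movement, ratio)
# Move-Forward 0.8
# Slight-Right-Turn 0.2
```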
Note that all of these calculations must be performed in Spark, not in the driver program. The driver program should only print the final result.

**Hint**: since this question asks you to print results in a custom format, you cannot use `pprint()`. Instead, prepare the results with transformations and, at the end, use `foreachRDD()` to collect and print them. Look up `foreachRDD` in the API documentation to learn how it works.
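As a sketch of the driver-side step only: PySpark's `DStream.foreachRDD` accepts a function of either `(rdd)` or `(time, rdd)` and calls it once per batch. The hypothetical callback below assumes that Spark transformations have already produced an RDD of `(movement, ratio)` pairs sorted in descending order of ratio; it only collects those pairs and prints them in the required format. The names `format_window`, `print_window`, and `ratios` are illustrative, not part of the assignment.

```python
def format_window(pairs):
    """Render one window's (movement, ratio) pairs, followed by the end-of-window marker."""
    lines = [f"{movement} {ratio}" for movement, ratio in pairs]
    lines.append("----------")
    return "\n".join(lines)

def print_window(rdd):
    """Hypothetical foreachRDD callback: runs on the driver once per batch."""
    # collect() brings the (small, already sorted) result back to the driver.
    print(format_window(rdd.collect()))

# Usage, assuming `ratios` is a DStream of sorted (movement, ratio) pairs:
# ratios.foreachRDD(print_window)
```

Keeping the formatting in a separate function makes it easy to check the output format without a running stream.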

In [None]:
ssc = initStreamingContext()
robotData = ssc.socketTextStream("datasci.cs.uwaterloo.ca", 4321)
#### Your Solution to Question 2 here

ssc.start()
# Let's wait for 10 seconds before we stop the program.
# Feel free to change this value but make sure you change it back to 10 before submission.
time.sleep(10)
ssc.stop()

---
Don't forget to save your notebook! When you are finished and ready to submit your assignment, download your notebook file (.ipynb) from the hub to your machine, and then follow the submission instructions in the assignment.