Assignment 7: Spark Streaming due 4:00 pm December 7

In this assignment, you'll be playing with Spark Streaming. Unlike the previous assignments that involve a substantial amount of implementation, the goal of this assignment is to give you some exposure to Spark Streaming without getting into too much detail. In other words, this assignment is easier and less time-consuming than previous assignments, by design.

We'll be working with a dataset released by the New York City Taxi & Limousine Commission that captures over one billion individual taxi trips over the past several years. This assignment is inspired by Todd Schneider's very nice blog post titled Analyzing 1.1 Billion NYC Taxi and Uber Trips, with a Vengeance (worth a read!). You can find the raw data here. The dataset is historic, but we're going to treat it as a data stream and perform simple analyses using Spark Streaming.

Since the dataset is 100s of GB and too large for an assignment, we're going to work with a small one-day slice, but that should be sufficient to give you a flavor of what Spark Streaming is like. The slice of the dataset is stored in the bespin-data repo. Go ahead and grab the data.

For this assignment, since the dataset is small, everything can be done in the Linux Student CS environment. That is, there is no Datasci cluster component for this assignment. We'll be marking your assignment in the Linux Student CS environment, and you can assume that we will supply the correct path to this dataset.

Here's what the dataset looks like:

$ head -1 taxi-data/part-2015-12-01-0001.csv 
green,2,2015-12-01 00:01:03,2015-12-01 00:01:29,N,1,-73.937652587890625,40.804546356201172,-73.940483093261719,40.805999755859375,1,.06,2.5,0.5,0.5,0,0,,0.3,3.8,2,1

There are two types of taxi ride records, Yellow and Green.

The schema for yellow taxi rides is as follows:

type,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount

The schema for green taxi rides is as follows:

type,VendorID,lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,improvement_surcharge,Total_amount,Payment_type,Trip_type 
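
Because the two schemas place the drop-off coordinates in different columns, any code that reads them has to branch on the record type. The following is a minimal sketch of that lookup, not part of Bespin: the object and method names are hypothetical, and it assumes that yellow records start with the literal string "yellow" in the same way that the sample record above starts with "green".

```scala
object DropoffParser {
  // Column positions follow the two schemas listed above (0-based, after
  // splitting on commas):
  //   yellow: dropoff_longitude is field 10, dropoff_latitude is field 11
  //   green:  Dropoff_longitude is field 8,  Dropoff_latitude is field 9
  // Assumption: the first field is exactly "yellow" or "green".
  def dropoff(line: String): Option[(Double, Double)] = {
    // A limit of -1 keeps empty fields (such as Ehail_fee) in place.
    val fields = line.split(",", -1)
    val columns = fields.headOption match {
      case Some("yellow") => Some((10, 11))
      case Some("green")  => Some((8, 9))
      case _              => None
    }
    columns.flatMap { case (lonIdx, latIdx) =>
      if (fields.length > latIdx)
        scala.util.Try((fields(lonIdx).toDouble, fields(latIdx).toDouble)).toOption
      else None
    }
  }
}
```

Calling DropoffParser.dropoff on a line returns the (longitude, latitude) pair of the drop-off point, or None when the line is too short, has an unknown type, or contains non-numeric coordinates.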

Each part file contains one minute's worth of trips, so for the entire day there are 1440 part files. Each one of these becomes a part of a discretized stream to Spark Streaming.

Let's start with a very simple query that aggregates the number of trips by hour. We've implemented this for you in Bespin in EventCount. Here's how you run it:

spark-submit --class io.bespin.scala.spark.streaming.EventCount \
 target/bespin-1.0.5-SNAPSHOT-fatjar.jar --input taxi-data --checkpoint checkpoint --output output

Before running the Spark Streaming job, run the following first, because otherwise you may get "CPU time limit exceeded" errors:

ulimit -t unlimited

The Spark Streaming query itself is simple:

    val wc = stream.map(_.split(","))
      .map(tuple => ("all", 1))
      .reduceByKeyAndWindow(
        (x: Int, y: Int) => x + y, (x: Int, y: Int) => x - y, Minutes(60), Minutes(60))
      .persist()

Your first task is to understand exactly what's going on in the above snippet of code. Consult the Spark Streaming documentation; alternatively, there are plenty of resources online, and a simple search will turn up lots of articles and guides.
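
One way to build intuition for the two-function form of reduceByKeyAndWindow is to simulate it without Spark. The sketch below is an illustration only, under the assumption that each batch is reduced to a single count: a running total is updated by adding the batches that enter a window (the first function) and subtracting the batches that leave it (the second, "inverse" function). The names WindowSketch and windowedCounts are hypothetical.

```scala
object WindowSketch {
  // counts(i) is the number of events in the i-th batch.
  // Returns one total per window, where each window spans `window` batches
  // and a new window starts every `slide` batches.
  def windowedCounts(counts: Vector[Int], window: Int, slide: Int): Vector[Int] = {
    require(window > 0 && slide > 0)
    var running = 0
    var prevStart = 0
    var prevEnd = 0 // exclusive end of the previous window
    (0 to (counts.length - window) by slide).map { start =>
      val end = start + window
      // Inverse reduce: subtract batches that have slid out of the window.
      (prevStart until math.min(start, prevEnd)).foreach(i => running -= counts(i))
      // Reduce: add batches that have newly entered the window.
      (math.max(start, prevEnd) until end).foreach(i => running += counts(i))
      prevStart = start
      prevEnd = end
      running
    }.toVector
  }
}
```

When the window length equals the slide, as in the query above where both are Minutes(60), consecutive windows do not overlap, so every batch is counted in exactly one window.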

Beyond the query itself, the implementation goes through a bunch of contortions to be able to mock a stream using data from files. There are a whole host of details, including how to mock a clock so we can simulate the passage of time. The implementation is based on this blog post if you are interested in the details.

The output of each aggregation window is stored in a directory named output-XXXXX where XXXXX is the timestamp. The following command will gather all the results together:

$ find output-* -name "part*" | xargs grep 'all' | sed -E 's/^output-([0-9]+)\/part-[0-9]+/\1/' | sort -n
3600000:(all,7396)
7200000:(all,5780)
10800000:(all,3605)
14400000:(all,2426)
18000000:(all,2505)
21600000:(all,3858)
25200000:(all,10258)
28800000:(all,19007)
32400000:(all,23799)
36000000:(all,24003)
39600000:(all,21179)
43200000:(all,20219)
46800000:(all,20522)
50400000:(all,20556)
54000000:(all,21712)
57600000:(all,22016)
61200000:(all,18034)
64800000:(all,19719)
68400000:(all,25563)
72000000:(all,28178)
75600000:(all,27449)
79200000:(all,27072)
82800000:(all,24078)
86400000:(all,18806)

There are 24 aggregation windows above, one for each hour.

Problem 1: Copy EventCount from Bespin into your assignment repo under the package ca.uwaterloo.cs451.a7. You should now be able to run the following and obtain exactly the same results as above:

spark-submit --class ca.uwaterloo.cs451.a7.EventCount \
 target/assignments-1.0.jar --input taxi-data --checkpoint checkpoint --output output

Problem 2: Create a query called RegionEventCount that counts the number of taxi trips each hour that drop off at either the Goldman Sachs headquarters or the Citigroup headquarters. See the Todd Schneider blog post for context: you're replicating a simplified version of one of his analyses. Use these coordinates for the bounding box of interest:

goldman = [[-74.0141012, 40.7152191], [-74.013777, 40.7152275], [-74.0141027, 40.7138745], [-74.0144185, 40.7140753]]
citigroup = [[-74.011869, 40.7217236], [-74.009867, 40.721493], [-74.010140, 40.720053], [-74.012083, 40.720267]]

The bounding box of a set of points is the smallest rectangle that encompasses all these points as illustrated in the following figure. It is NOT the polygon created by connecting the points:

Bounding box example
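
As a rough illustration of the definition, the sketch below derives a bounding box from a list of points by taking the minimum and maximum of each coordinate. It assumes the rectangle is axis-aligned (sides parallel to the longitude and latitude axes), and its containment test assumes that points lying exactly on the edge do not count as inside. The names BoundingBox, Box, of, and contains are hypothetical.

```scala
object BoundingBox {
  // Smallest axis-aligned rectangle around a set of (longitude, latitude) points.
  final case class Box(minLon: Double, minLat: Double, maxLon: Double, maxLat: Double) {
    // Strict inequalities: a point exactly on an edge is treated as outside
    // (an assumption about how the boundary should be handled).
    def contains(lon: Double, lat: Double): Boolean =
      lon > minLon && lon < maxLon && lat > minLat && lat < maxLat
  }

  def of(points: Seq[(Double, Double)]): Box = {
    require(points.nonEmpty, "need at least one point")
    val lons = points.map(_._1)
    val lats = points.map(_._2)
    Box(lons.min, lats.min, lons.max, lats.max)
  }

  // The coordinates given above.
  val goldman: Box = of(Seq(
    (-74.0141012, 40.7152191), (-74.013777, 40.7152275),
    (-74.0141027, 40.7138745), (-74.0144185, 40.7140753)))

  val citigroup: Box = of(Seq(
    (-74.011869, 40.7217236), (-74.009867, 40.721493),
    (-74.010140, 40.720053), (-74.012083, 40.720267)))
}
```

For example, BoundingBox.goldman.contains(lon, lat) tests whether a drop-off point falls inside the Goldman Sachs box.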

To be more precise, you are filtering for taxi trips whose drop-off locations fall strictly inside this bounding box (points on the boundary itself are not included) and aggregating by hour. This means that we should be able to run the following job:

spark-submit --class ca.uwaterloo.cs451.a7.RegionEventCount \
 target/assignments-1.0.jar --input taxi-data --checkpoint checkpoint --output output

And the following command should gather the answers:

$ find output-* -name "part*" | xargs grep 'goldman' | sed -E 's/^output-([0-9]+)\/part-[0-9]+/\1/' | sort -n
21600000:(goldman,?)
25200000:(goldman,?)
28800000:(goldman,?)
...

$ find output-* -name "part*" | xargs grep 'citigroup' | sed -E 's/^output-([0-9]+)\/part-[0-9]+/\1/' | sort -n
3600000:(citigroup,?)
7200000:(citigroup,?)
10800000:(citigroup,?)
...

Use exactly "goldman" and "citigroup" for the names of the keys. The actual counts should appear in place of the "?" above.

As a hint, RegionEventCount requires minimal modifications over EventCount; basically, add a filter, and that's it.

Problem 3: Let's build a simple "trend detector" to find out when there are lots of arrivals at either Goldman Sachs or Citigroup headquarters, defined in terms of the bounding boxes, exactly as above. We'll consider intervals of ten minutes, i.e., 6:00 to 6:10, 6:10 to 6:20, etc. The trend detector should "go off" when there are at least twice as many arrivals in the current interval as there are in the past interval. To reduce "spurious" detections, we want to make sure the detector only "trips" if there are ten or more arrivals in the current interval. That is, if there are two arrivals in the last ten-minute interval and four arrivals in the current ten-minute interval, that's not particularly interesting (although the number of arrivals has indeed doubled), so we want to suppress such results.
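
The two conditions can be written down as a single predicate. This is a sketch of one reading of the rule, with hypothetical names: it treats "at least twice as many" as current >= 2 * previous, which means that a previous count of zero satisfies the doubling condition whenever the minimum of ten arrivals is met.

```scala
object TrendRule {
  // Minimum number of arrivals in the current interval for a detection.
  val MinArrivals = 10

  // True when the current interval has at least MinArrivals arrivals and
  // at least twice as many arrivals as the previous interval.
  def trips(previous: Int, current: Int): Boolean =
    current >= MinArrivals && current >= 2 * previous
}
```

With this reading, going from two arrivals to four does not trip the detector, while going from five to ten does.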

Call this program TrendingArrivals. We'll run it as follows:

spark-submit --class ca.uwaterloo.cs451.a7.TrendingArrivals \
 target/assignments-1.0.jar --input taxi-data --checkpoint checkpoint --output output &> output.log

For simplicity, the detector should output its results to stdout in the form of the following:

Number of arrivals to Goldman Sachs has doubled from X to Y at Z!

or

Number of arrivals to Citigroup has doubled from X to Y at Z!

Here, X and Y are the number of arrivals in the previous and current interval, and Z is the timestamp of the current interval. These timestamps are in exactly the same form as above, e.g., 21600000.

In other words, when we're marking, we'll be grepping output.log for the phrases "Number of arrivals to Goldman Sachs" and "Number of arrivals to Citigroup", so please make sure that the output is in the format we expect.

Also, the program should output the status for each batch to the directory specified by the output argument. Each status is stored in a separate file whose name has the format part-${timestamp}, where timestamp is an 8-digit string padded with leading zeros. The following command should gather the answers:

$ cat output/part-*/* | grep "(citigroup" 
(citigroup,(${Current value},${Current timestamp},${Previous value}))
...

$ cat output/part-*/* | grep "(goldman" 
(goldman,(${Current value},${Current timestamp},${Previous value}))
...
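
The naming and line formats above can be produced with plain Scala string formatting. The helper names in this sketch are hypothetical; it relies only on the f string interpolator for zero padding and on the default toString of nested tuples.

```scala
object StatusFormat {
  // Zero-pads the timestamp to 8 digits, e.g. 3600000 becomes "part-03600000".
  def partName(timestamp: Long): String = f"part-$timestamp%08d"

  // A nested tuple prints as (key,(current,timestamp,previous)),
  // which matches the shape of the status lines shown above.
  def statusLine(key: String, current: Int, timestamp: Long, previous: Int): String =
    (key, (current, timestamp, previous)).toString
}
```

For instance, StatusFormat.statusLine("citigroup", 12, 600000L, 5) yields a line that the grep for "(citigroup" above would match.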

As a hint, TrendingArrivals is a simple modification to RegionEventCount. You'll need to change the window definition (from hourly to 10 minutes) and learn how to use mapWithState, but that's basically it.
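
To see what kind of state needs to be carried between intervals, it can help to simulate the detector for a single key in plain Scala before involving Spark. The sketch below does not use the Spark API at all: a local variable stands in for the per-key state that mapWithState would maintain. It assumes that the interval before the first one had zero arrivals, and all names are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

object TrendSketch {
  // One status per interval: (current count, interval timestamp, previous count).
  type Status = (Int, Long, Int)

  // intervals holds (timestamp, arrivals) pairs for one place, in time order.
  // Returns the status of every interval plus the alert messages that fired.
  def run(place: String, intervals: Seq[(Long, Int)]): (Seq[Status], Seq[String]) = {
    var previous = 0 // assumption: no earlier interval means zero arrivals
    val statuses = ArrayBuffer.empty[Status]
    val alerts = ArrayBuffer.empty[String]
    for ((timestamp, current) <- intervals) {
      // Detector rule: ten or more arrivals, and at least double the previous count.
      if (current >= 10 && current >= 2 * previous)
        alerts += s"Number of arrivals to $place has doubled from $previous to $current at $timestamp!"
      statuses += ((current, timestamp, previous))
      previous = current // remember this count for the next interval
    }
    (statuses.toList, alerts.toList)
  }
}
```

In the streaming version, the remembered count would live in Spark's per-key state rather than in a local variable, and each ten-minute window would supply one (timestamp, arrivals) pair per key.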

Turning in the Assignment

Please follow these instructions carefully!

Your implementations should go in package ca.uwaterloo.cs451.a7. Make sure your implementation runs in the Linux Student CS environment. The following check script is provided for you:

When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we recommend that you verify everything above works by performing a clean clone of your repo and running the public check scripts.

That's it!

Grading

The entire assignment is worth 30 points:

Reference Running Times

To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.

Class name          Running time (Linux)
EventCount          3.5 minutes
RegionEventCount    4 minutes
TrendingArrivals    4.5 minutes
