In this assignment, you'll be playing with Spark Streaming. Unlike the previous assignments that involve a substantial amount of implementation, the goal of this assignment is to give you some exposure to Spark Streaming without getting into too much detail. In other words, this assignment is easier and less time-consuming than previous assignments, by design.
We'll be working with a dataset released by the New York City Taxi & Limousine Commission that captures over one billion individual taxi trips over the past several years. This assignment is inspired by Todd Schneider's very nice blog post titled Analyzing 1.1 Billion NYC Taxi and Uber Trips, with a Vengeance (worth a read!). You can find the raw data here. The dataset is historical, but we're going to treat it as a data stream and perform simple analyses using Spark Streaming.
Since the dataset is 100s of GB and too large for an assignment, we're going to work with a small one-day slice, but that should be sufficient to give you a flavor of what Spark Streaming is like. The slice of the dataset is stored in the bespin-data repo. Go ahead and grab the data.
For this assignment, since the dataset is small, everything can be done in the Linux Student CS environment. That is, there is no Datasci cluster component for this assignment. We'll be marking your assignment in the Linux Student CS environment, and you can assume that we will supply the correct path to this dataset.
Here's what the dataset looks like:
$ head -1 taxi-data/part-2015-12-01-0001.csv
green,2,2015-12-01 00:01:03,2015-12-01 00:01:29,N,1,-73.937652587890625,40.804546356201172,-73.940483093261719,40.805999755859375,1,.06,2.5,0.5,0.5,0,0,,0.3,3.8,2,1
There are two types of taxi ride records, Yellow and Green.
The schema for yellow taxi rides is as follows:
type,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
The schema for green taxi rides is as follows:
type,VendorID,lpep_pickup_datetime,Lpep_dropoff_datetime,Store_and_fwd_flag,RateCodeID,Pickup_longitude,Pickup_latitude,Dropoff_longitude,Dropoff_latitude,Passenger_count,Trip_distance,Fare_amount,Extra,MTA_tax,Tip_amount,Tolls_amount,Ehail_fee,improvement_surcharge,Total_amount,Payment_type,Trip_type
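Because the two schemas put the drop-off coordinates in different columns, a parser has to branch on the first field. The following is a minimal sketch in plain Scala (no Spark needed), not the reference solution. The object name `DropoffParser`, the `Option` return type, and the assumption that yellow records start with the literal string `yellow` (the sample above only shows `green`) are illustrative choices.

```scala
import scala.util.Try

// Hypothetical helper: name and return type are illustrative, not part of Bespin.
object DropoffParser {
  // Zero-based column positions read off the two schemas above.
  private val YellowDropoff = (10, 11) // dropoff_longitude, dropoff_latitude
  private val GreenDropoff = (8, 9)    // Dropoff_longitude, Dropoff_latitude

  // Returns (longitude, latitude) of the drop-off, or None if the line
  // has an unknown type, too few fields, or non-numeric coordinates.
  def dropoff(line: String): Option[(Double, Double)] = {
    val fields = line.split(",", -1) // -1 keeps trailing empty fields
    val columns = fields.headOption match {
      case Some("yellow") => Some(YellowDropoff) // assumed spelling of the type field
      case Some("green")  => Some(GreenDropoff)
      case _              => None
    }
    columns.flatMap { case (lonIdx, latIdx) =>
      if (fields.length > latIdx)
        Try((fields(lonIdx).toDouble, fields(latIdx).toDouble)).toOption
      else None
    }
  }
}
```

Applied to the sample green record above, this picks the ninth and tenth comma-separated values after the type field's position zero, i.e. columns 8 and 9.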
Each part file contains one minute's worth of trips, so for the entire day there are 1440 part files. Each one of these becomes a part of the discretized stream that is fed to Spark Streaming.
Let's start with a very simple query that aggregates the number of trips by hour. We've implemented this for you in Bespin in EventCount.
Here's how you run it:
spark-submit --class io.bespin.scala.spark.streaming.EventCount \
 target/bespin-1.0.5-SNAPSHOT-fatjar.jar --input taxi-data --checkpoint checkpoint --output output
Before running the Spark Streaming job, run the following first, because otherwise you may get "CPU time limit exceeded" errors:
ulimit -t unlimited
The Spark Streaming query itself is simple:
val wc = stream.map(_.split(","))
  .map(tuple => ("all", 1))
  .reduceByKeyAndWindow(
    (x: Int, y: Int) => x + y, (x: Int, y: Int) => x - y, Minutes(60), Minutes(60))
  .persist()
Your first task is to understand exactly what's going on in the above snippet of code: consult the Spark Streaming documentation, or alternatively, there are plenty of resources online. A simple search will turn up lots of articles and guides.
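As a starting point for that reading, here is a small plain-Scala simulation of the idea behind the two functions passed to `reduceByKeyAndWindow`: the first folds in batches that enter the window, and the second (the inverse) removes batches that leave it. This is only a sketch of the concept for a single key, not how Spark implements it internally. The object name `WindowSketch` and the choice to measure window and slide in number of batches are assumptions made for illustration.

```scala
// Hypothetical single-key simulation of a windowed reduce with an inverse function.
object WindowSketch {
  // batches: one count per batch interval
  // window:  window length, in batches
  // slide:   how often a result is emitted, in batches
  def windowedCounts(batches: Seq[Int], window: Int, slide: Int,
                     add: (Int, Int) => Int, inv: (Int, Int) => Int): Seq[Int] = {
    var running = 0
    val out = Seq.newBuilder[Int]
    for (i <- batches.indices) {
      running = add(running, batches(i))                            // batch entering the window
      if (i >= window) running = inv(running, batches(i - window))  // batch leaving the window
      if ((i + 1) % slide == 0) out += running                      // emit once per slide
    }
    out.result()
  }
}
```

With the window length equal to the slide, as in the `Minutes(60), Minutes(60)` call above, the windows do not overlap and each batch is counted in exactly one result.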
Beyond the query itself, the implementation goes through a bunch of contortions to be able to mock a stream using data from files. There are a whole host of details, including how to mock a clock so we can simulate the passage of time. The implementation is based on this blog post if you are interested in the details.
The output of each aggregation window is stored in a directory named output-XXXXX where XXXXX is the timestamp. The following command will gather all the results together:
$ find output-* -name "part*" | xargs grep 'all' | sed -E 's/^output-([0-9]+)\/part-[0-9]+/\1/' | sort -n
3600000:(all,7396)
7200000:(all,5780)
10800000:(all,3605)
14400000:(all,2426)
18000000:(all,2505)
21600000:(all,3858)
25200000:(all,10258)
28800000:(all,19007)
32400000:(all,23799)
36000000:(all,24003)
39600000:(all,21179)
43200000:(all,20219)
46800000:(all,20522)
50400000:(all,20556)
54000000:(all,21712)
57600000:(all,22016)
61200000:(all,18034)
64800000:(all,19719)
68400000:(all,25563)
72000000:(all,28178)
75600000:(all,27449)
79200000:(all,27072)
82800000:(all,24078)
86400000:(all,18806)
There are 24 aggregation windows above, one for each hour.
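To read these timestamps as hours of the day, the sketch below assumes they are milliseconds elapsed on the mocked clock, counted from midnight, and that each one marks the end of its window. That assumption is consistent with the values stepping by 3600000 and ending at 86400000, but it is not stated by the implementation, so treat it as a reading aid only. The object name `WindowLabel` is hypothetical.

```scala
// Hypothetical helper for interpreting the hourly output timestamps.
object WindowLabel {
  private val MillisPerHour = 3600000L

  // Assumes endMillis is the end of an hourly window, in ms since midnight.
  def label(endMillis: Long): String = {
    val endHour = endMillis / MillisPerHour
    f"${endHour - 1}%02d:00-${endHour}%02d:00"
  }
}
```

Under that assumption, `WindowLabel.label(3600000L)` gives `00:00-01:00` and `WindowLabel.label(72000000L)` gives `19:00-20:00`.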
Problem 1: Copy EventCount from Bespin into your assignment repo under the package ca.uwaterloo.cs451.a7. You should now be able to run the following and obtain exactly the same results as above:
spark-submit --class ca.uwaterloo.cs451.a7.EventCount \
 target/assignments-1.0.jar --input taxi-data --checkpoint checkpoint --output output
Problem 2: Create a query called RegionEventCount that counts the number of taxi trips each hour that drop off at either the Goldman Sachs headquarters or the Citigroup headquarters. See the Todd Schneider blog post for context: you're replicating a simplified version of one of his analyses. Use these coordinates for the bounding box of interest:
goldman = [[-74.0141012, 40.7152191], [-74.013777, 40.7152275], [-74.0141027, 40.7138745], [-74.0144185, 40.7140753]]
citigroup = [[-74.011869, 40.7217236], [-74.009867, 40.721493], [-74.010140,40.720053], [-74.012083, 40.720267]]
The bounding box of a set of points is the smallest rectangle that encompasses all these points, as illustrated in the following figure. It is NOT the polygon created by connecting the points:
To be more precise, you are filtering for taxi trips whose drop-off locations fall strictly inside one of these bounding boxes (points on the boundary are not included) and aggregating by hour. This means that we should be able to run the following job:
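The bounding-box definition can be sketched in plain Scala as follows. This assumes the rectangle is axis-aligned, i.e. built from the minimum and maximum longitude and latitude of the four points, and it uses strict inequalities to exclude the boundary. The names `BoundingBoxSketch`, `Box`, and `boundingBox` are hypothetical, and this is an illustration rather than the reference implementation.

```scala
// Hypothetical sketch: axis-aligned bounding boxes with strict containment.
object BoundingBoxSketch {
  type Point = (Double, Double) // (longitude, latitude)

  final case class Box(minLon: Double, maxLon: Double, minLat: Double, maxLat: Double) {
    // Strict inequalities: a point exactly on an edge is NOT inside.
    def contains(lon: Double, lat: Double): Boolean =
      lon > minLon && lon < maxLon && lat > minLat && lat < maxLat
  }

  // Smallest axis-aligned rectangle enclosing all the given points.
  def boundingBox(points: Seq[Point]): Box = {
    val lons = points.map(_._1)
    val lats = points.map(_._2)
    Box(lons.min, lons.max, lats.min, lats.max)
  }

  // Coordinates copied from the assignment text above.
  val goldman: Box = boundingBox(Seq(
    (-74.0141012, 40.7152191), (-74.013777, 40.7152275),
    (-74.0141027, 40.7138745), (-74.0144185, 40.7140753)))

  val citigroup: Box = boundingBox(Seq(
    (-74.011869, 40.7217236), (-74.009867, 40.721493),
    (-74.010140, 40.720053), (-74.012083, 40.720267)))
}
```

A predicate like `goldman.contains(lon, lat)` is the kind of test that would sit inside a stream `filter`; how the result is mapped to the `goldman` and `citigroup` keys is left to your implementation.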
spark-submit --class ca.uwaterloo.cs451.a7.RegionEventCount \
 target/assignments-1.0.jar --input taxi-data --checkpoint checkpoint --output output
And the following command should gather the answers:
$ find output-* -name "part*" | xargs grep 'goldman' | sed -E 's/^output-([0-9]+)\/part-[0-9]+/\1/' | sort -n
21600000:(goldman,?)
25200000:(goldman,?)
28800000:(goldman,?)
...
$ find output-* -name "part*" | xargs grep 'citigroup' | sed -E 's/^output-([0-9]+)\/part-[0-9]+/\1/' | sort -n
3600000:(citigroup,?)
7200000:(citigroup,?)
10800000:(citigroup,?)
...
Use exactly "goldman" and "citigroup" for the names of the keys. The actual counts should appear in place of the "?" above.
As a hint, RegionEventCount requires minimal modifications over EventCount; basically, add a filter, and that's it.
Problem 3: Let's build a simple "trend detector" to find out when there are lots of arrivals at either Goldman Sachs or Citigroup headquarters, defined in terms of the bounding boxes, exactly as above. We'll consider intervals of ten minutes, i.e., 6:00 to 6:10, 6:10 to 6:20, etc. The trend detector should "go off" when there are at least twice as many arrivals in the current interval as there were in the previous interval. To reduce "spurious" detections, we want to make sure the detector only "trips" if there are ten or more arrivals in the current interval. That is, if there are two arrivals in the previous ten-minute interval and four arrivals in the current ten-minute interval, that's not particularly interesting (although the number of arrivals has indeed doubled), so we want to suppress such results.
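The two conditions can be written as a single predicate. The sketch below is plain Scala and makes one assumption that the text does not settle: an interval with no recorded previous count is treated as having zero arrivals. The object and method names are hypothetical.

```scala
// Hypothetical sketch of the trend condition described above.
object TrendSketch {
  // previous: arrivals in the previous ten-minute interval (assumed 0 if unknown)
  // current:  arrivals in the current ten-minute interval
  def isTrending(previous: Int, current: Int): Boolean =
    current >= 10 &&           // suppress small, spurious doublings
    current >= 2 * previous    // at least twice as many as before
}
```

For the example in the text, `TrendSketch.isTrending(2, 4)` is false because the current count is below ten, while `TrendSketch.isTrending(5, 10)` is true.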
Call this program TrendingArrivals. We'll run it as follows:
spark-submit --class ca.uwaterloo.cs451.a7.TrendingArrivals \
 target/assignments-1.0.jar --input taxi-data --checkpoint checkpoint --output output &> output.log
For simplicity, the detector should output its results to stdout in the form of the following:
Number of arrivals to Goldman Sachs has doubled from X to Y at Z!
or
Number of arrivals to Citigroup has doubled from X to Y at Z!
Where X and Y are the number of arrivals in the previous and current interval, and Z is the timestamp of the current interval. These timestamps are exactly the same form as above, e.g., 21600000.
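Since marking depends on the exact wording, it may help to build the message in one place. The following is a small sketch in plain Scala. The object name `TrendMessage`, the key-to-name map, and the `Option` return type are illustrative assumptions, and the numbers in the usage note are made up.

```scala
// Hypothetical formatter for the stdout message described above.
object TrendMessage {
  // Maps the stream keys to the display names required in the output.
  private val displayNames = Map(
    "goldman" -> "Goldman Sachs",
    "citigroup" -> "Citigroup")

  // Returns None for keys other than "goldman" and "citigroup".
  def format(key: String, previous: Int, current: Int, timestamp: Long): Option[String] =
    displayNames.get(key).map { name =>
      s"Number of arrivals to $name has doubled from $previous to $current at $timestamp!"
    }
}
```

For instance, with invented counts, `TrendMessage.format("goldman", 5, 12, 21600000L)` yields `Some("Number of arrivals to Goldman Sachs has doubled from 5 to 12 at 21600000!")`.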
In other words, when we're marking, we'll be grepping output.log for the phrase "Number of arrivals to Goldman Sachs" and "Number of arrivals to Citigroup", so please make sure that the output is in the format we expect.
Also, the program should output the status for each batch to the directory specified by the output argument. Each status is stored in a separate file with the name of the format part-${timestamp} where timestamp is an 8-digit string padded with leading zeros.
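The zero-padded name can be produced with Scala's `f` string interpolator. This is a minimal sketch; the object and method names are hypothetical, and how the name is joined to the output directory is up to your implementation.

```scala
// Hypothetical helper for the part-${timestamp} naming scheme.
object PartName {
  // Pads the timestamp with leading zeros to a width of 8 digits.
  def forTimestamp(timestamp: Long): String = f"part-$timestamp%08d"
}
```

A ten-minute timestamp such as 600000 becomes `part-00600000`, while 21600000 already has eight digits and becomes `part-21600000`.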
The following command should gather the answers:
$ cat output/part-*/* | grep "(citigroup"
(citigroup,(${Current value},${Current timestamp},${Previous value}))
...
$ cat output/part-*/* | grep "(goldman"
(goldman,(${Current value},${Current timestamp},${Previous value}))
...
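The line format above matches what Scala prints for a nested tuple, so one way to get it is to keep each status as a `(String, (Int, Long, Int))` pair. The sketch below only demonstrates the string form. The object name, the choice of `Int` and `Long` for the fields, and the sample numbers are assumptions for illustration.

```scala
// Hypothetical illustration of the status line format.
object StatusLine {
  // (key, (current value, current timestamp, previous value))
  def render(key: String, current: Int, timestamp: Long, previous: Int): String =
    (key, (current, timestamp, previous)).toString
}
```

With invented values, `StatusLine.render("citigroup", 12, 21600000L, 5)` produces `(citigroup,(12,21600000,5))`, which the `grep "(citigroup"` command above would match.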
As a hint, TrendingArrivals is a simple modification to RegionEventCount. You'll need to change the window definition (from hourly to 10 minutes) and learn how to use mapWithState, but that's basically it.
Please follow these instructions carefully!
Your implementations should go in package ca.uwaterloo.cs451.a7. Make sure your implementation runs in the Linux Student CS environment. The following check script is provided for you:
When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we would recommend that you verify everything above works by performing a clean clone of your repo and running the public check scripts.
That's it!
The entire assignment is worth 30 points:
To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.
Class name | Running time (Linux) |
---|---|
EventCount | 3.5 minutes |
RegionEventCount | 4 minutes |
TrendingArrivals | 4.5 minutes |