Assignment 6: SQL Data Analytics due 4:00 pm Nov 27

In this assignment you'll be hand-crafting Spark programs that implement SQL queries in a data warehousing scenario. Various SQL-on-Hadoop solutions share in providing an SQL query interface on data stored in HDFS via an intermediate execution framework. For example, Hive queries are "compiled" into MapReduce jobs; SparkSQL queries rely on Spark processing primitives for query execution. In this assignment, you'll play the role of mediating between SQL queries and the underlying execution framework (Spark). In more detail, you'll be given a series of SQL queries, and for each you'll have to hand-craft a Spark program that corresponds to each query.

Important: You are not allowed to use the DataFrame API or Spark SQL to complete this assignment (with the exception of loading Parquet files, see "hints" below). You must write code to manipulate raw RDDs. Furthermore, you are not allowed to use join and closely-related transformations in Spark for this assignment, because otherwise it defeats the point of the exercise. The assignment will guide you toward what we are looking for, but if you have any questions as to what is allowed or not, ask!

We will be working with data from the TPC-H benchmark in this assignment. The Transaction Processing Performance Council (TPC) is a non-profit organization that defines various database benchmarks so that database vendors can evaluate the performance of their products fairly. TPC defines the "rules of the game", so to speak. TPC defines various benchmarks, of which one is TPC-H, for evaluating ad-hoc decision support systems in a data warehousing scenario. The current version of the TPC-H benchmark is located here. You'll want to skim through this (long) document; the most important part is the entity-relationship diagram of the data warehouse on page 13. Throughout the assignment you'll likely be referring to it, as it will help you make sense of the queries you are running.

The TPC-H benchmark comes with a data generator, and we have generated some data for you:

Download and unpack the datasets above. In the first part of the assignment where you will be working with Spark locally, you will run your queries against both datasets.

For the plain-text version of the data, you will see a number of text files, each corresponding to a table in the TPC-H schema. The files are delimited by |. You'll notice that some of the fields, especially the text fields, are gibberish—that's normal, since the contents are randomly generated.

The Parquet data has the same content, but is encoded in Parquet.

Implement the following seven SQL queries, running on the TPC-H-0.1-TXT plain-text data as well as the TPC-H-0.1-PARQUET Parquet data. For each query you will write a program that takes the option --text to work with the plain-text data and --parquetto work with the Parquet data. More details will be provided below.

Each SQL query is accompanied by a written description of what the query does: if there are any ambiguities in the language, you can always assume that the SQL query is correct. Your implementation of each query will be a separate Spark program. Put your code in the package ca.uwaterloo.cs451.a6, in the same repo that you've been working in all semester. Since you'll be writing Scala code, your source files should go into src/main/scala/ca/uwaterloo/cs451/a6/.

Q1: How many items were shipped on a particular date? This corresponds to the following SQL query:

select count(*) from lineitem where l_shipdate = 'YYYY-MM-DD';

Write a program such that when we execute the following command (on the plain-text data):

spark-submit --class ca.uwaterloo.cs451.a6.Q1 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text

the answer to the above SQL query will be printed to stdout (on the console where the above command is executed), in a line that matches the following regular expression:

ANSWER=\d+

The output of the query can contain logging and debug information, but there must be a line with the answer in exactly the above format.

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q1 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet

In both cases, the value of the --input argument is the directory that contains the data (either in plain text or in Parquet). The value of the --date argument corresponds to the l_shipdate predicate in the where clause in the SQL query. You can assume that a valid date is provided, so you do not need to perform input validation.

Q2: Which clerks were responsible for processing items that were shipped on a particular date? List the first 20 by order key. This corresponds to the following SQL query:

select o_clerk, o_orderkey from lineitem, orders
where
  l_orderkey = o_orderkey and
  l_shipdate = 'YYYY-MM-DD'
order by o_orderkey asc limit 20;

Write a program such that when we execute the following command (on the plain-text data):

spark-submit --class ca.uwaterloo.cs451.a6.Q2 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text

the answer to the above SQL query will be printed to stdout (on the console where the above command is executed), as a sequence of tuples in the following format:

(o_clerk,o_orderkey)
(o_clerk,o_orderkey)
...

That is, each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q2 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet

In the design of this data warehouse, the lineitem and orders tables are not likely to fit in memory. Therefore, the only scalable join approach is the reduce-side join. You must implement this join in Spark using the cogroup transformation.

Q3: What are the names of parts and suppliers of items shipped on a particular date? List the first 20 by order key. This corresponds to the following SQL query:

select l_orderkey, p_name, s_name from lineitem, part, supplier
where
  l_partkey = p_partkey and
  l_suppkey = s_suppkey and
  l_shipdate = 'YYYY-MM-DD'
order by l_orderkey asc limit 20;

Write a program such that when we execute the following command (on the plain-text data):

spark-submit --class ca.uwaterloo.cs451.a6.Q3 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text

the answer to the above SQL query will be printed to stdout (on the console where the above command is executed), as a sequence of tuples in the following format:

(l_orderkey,p_name,s_name)
(l_orderkey,p_name,s_name)
...

That is, each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q3 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet

In the design of this data warehouse, it is assumed that the part and supplier tables will fit in memory. Therefore, it is possible to implement a hash join. For this query, you must implement a hash join in Spark with broadcast variables.

Q4: How many items were shipped to each country on a particular date? This corresponds to the following SQL query:

select n_nationkey, n_name, count(*) from lineitem, orders, customer, nation
where
  l_orderkey = o_orderkey and
  o_custkey = c_custkey and
  c_nationkey = n_nationkey and
  l_shipdate = 'YYYY-MM-DD'
group by n_nationkey, n_name
order by n_nationkey asc;

Write a program such that when we execute the following command (on the plain-text data):

spark-submit --class ca.uwaterloo.cs451.a6.Q4 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text

the answer to the above SQL query will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q4 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet

Implement this query with different join techniques as you see fit. You can assume that the lineitem and orders table will not fit in memory, but you can assume that the customer and nation tables will both fit in memory. For this query, the performance as well as the scalability of your implementation will contribute to the grade.

Q5: This query represents a very simple end-to-end ad hoc analysis task: Related to Q4, your boss has asked you to compare shipments to Canada vs. the United States by month, given all the data in the data warehouse. You think this request is best fulfilled by a line graph, with two lines (one representing the US and one representing Canada), where the x-axis is the year/month and the y axis is the volume, i.e., count(*). Generate this graph for your boss.

First, write a program such that when we execute the following command (on plain text):

spark-submit --class ca.uwaterloo.cs451.a6.Q5 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --text

the raw data necessary for the graph will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses.

Here is a sample output

(3,CANADA,1992-01,3874)
(3,CANADA,1992-02,10694)
(3,CANADA,1992-03,19007)
(3,CANADA,1992-04,25707)
...

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q5 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --parquet

Next, create this actual graph: use whatever tool you are comfortable with, e.g., Excel, gnuplot, etc.

Q6: This is a slightly modified version of TPC-H Q1 "Pricing Summary Report Query". This query reports the amount of business that was billed, shipped, and returned:

select
  l_returnflag,
  l_linestatus,
  sum(l_quantity) as sum_qty,
  sum(l_extendedprice) as sum_base_price,
  sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
  sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
  avg(l_quantity) as avg_qty,
  avg(l_extendedprice) as avg_price,
  avg(l_discount) as avg_disc,
  count(*) as count_order
from lineitem
where
  l_shipdate = 'YYYY-MM-DD'
group by l_returnflag, l_linestatus;

Write a program such that when we execute the following command (on the plain-text data):

spark-submit --class ca.uwaterloo.cs451.a6.Q6 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text

the answer to the above SQL query will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q6 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet

Implement this query as efficiently as you can, using all of the optimizations we discussed in lecture. You will only get full points for this question if you exploit all the optimization opportunities that are available.

Q7: This is a slightly modified version of TPC-H Q3 "Shipping Priority Query". This query retrieves the 5 unshipped orders with the highest value:

select
  c_name,
  l_orderkey,
  sum(l_extendedprice*(1-l_discount)) as revenue,
  o_orderdate,
  o_shippriority
from customer, orders, lineitem
where
  c_custkey = o_custkey and
  l_orderkey = o_orderkey and
  o_orderdate < "YYYY-MM-DD" and
  l_shipdate > "YYYY-MM-DD"
group by
  c_name,
  l_orderkey,
  o_orderdate,
  o_shippriority
order by
  revenue desc
limit 5;

Write a program such that when we execute the following command (on the plain-text data):

spark-submit --class ca.uwaterloo.cs451.a6.Q7 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text

the answer to the above SQL query will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses. Here you can assume that the date argument is only in the format YYYY-MM-DD and that it is a valid date.

And the Parquet version:

spark-submit --class ca.uwaterloo.cs451.a6.Q7 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet

Implement this query as efficiently as you can, using all of the optimizations we discussed in lecture. Plan joins as you see fit, keeping in mind above assumptions on what will and will not fit in memory. You will only get full points for this question if you exploit all the optimization opportunities that are available.

Scaling up on the Datasci Cluster

Once you get your implementation working and debugged in the Linux environment, run your code on a larger TCP-H dataset, located on HDFS:

Make sure that all seven queries above run correctly on this larger dataset. For your reference, the the command-line parameters for Q1-Q7 are provided below (so you can copy and paste).

Important: Note that for this assignment for spark-submit we set --deploy-mode client. This will force the driver to run on the client (i.e., workspace), so that you will see the output of println. Otherwise, the driver will run on an arbitrary cluster node, making stdout not directly visible. Also, we redirect stdout to a file so that it'll be easier to view the output.

Q1:

spark-submit --class ca.uwaterloo.cs451.a6.Q1 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
 --date '1996-01-01' --text > q1t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q1 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
 --date '1996-01-01' --parquet > q1p.out

Q2:

spark-submit --class ca.uwaterloo.cs451.a6.Q2 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
 --date '1996-01-01' --text > q2t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q2 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
 --date '1996-01-01' --parquet > q2p.out

Q3:

spark-submit --class ca.uwaterloo.cs451.a6.Q3 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
 --date '1996-01-01' --text > q3t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q3 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
 --date '1996-01-01' --parquet > q3p.out

Q4:

spark-submit --class ca.uwaterloo.cs451.a6.Q4 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
 --date '1996-01-01' --text > q4t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q4 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
 --date '1996-01-01' --parquet > q4p.out

Q5:

spark-submit --class ca.uwaterloo.cs451.a6.Q5 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT --text > q5t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q5 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET --parquet > q5p.out

Q6:

spark-submit --class ca.uwaterloo.cs451.a6.Q6 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
 --date '1996-01-01' --text > q6t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q6 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
 --date '1996-01-01' --parquet > q6p.out

Q7:

spark-submit --class ca.uwaterloo.cs451.a6.Q7 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
 --date '1996-01-01' --text > q7t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q7 --deploy-mode client \
 --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
 target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
 --date '1996-01-01' --parquet > q7p.out

In this configuration, your programs shouldn't take more than a couple of minutes. If it's taking more than five minutes, you're probably doing something wrong.

Turning in the Assignment

Please follow these instructions carefully!

Make sure your repo has the following items:

Make sure your implementation runs in the Linux student CS environment on TPC-H-0.1-TXT, and on the Datasci cluster on the TPC-H-10-TXT data.

Specifically, we will clone your repo and use the below check scripts:

When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we would recommend that you verify everything above works by performing a clean clone of your repo and run the public check scripts.

That's it!

Hints

The easiest way to read Parquet is to use the DataFrame API, as follows:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder.getOrCreate

val lineitemDF = sparkSession.read.parquet("TPC-H-0.1-PARQUET/lineitem")
val lineitemRDD = lineitemDF.rdd

Once you read in the table as as a DataFrame, convert it into an RDD and work from there.

You are not allowed to use Spark SQL in your implementations for this assignment, but I have no problem if you use Spark SQL to check your answers.

The expected answers for Q6 and Q7 are as follows:

Q6-parquet-linux
(N,O,7050.0,9670322.450000001,9158295.638400001,9535800.642212,26.50375939849624,36354.59567669174,0.05274436090225563,266)

Q6-text-linux
(N,O,7050.0,9670322.449999996,9158295.638400003,9535800.642212,26.50375939849624,36354.595676691715,0.052744360902255615,266)

Q7-parquet-linux
(Customer#000009068,504452,427070.9956,1995-12-28,0)
(Customer#000001013,208450,392056.5682,1995-11-23,0)
(Customer#000014521,5925,391698.6805,1995-11-13,0)
(Customer#000006160,145089,383905.14719999995,1995-12-06,0)
(Customer#000002149,92039,372060.5709,1995-12-24,0)

Q7-text-linux
(Customer#000009068,504452,427070.9956,1995-12-28,0)
(Customer#000001013,208450,392056.5682,1995-11-23,0)
(Customer#000014521,5925,391698.6805,1995-11-13,0)
(Customer#000006160,145089,383905.14719999995,1995-12-06,0)
(Customer#000002149,92039,372060.5709,1995-12-24,0)

Grading

The entire assignment is worth 100 points:

A working implementation means that your code gives the right output according to our private check scripts, which will contain --date parameters that are unknown to you (but will nevertheless conform to our specifications above).

Reference Running Times

To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.

Class name Running time Linux Running time Datasci
Q1 7 seconds 40 seconds
Q2 8 seconds 50 seconds
Q3 7 seconds 60 seconds
Q4 8 seconds 55 seconds
Q5 9 seconds 65 seconds
Q6 7 seconds 41 seconds
Q7 9 seconds 63 seconds

Back to top