In this assignment you'll be hand-crafting Spark programs that implement SQL queries in a data warehousing scenario. Various SQL-on-Hadoop solutions all provide an SQL query interface over data stored in HDFS, built on top of an intermediate execution framework: for example, Hive queries are "compiled" into MapReduce jobs, while Spark SQL queries rely on Spark processing primitives for query execution. In this assignment, you'll play the role of the mediator between SQL queries and the underlying execution framework (Spark): you'll be given a series of SQL queries, and for each you'll hand-craft a Spark program that implements it.
Important: You are not allowed to use the DataFrame API or
Spark SQL to complete this assignment (with the exception of loading Parquet files, see "hints" below).
You must write code to manipulate raw RDDs.
Furthermore, you are not allowed to use join
and
closely-related transformations in Spark for this assignment, because
doing so would defeat the point of the exercise. The assignment will
guide you toward what we are looking for, but if you have any
questions as to what is allowed or not, ask!
We will be working with data from the TPC-H benchmark in this assignment. The Transaction Processing Performance Council (TPC) is a non-profit organization that defines various database benchmarks so that database vendors can evaluate the performance of their products fairly. TPC defines the "rules of the game", so to speak. TPC defines various benchmarks, of which one is TPC-H, for evaluating ad-hoc decision support systems in a data warehousing scenario. The current version of the TPC-H benchmark is located here. You'll want to skim through this (long) document; the most important part is the entity-relationship diagram of the data warehouse on page 13. Throughout the assignment you'll likely be referring to it, as it will help you make sense of the queries you are running.
The TPC-H benchmark comes with a data generator, and we have generated some data for you:
- TPC-H-0.1-TXT.tar.gz: the plain-text version of the data
- TPC-H-0.1-PARQUET.tar.gz: the Parquet version of the data
Download and unpack the datasets above. In the first part of the assignment, where you will be working with Spark locally, you will run your queries against both datasets.
For the plain-text version of the data, you will see a number of
text files, each corresponding to a table in the TPC-H schema. The
files are delimited by |
. You'll notice that some of the
fields, especially the text fields, are gibberish—that's normal,
since the contents are randomly generated.
The Parquet data has the same content, but is encoded in Parquet.
Implement the following seven SQL queries, running on
the TPC-H-0.1-TXT
plain-text data as well as
the TPC-H-0.1-PARQUET
Parquet data. For each query you
will write a program that takes the option --text
to work
with the plain-text data and --parquet
to work with the
Parquet data. More details will be provided below.
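All seven programs share this command-line interface. As a concrete starting point, here is one possible way to declare the options using Scallop; this is an illustration only (any reasonable argument-parsing approach is fine), and the option names simply mirror the commands shown in this write-up:

// One possible way to handle the command-line options (illustration only).
import org.rogach.scallop._

class Conf(args: Seq[String]) extends ScallopConf(args) {
  val input = opt[String](descr = "input directory", required = true)
  val date = opt[String](descr = "ship date predicate", required = false)
  val text = opt[Boolean](descr = "use plain-text input", required = false)
  val parquet = opt[Boolean](descr = "use Parquet input", required = false)
  verify()
}

A program would then branch on conf.text() vs. conf.parquet() to decide how to load the tables.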
Each SQL query is accompanied by a written description of what the
query does: if there are any ambiguities in the language, you can
always assume that the SQL query is correct. Your implementation of each query will
be a separate Spark program. Put your code in the package
ca.uwaterloo.cs451.a6
, in the
same repo that you've been working in all semester. Since you'll be
writing Scala code, your source files should go into
src/main/scala/ca/uwaterloo/cs451/a6/
.
Q1: How many items were shipped on a particular date? This corresponds to the following SQL query:
select count(*) from lineitem where l_shipdate = 'YYYY-MM-DD';
Write a program such that when we execute the following command (on the plain-text data):
spark-submit --class ca.uwaterloo.cs451.a6.Q1 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text
the answer to the above SQL query will be printed to stdout (on the console where the above command is executed), in a line that matches the following regular expression:
ANSWER=\d+
The output of the query can contain logging and debug information, but there must be a line with the answer in exactly the above format.
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q1 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet
In both cases, the value of the
--input
argument is the directory that contains the
data (either in plain text or in Parquet). The value of the --date
argument
corresponds to the l_shipdate
predicate in the where
clause in the SQL query.
You can assume that a valid date
is provided, so you do not need to
perform input validation.
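To make the expected shape of these programs concrete, here is a minimal sketch of the plain-text path for Q1. It is an illustration only: it assumes the |-delimited layout described above, with l_shipdate as field index 10 of lineitem (per the TPC-H schema), assumes the table lives in a file named lineitem.tbl under the input directory, and skips proper option parsing.

// Minimal sketch of Q1 over the plain-text data (illustration only).
// Assumes l_shipdate is field index 10 of lineitem and that the table
// lives in a file named lineitem.tbl under the input directory.
import org.apache.spark.{SparkConf, SparkContext}

object Q1Sketch {
  def main(argv: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Q1Sketch"))
    val input = argv(0)   // e.g., TPC-H-0.1-TXT
    val date = argv(1)    // e.g., 1996-01-01

    val count = sc.textFile(input + "/lineitem.tbl")
      .map(line => line.split("\\|")(10))   // project only l_shipdate
      .filter(_ == date)
      .count()

    println("ANSWER=" + count)
  }
}

Note that only l_shipdate is projected before the filter, so very little data flows beyond the initial scan.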
Q2: Which clerks were responsible for processing items that were shipped on a particular date? List the first 20 by order key. This corresponds to the following SQL query:
select o_clerk, o_orderkey from lineitem, orders where l_orderkey = o_orderkey and l_shipdate = 'YYYY-MM-DD' order by o_orderkey asc limit 20;
Write a program such that when we execute the following command (on the plain-text data):
spark-submit --class ca.uwaterloo.cs451.a6.Q2 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text
the answer to the above SQL query will be printed to stdout (on the console where the above command is executed), as a sequence of tuples in the following format:
(o_clerk,o_orderkey)
(o_clerk,o_orderkey)
...
That is, each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q2 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet
In the design of this data warehouse, the lineitem
and orders
tables are not likely to fit in
memory. Therefore, the only scalable join approach is the reduce-side
join. You must implement this join in Spark using
the cogroup
transformation.
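A minimal sketch of the shape we have in mind, reusing the setup (sc, input, date) from the Q1 sketch above; the .tbl file names are assumptions, and the field indices follow the TPC-H schema (order key at index 0 in both tables, o_clerk at index 6):

// Reduce-side join of lineitem and orders on the order key via cogroup (illustration only).
val shipped = sc.textFile(input + "/lineitem.tbl")
  .map(_.split("\\|"))
  .filter(cols => cols(10) == date)              // l_shipdate predicate
  .map(cols => (cols(0).toLong, 1))              // one record per qualifying lineitem

val clerks = sc.textFile(input + "/orders.tbl")
  .map(_.split("\\|"))
  .map(cols => (cols(0).toLong, cols(6)))        // (o_orderkey, o_clerk)

shipped.cogroup(clerks)
  .filter { case (_, (items, _)) => items.nonEmpty }
  .flatMap { case (orderkey, (items, clerkIter)) =>
    // one output row per matching lineitem, as in the SQL join
    clerkIter.flatMap(clerk => items.map(_ => (orderkey, clerk)))
  }
  .sortByKey()
  .take(20)
  .foreach { case (orderkey, clerk) => println(s"($clerk,$orderkey)") }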
Q3: What are the names of parts and suppliers of items shipped on a particular date? List the first 20 by order key. This corresponds to the following SQL query:
select l_orderkey, p_name, s_name from lineitem, part, supplier where l_partkey = p_partkey and l_suppkey = s_suppkey and l_shipdate = 'YYYY-MM-DD' order by l_orderkey asc limit 20;
Write a program such that when we execute the following command (on the plain-text data):
spark-submit --class ca.uwaterloo.cs451.a6.Q3 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text
the answer to the above SQL query will be printed to stdout (on the console where the above command is executed), as a sequence of tuples in the following format:
(l_orderkey,p_name,s_name)
(l_orderkey,p_name,s_name)
...
That is, each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q3 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet
In the design of this data warehouse, it is assumed that
the part
and supplier
tables will fit in
memory. Therefore, it is possible to implement a hash join. For this
query, you must implement a hash join in Spark with broadcast
variables.
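A minimal sketch of the broadcast hash join, again reusing the setup (sc, input, date) from the Q1 sketch; the file names are assumptions, and the field indices follow the TPC-H schema (p_name and s_name at index 1 of their respective tables):

// Broadcast hash join of lineitem against part and supplier (illustration only).
val partNames = sc.broadcast(
  sc.textFile(input + "/part.tbl")
    .map(_.split("\\|"))
    .map(cols => (cols(0).toLong, cols(1)))      // (p_partkey, p_name)
    .collectAsMap())

val suppNames = sc.broadcast(
  sc.textFile(input + "/supplier.tbl")
    .map(_.split("\\|"))
    .map(cols => (cols(0).toLong, cols(1)))      // (s_suppkey, s_name)
    .collectAsMap())

sc.textFile(input + "/lineitem.tbl")
  .map(_.split("\\|"))
  .filter(cols => cols(10) == date)              // l_shipdate predicate
  .map(cols => (cols(0).toLong,                  // l_orderkey
    (partNames.value(cols(1).toLong),            // p_name via l_partkey
     suppNames.value(cols(2).toLong))))          // s_name via l_suppkey
  .sortByKey()
  .take(20)
  .foreach { case (orderkey, (pName, sName)) => println(s"($orderkey,$pName,$sName)") }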
Q4: How many items were shipped to each country on a particular date? This corresponds to the following SQL query:
select n_nationkey, n_name, count(*)
from lineitem, orders, customer, nation
where l_orderkey = o_orderkey
  and o_custkey = c_custkey
  and c_nationkey = n_nationkey
  and l_shipdate = 'YYYY-MM-DD'
group by n_nationkey, n_name
order by n_nationkey asc;
Write a program such that when we execute the following command (on the plain-text data):
spark-submit --class ca.uwaterloo.cs451.a6.Q4 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text
the answer to the above SQL query will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q4 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet
Implement this query with different join techniques as you see
fit. You can assume that the lineitem
and orders
tables will not fit in memory, but you can
assume that the customer
and nation
tables
will both fit in memory. For this query, the performance as well as
the scalability of your implementation will contribute to the
grade.
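One reasonable plan, sketched below under the same assumptions as the earlier snippets, is to reduce-side join lineitem and orders on the order key, resolve customers to nations through broadcast lookup tables, and count per nation with reduceByKey. This is only an illustration; you are free to plan the joins differently.

// Sketch of one possible plan for Q4 (illustration only; setup as in the Q1 sketch).
val customerNation = sc.broadcast(
  sc.textFile(input + "/customer.tbl")
    .map(_.split("\\|"))
    .map(cols => (cols(0).toLong, cols(3).toLong))   // (c_custkey, c_nationkey)
    .collectAsMap())

val nationName = sc.broadcast(
  sc.textFile(input + "/nation.tbl")
    .map(_.split("\\|"))
    .map(cols => (cols(0).toLong, cols(1)))          // (n_nationkey, n_name)
    .collectAsMap())

val shipped = sc.textFile(input + "/lineitem.tbl")
  .map(_.split("\\|"))
  .filter(cols => cols(10) == date)
  .map(cols => (cols(0).toLong, 1))                  // (l_orderkey, 1)

val orderCust = sc.textFile(input + "/orders.tbl")
  .map(_.split("\\|"))
  .map(cols => (cols(0).toLong, cols(1).toLong))     // (o_orderkey, o_custkey)

shipped.cogroup(orderCust)
  .flatMap { case (_, (items, custs)) =>
    custs.flatMap(cust => items.map(_ => (customerNation.value(cust), 1)))
  }
  .reduceByKey(_ + _)                                // count per n_nationkey
  .sortByKey()
  .collect()
  .foreach { case (nk, cnt) => println(s"($nk,${nationName.value(nk)},$cnt)") }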
Q5: This query represents a very simple end-to-end ad hoc
analysis task: Related to Q4, your boss has asked you to
compare shipments to Canada vs. the United States by month, given all
the data in the data warehouse. You think this request is best
fulfilled by a line graph, with two lines (one representing the US and
one representing Canada), where the x-axis is the year/month and
the y-axis is the volume, i.e., count(*).
Generate this graph for your boss.
First, write a program such that when we execute the following command (on plain text):
spark-submit --class ca.uwaterloo.cs451.a6.Q5 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --text
the raw data necessary for the graph will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses.
Here is a sample of the output:
(3,CANADA,1992-01,3874)
(3,CANADA,1992-02,10694)
(3,CANADA,1992-03,19007)
(3,CANADA,1992-04,25707)
...
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q5 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --parquet
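Before you can plot anything, you need the counts by nation and month. Structurally this is Q4 without the date predicate, restricted to the two nations and grouped by shipment month. A hedged hint on the grouping key, assuming l_shipdate is a YYYY-MM-DD string (the names in the commented lines are hypothetical):

// Illustration only: derive the year-month grouping key from an l_shipdate string.
def yearMonth(shipdate: String): String = shipdate.substring(0, 7)   // "1996-01-15" -> "1996-01"

// The aggregation then looks much like Q4, e.g. (the names below are hypothetical):
//   lineitemWithNation                       // records carrying (n_nationkey, n_name, l_shipdate)
//     .filter { case (_, name, _) => name == "CANADA" || name == "UNITED STATES" }
//     .map { case (key, name, ship) => ((key, name, yearMonth(ship)), 1) }
//     .reduceByKey(_ + _)
//     .sortByKey()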
Next, create this actual graph: use whatever tool you are comfortable with, e.g., Excel, gnuplot, etc.
Q6: This is a slightly modified version of TPC-H Q1 "Pricing Summary Report Query". This query reports the amount of business that was billed, shipped, and returned:
select
  l_returnflag,
  l_linestatus,
  sum(l_quantity) as sum_qty,
  sum(l_extendedprice) as sum_base_price,
  sum(l_extendedprice*(1-l_discount)) as sum_disc_price,
  sum(l_extendedprice*(1-l_discount)*(1+l_tax)) as sum_charge,
  avg(l_quantity) as avg_qty,
  avg(l_extendedprice) as avg_price,
  avg(l_discount) as avg_disc,
  count(*) as count_order
from lineitem
where l_shipdate = 'YYYY-MM-DD'
group by l_returnflag, l_linestatus;
Write a program such that when we execute the following command (on the plain-text data):
spark-submit --class ca.uwaterloo.cs451.a6.Q6 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text
the answer to the above SQL query will be printed to stdout (on the console where the above command is executed). Format the output in the same manner as with the above queries: one tuple per line, where each tuple is comma-delimited and surrounded by parentheses. Everything described in Q1 about dates applies here as well.
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q6 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet
Implement this query as efficiently as you can, using all of the optimizations we discussed in lecture. You will only get full points for this question if you exploit all the optimization opportunities that are available.
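One way to think about the optimization opportunities: push the date filter and the column projection down to the initial scan, and compute every sum and the count in a single reduceByKey pass, deriving the averages from the sums and the count at the end (rather than using groupByKey). A sketch of that aggregation step, under the same assumptions as the earlier snippets:

// Single-pass aggregation for Q6 (illustration only; setup as in the Q1 sketch,
// field indices per the TPC-H schema).
sc.textFile(input + "/lineitem.tbl")
  .map(_.split("\\|"))
  .filter(cols => cols(10) == date)                 // l_shipdate predicate, applied at the scan
  .map { cols =>
    val qty = cols(4).toDouble                      // l_quantity
    val price = cols(5).toDouble                    // l_extendedprice
    val disc = cols(6).toDouble                     // l_discount
    val tax = cols(7).toDouble                      // l_tax
    ((cols(8), cols(9)),                            // key: (l_returnflag, l_linestatus)
      (qty, price, price * (1 - disc), price * (1 - disc) * (1 + tax), disc, 1L))
  }
  .reduceByKey { case ((q1, b1, d1, c1, s1, n1), (q2, b2, d2, c2, s2, n2)) =>
    (q1 + q2, b1 + b2, d1 + d2, c1 + c2, s1 + s2, n1 + n2)
  }
  .collect()
  .foreach { case ((rf, ls), (sumQty, sumBase, sumDiscPrice, sumCharge, sumDisc, n)) =>
    println(s"($rf,$ls,$sumQty,$sumBase,$sumDiscPrice,$sumCharge," +
      s"${sumQty / n},${sumBase / n},${sumDisc / n},$n)")
  }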
Q7: This is a slightly modified version of TPC-H Q3 "Shipping Priority Query". This query retrieves the 10 unshipped orders with the highest value:
select
  c_name,
  l_orderkey,
  sum(l_extendedprice*(1-l_discount)) as revenue,
  o_orderdate,
  o_shippriority
from customer, orders, lineitem
where c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < "YYYY-MM-DD"
  and l_shipdate > "YYYY-MM-DD"
group by c_name, l_orderkey, o_orderdate, o_shippriority
order by revenue desc
limit 10;
Write a program such that when we execute the following command (on the plain-text data):
spark-submit --class ca.uwaterloo.cs451.a6.Q7 \
  target/assignments-1.0.jar --input TPC-H-0.1-TXT --date '1996-01-01' --text
the answer to the above SQL query will be printed to stdout (on the
console where the above command is executed).
Format the output in the same manner as with the above queries: one
tuple per line, where each tuple is comma-delimited and surrounded by
parentheses. Here you can assume that the date argument is only in the
format YYYY-MM-DD
and that it is a valid date.
And the Parquet version:
spark-submit --class ca.uwaterloo.cs451.a6.Q7 \
  target/assignments-1.0.jar --input TPC-H-0.1-PARQUET --date '1996-01-01' --parquet
Implement this query as efficiently as you can, using all of the optimizations we discussed in lecture. Plan joins as you see fit, keeping in mind above assumptions on what will and will not fit in memory. You will only get full points for this question if you exploit all the optimization opportunities that are available.
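One small hint on the final ordering: you do not need a full sort of all groups. Assuming you have reduced the data to one record per group, say a hypothetical RDD named grouped holding ((c_name, l_orderkey, o_orderdate, o_shippriority), revenue) pairs, the top 10 can be extracted with takeOrdered and a descending ordering on revenue:

// Illustration only: `grouped` is the hypothetical per-group RDD described above.
grouped
  .takeOrdered(10)(Ordering.by { case (_, revenue) => -revenue })
  .foreach { case ((cName, orderkey, orderdate, shippriority), revenue) =>
    println(s"($cName,$orderkey,$revenue,$orderdate,$shippriority)")
  }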
Once you get your implementation working and debugged in the Linux environment, run your code on a larger TPC-H dataset, located on HDFS:
- /data/cs451/TPC-H-10-TXT for the plain-text data
- /data/cs451/TPC-H-10-PARQUET for the Parquet data
Make sure that all seven queries above run correctly on this larger dataset. For your reference, the command-line parameters for Q1-Q7 are provided below (so you can copy and paste).
Important: Note that for this assignment we set --deploy-mode client for spark-submit. This will force the driver to run on the client (i.e., your workspace), so that you will see the output
of println
. Otherwise, the driver will run on an
arbitrary cluster node, making stdout not directly visible. Also, we
redirect stdout to a file so that it'll be easier to view the
output.
Q1:
spark-submit --class ca.uwaterloo.cs451.a6.Q1 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
  --date '1996-01-01' --text > q1t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q1 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
  --date '1996-01-01' --parquet > q1p.out
Q2:
spark-submit --class ca.uwaterloo.cs451.a6.Q2 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
  --date '1996-01-01' --text > q2t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q2 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
  --date '1996-01-01' --parquet > q2p.out
Q3:
spark-submit --class ca.uwaterloo.cs451.a6.Q3 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
  --date '1996-01-01' --text > q3t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q3 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
  --date '1996-01-01' --parquet > q3p.out
Q4:
spark-submit --class ca.uwaterloo.cs451.a6.Q4 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
  --date '1996-01-01' --text > q4t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q4 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
  --date '1996-01-01' --parquet > q4p.out
Q5:
spark-submit --class ca.uwaterloo.cs451.a6.Q5 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT --text > q5t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q5 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET --parquet > q5p.out
Q6:
spark-submit --class ca.uwaterloo.cs451.a6.Q6 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
  --date '1996-01-01' --text > q6t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q6 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
  --date '1996-01-01' --parquet > q6p.out
Q7:
spark-submit --class ca.uwaterloo.cs451.a6.Q7 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-TXT \
  --date '1996-01-01' --text > q7t.out

spark-submit --class ca.uwaterloo.cs451.a6.Q7 --deploy-mode client \
  --num-executors 4 --executor-cores 4 --executor-memory 24G --driver-memory 2g \
  target/assignments-1.0.jar --input /data/cs451/TPC-H-10-PARQUET \
  --date '1996-01-01' --parquet > q7p.out
In this configuration, none of your programs should take more than a couple of minutes. If a query is taking more than five minutes, you're probably doing something wrong.
Please follow these instructions carefully!
Make sure your repo has the following items:
- bigdata2021f/assignment6.md
- bigdata2021f/assignment6-Q5-small.pdf and bigdata2021f/assignment6-Q5-large.pdf, containing the graphs for Q5 on the TPC-H-0.1-TXT and TPC-H-10-TXT datasets, respectively. If you cannot easily generate PDFs, the files should be in some easily-viewable format, e.g., png, gif, etc.
- Your Scala implementations in the package ca.uwaterloo.cs451.a6. There should be at a minimum seven classes (Q1-Q7), but you may include helper classes as you see fit.
Make sure your implementation runs in the Linux student CS environment on TPC-H-0.1-TXT, and on the Datasci cluster on the TPC-H-10-TXT data.
Specifically, we will clone your repo and use the following check scripts:
- check_assignment6_public_linux.py in the Linux Student CS environment.
- check_assignment6_public_datasci.py on the Datasci cluster.
When you've done everything, commit to your repo and remember to push back to origin. You should be able to see your edits in the web interface. Before you consider the assignment "complete", we recommend that you verify everything above works by performing a clean clone of your repo and running the public check scripts.
That's it!
The easiest way to read Parquet is to use the DataFrame API, as follows:
import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder.getOrCreate
val lineitemDF = sparkSession.read.parquet("TPC-H-0.1-PARQUET/lineitem")
val lineitemRDD = lineitemDF.rdd
Once you read in the table as a DataFrame, convert it into an RDD and work from there.
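Fields of a Row are accessed by position; check the column order and types with lineitemDF.printSchema() before assuming anything. For example, if l_shipdate is stored as a string in column 10 (an assumption you should verify against the schema), counting shipments on a date looks like this:

// Accessing Row fields by position (verify the index and type with printSchema()).
val count = lineitemRDD
  .filter(row => row.getString(10) == "1996-01-01")
  .count()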
You are not allowed to use Spark SQL in your implementations for this assignment, but I have no problem if you use Spark SQL to check your answers.
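For example, a throwaway check of Q1 with Spark SQL (for verification only, not as part of your submitted implementation) might look like this:

// Spark SQL used only to verify an answer; not allowed in the actual implementations.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
val lineitem = spark.read.parquet("TPC-H-0.1-PARQUET/lineitem")
lineitem.createOrReplaceTempView("lineitem")
spark.sql("select count(*) from lineitem where l_shipdate = '1996-01-01'").show()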
The expected answers for Q6 and Q7 are as follows:
Q6-parquet-linux
(N,O,7050.0,9670322.450000001,9158295.638400001,9535800.642212,26.50375939849624,36354.59567669174,0.05274436090225563,266)

Q6-text-linux
(N,O,7050.0,9670322.449999996,9158295.638400003,9535800.642212,26.50375939849624,36354.595676691715,0.052744360902255615,266)

Q7-parquet-linux
(Customer#000009068,504452,427070.9956,1995-12-28,0)
(Customer#000001013,208450,392056.5682,1995-11-23,0)
(Customer#000014521,5925,391698.6805,1995-11-13,0)
(Customer#000006160,145089,383905.14719999995,1995-12-06,0)
(Customer#000002149,92039,372060.5709,1995-12-24,0)
(Customer#000012259,315269,336969.9039,1995-12-29,0)
(Customer#000005623,176769,324073.6752,1995-12-21,0)
(Customer#000003803,381152,323319.2872,1995-11-30,0)
(Customer#000011183,399459,321395.2656,1995-12-13,0)
(Customer#000005935,539456,320125.29410000006,1995-12-18,0)

Q7-text-linux
(Customer#000009068,504452,427070.9956,1995-12-28,0)
(Customer#000001013,208450,392056.5682,1995-11-23,0)
(Customer#000014521,5925,391698.6805,1995-11-13,0)
(Customer#000006160,145089,383905.14719999995,1995-12-06,0)
(Customer#000002149,92039,372060.5709,1995-12-24,0)
(Customer#000012259,315269,336969.9039,1995-12-29,0)
(Customer#000005623,176769,324073.6752,1995-12-21,0)
(Customer#000003803,381152,323319.2872,1995-11-30,0)
(Customer#000011183,399459,321395.2656,1995-12-13,0)
(Customer#000005935,539456,320125.29410000006,1995-12-18,0)
The entire assignment is worth 100 points:
A working implementation means that your code gives the right
output according to our private check scripts, which will
contain --date
parameters that are unknown to you (but
will nevertheless conform to our specifications above).
To help you gauge the efficiency of your solution, we are giving you the running times of our reference implementations. Keep in mind that these are machine dependent and can vary depending on the server/cluster load.
Class name | Running time Linux | Running time Datasci |
---|---|---|
Q1 | 7 seconds | 40 seconds |
Q2 | 8 seconds | 50 seconds |
Q3 | 7 seconds | 60 seconds |
Q4 | 8 seconds | 55 seconds |
Q5 | 9 seconds | 65 seconds |
Q6 | 7 seconds | 41 seconds |
Q7 | 9 seconds | 63 seconds |