Spark can run in several modes, including YARN client/cluster, Standalone, Mesos, and Local. For this training we will use local mode. Specifically, you can start the Spark interactive shell by invoking the command below in the terminal, which runs Spark in local mode with two threads. Then you will see the Spark shell prompt.
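A plausible invocation is sketched below; the exact memory value is up to your machine (3 GB matches the example discussed next):

```
spark-shell --master "local[2]" --driver-memory 3G
```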
Here you can set
--driver-memory according to your local setup. If the driver memory you request is larger than the VM's memory, remember to increase the VM memory setting first.
In Spark, the main entry point of a Spark program is called the driver, and Spark distributes the computation to workers. In the interactive shell, the Spark shell program itself is the driver. In the above example we set the driver memory to 3 GB because in local mode the driver and the workers run together in the same process. A driver program accesses Spark through a
SparkContext object, which represents a connection to a computing cluster. In the above interactive shell,
SparkContext is already created for you as the variable
sc. You can input
sc to see its type.
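As a quick check (a minimal sketch; the printed object reference will differ on your machine):

```scala
sc   // res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@...
```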
Resilient Distributed Dataset (RDD) is Spark's core abstraction for working with data. An RDD is simply a fault-tolerant, distributed collection of elements. You can think of an RDD as a large array: you cannot access its elements randomly, but you can easily apply the same operation to all of them. In Spark, all work is expressed as creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute results. There are two ways to create RDDs: by distributing a collection of objects (e.g., a list or set), or by referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
For demo purposes, the simplest way to create an RDD is to take an existing collection (e.g. a Scala Array) in your program and pass it to SparkContext's parallelize() method.
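A minimal sketch, assuming the Spark shell with its pre-created sc:

```scala
// Distribute a local Scala collection as an RDD.
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)   // RDD[Int]
```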
Once created, the distributed dataset (
distData) can be operated on in parallel. For example, we can add up the elements by calling
distData.reduce((a, b) => a + b). You will see more operations on RDDs later on.
Parallelizing a collection is useful when you are learning Spark. However, it is not encouraged in production, since it requires the entire dataset to fit in the memory of the driver machine first. Instead, you should load data from external datasets.
A common way of creating RDDs is to load data from external storage. Below you will learn how to load data from a file system. We assume you have already put some data into HDFS as described in the HDFS Basic section; if not, please do that first.
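A minimal sketch, assuming case.csv was uploaded to an input/ directory in HDFS (the path is an assumption; adjust it to where you put the data):

```scala
// Load the file from HDFS; each line becomes one element of the RDD.
val lines = sc.textFile("input/case.csv")
```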
In the above example, each line of the original file becomes an element of the resulting RDD, lines.
When reading data from a file system, Spark relies on the HDFS library. In the above example we assume HDFS is properly configured through environment variables or configuration files, and that the data is already in HDFS.
RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from an existing one, while actions compute a result based on an RDD and either return it to the driver program or save it to external storage.
Spark treats transformations and actions very differently, so understanding which type of operation you are performing is very important. You can check whether a function is a transformation or an action by looking at its return type: transformations return RDDs, whereas actions return some other data type.
All transformations in Spark are lazy: they do not compute their results right away. Instead, they just remember the operations applied to some base dataset (e.g. an Array or a file). The transformations are only computed when an action requires a result to be returned to the driver program. Therefore, the above command that reads in a file has not actually been executed yet. We can force the evaluation of an RDD by calling any action.
Let's go through some common RDD operations using the healthcare dataset.
Recall that in the file case.csv, each line is a 4-field tuple
(patient-id, event-id, timestamp, value).
To find out how large our raw event sequence data is, we can count the number of lines in the input file with the count operation, as sketched below. Note that count is an action.
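A minimal sketch, assuming the lines RDD loaded above:

```scala
// count is an action: it forces the file to be read and returns the number of lines.
val numEvents = lines.count()
```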
You may wonder what the loaded data looks like; you can take a peek at it. take(k) returns the first k elements of the RDD. Spark also provides collect(), which brings all the elements of the RDD back to the driver program. Note that collect() should only be used when the data is small. Both take and collect are actions.
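For example, a minimal sketch of peeking at the first five records:

```scala
// Fetch the first 5 elements of the RDD to the driver as a local array.
val firstFive = lines.take(5)
```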
We got the first 5 records of this RDD. However, they are hard to read because of the poor formatting. We can make the output more readable by traversing the returned array and printing each record on its own line.
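A minimal sketch:

```scala
// take(5) returns a local array, so a plain foreach prints one record per line on the driver.
lines.take(5).foreach(println)
```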
Note that in the above three code examples, the RDD
lines has been computed (i.e. read in from the file) three times. We can prevent this by calling
lines.cache(), which caches the RDD in memory to avoid reloading it.
The map operation in Spark is similar to that of Hadoop. It is a transformation that turns each item in the RDD into a new item by applying the provided function. Notice that this
map maps exactly one source element to exactly one target element. For example, suppose we are only interested in the IDs of patients; we can use map as follows.
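A minimal sketch, assuming the comma-separated record format described above:

```scala
// Keep only the first field (patient-id) of every line.
val patientIds = lines.map(line => line.split(",")(0))
```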
It is also possible to write a more complex, multi-line map function. In this case, curly braces should be used in place of parentheses. For example, we can get both patient-id and
event-id as a tuple at the same time.
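A minimal sketch:

```scala
// A multi-line map body in curly braces, producing a (patient-id, event-id) tuple per line.
val idPairs = lines.map { line =>
  val fields = line.split(",")
  (fields(0), fields(1))
}
```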
As indicated by its name,
filter transforms an RDD into another RDD by keeping only the elements that satisfy the filtering condition. For example, suppose we want to count the number of events collected for a particular patient to verify the amount of data from that patient. We can use a filter followed by a count.
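A minimal sketch; the patient ID below is hypothetical:

```scala
// Keep only the events of one patient, then count them.
val numEventsForPatient = lines
  .filter(line => line.split(",")(0) == "some-patient-id")
  .count()
```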
distinct is a transformation that turns an RDD into another RDD by eliminating duplicates. We can use it to count the number of distinct patients. To do this, we first extract the patient ID from each line.
We use the
map() function as described above. In this example, we transform each line into the corresponding patient ID by extracting only the first column. We then eliminate duplicate IDs with the distinct transformation and count the result.
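A minimal sketch:

```scala
// Extract patient IDs, remove duplicates, and count how many distinct patients there are.
val numPatients = lines.map(line => line.split(",")(0)).distinct().count()
```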
Sometimes you will need to group the input events according to
patient-id to put everything about each patient together. For example, in order to extract the index date for predictive modeling, you may first group the input data by patient and then handle each patient separately in parallel, as sketched below. Each element of the resulting RDD is a (key, value) pair.
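A minimal sketch using groupBy (one of several ways to do this):

```scala
// Group all lines belonging to the same patient.
// Each element is (patient-id, Iterable[line]).
val eventsByPatient = lines.groupBy(line => line.split(",")(0))
```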
Conceptually, reduceByKey transforms an
RDD[(K, V)] into an
RDD[(K, List[V])] (like groupByKey does) and then applies a reduce function to each
List[V] to produce the final output
RDD[(K, V)]. Note that we intentionally denote the return type of the reduce function as
V: it must be the same as the type of the list elements. Suppose now we want to calculate the total payment made by each patient. A payment record in the dataset is in the form of
(patient-id, PAYMENT, timestamp, value).
The payment_events RDD returned by
filter contains the records associated with payments. Each item is then transformed into a key-value pair
(patient-id, payment) with
map. Because each patient can have multiple payments, we use
reduceByKey to sum up the payments of each patient. In this example,
patient-id serves as the key, and
payment is the value summed for each patient. The figure below shows the process of
reduceByKey in our example.
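A minimal sketch of the whole pipeline, assuming the value field of a PAYMENT record is numeric:

```scala
// 1. Keep only payment records.
val payment_events = lines.filter(line => line.split(",")(1) == "PAYMENT")

// 2. Turn each record into a (patient-id, payment) pair.
val payments = payment_events.map { line =>
  val fields = line.split(",")
  (fields(0), fields(3).toDouble)
}

// 3. Sum the payments of each patient.
val totalPaymentPerPatient = payments.reduceByKey(_ + _)
```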
We can then find the top three patients with the highest total payment by combining sortBy and take, as sketched below; the output is a local array of three (patient-id, total-payment) pairs.
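A minimal sketch, continuing from totalPaymentPerPatient above:

```scala
// Sort by the total payment (second tuple element) in decreasing order and take the top 3.
val top3 = totalPaymentPerPatient.sortBy(_._2, false).take(3)
```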
In the call to sortBy we use the
_ placeholder, so that
_._2 is an anonymous function returning the second element of a tuple, which here is the total payment of a patient. The second parameter of
sortBy controls the sort order; in the above example,
false means decreasing order.
reduceByKey(math.max) is a shorthand for
reduceByKey((a, b) => math.max(a, b)).
math.max is a Scala function that returns the larger of its two parameters.
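For instance, a minimal (hypothetical) sketch of using it to find the largest single payment made by each patient, reusing the payments pairs from above:

```scala
// For each patient, keep only the maximum payment value.
val maxPaymentPerPatient = payments.reduceByKey(math.max)
```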
Now that we have the total payment information of each patient, we can run some basic statistics. For RDDs consisting of numeric values, Spark provides some useful statistical primitives.
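A minimal sketch, using the payment totals computed above:

```scala
// Extract the numeric payment values into an RDD[Double].
val paymentValues = totalPaymentPerPatient.map(_._2)

paymentValues.mean()    // average total payment
paymentValues.stdev()   // standard deviation
paymentValues.stats()   // count, mean, stdev, max and min in a single pass
```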
RDDs support many of the set operations, such as union and
intersection, even when the RDDs themselves are not proper sets. For example, we can combine the two files with the
union function. Please note that
union here is not strictly identical to the union operation in mathematics, since Spark does not remove duplicates.
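A minimal sketch, assuming a second file loaded into another RDD (the path is hypothetical):

```scala
// Load a second file and concatenate the two RDDs; duplicates are kept.
val otherLines = sc.textFile("input/control.csv")
val allLines = lines.union(otherLines)
```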
A more straightforward way here is to pass a directory name to textFile, which reads all the files in that directory into a single RDD.
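A minimal sketch (the directory path is hypothetical):

```scala
// textFile accepts a directory: every file inside it becomes part of one RDD.
val allData = sc.textFile("input/")
```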
For the complete list of RDD operations, please see the Spark Programming Guide.