In this section, we will show how to prepare suitable data for building predictive models for heart failure (HF). We will first briefly introduce the data types involved. Then we will show how to construct training/testing samples from the input data using Spark. Finally, we will export the data in a suitable format for later modeling.
For many machine learning tasks, such as classification, regression, and clustering, a data point is often represented as a feature vector. Each coordinate of the vector corresponds to a particular feature of the data point.
MLlib, the machine learning module of Spark, supports two types of vectors: dense and sparse.
A dense vector is essentially a
Double array whose length equals the dimension of the vector.
If a vector contains only a few non-zero entries, we can represent it more efficiently as a sparse vector, which stores only the non-zero indices and their corresponding values.
For example, a vector
(1.0, 0.0, 3.0) can be represented in dense format as
[1.0, 0.0, 3.0] or in sparse format as
(3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
The base class of a vector is
Vector, and there are two implementations:
DenseVector and SparseVector. We recommend using the factory methods implemented in
Vectors to create vectors.
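As a sketch, the example vector above can be created either way with the Vectors factory (this assumes the spark-mllib dependency is on the classpath):

```scala
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// The same vector (1.0, 0.0, 3.0) in dense and sparse form.
val dense: Vector = Vectors.dense(1.0, 0.0, 3.0)
// size 3, non-zero indices 0 and 2, with values 1.0 and 3.0
val sparse: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
```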
A labeled point is a vector, either dense or sparse, associated with a label/prediction target. In Spark MLlib, labeled points are used as input to supervised learning algorithms. For example, in binary classification like HF prediction, a label should be either 0 or 1. For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, .... For a regression problem like payment prediction, a label is a real-valued number.
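For instance, a minimal sketch of labeled points for HF prediction (again assuming the spark-mllib dependency; the feature values are illustrative):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// A positive (label 1.0) example with a dense feature vector,
// and a negative (label 0.0) example with a sparse feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
```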
To apply machine learning algorithms, we need to transform our data into
RDD[LabeledPoint]. This feature construction is similar to what we did with Hadoop Pig, but it will be more concise since we are programming in Scala on Spark. We will consider a one-year prediction window; specifically, we will only use data recorded at least one year before the HF diagnosis. The figure below depicts the relationship between the prediction window and the prediction target.
We can also specify an observation window, inside which data will be used to construct feature vectors.
The high-level steps are depicted in the figure below.
Our parallelization will be at the patient level, i.e., each element in the RDD contains everything about exactly one patient. The features and prediction target for each patient are largely independent of those of other patients. Recall that our data file has the following form:
Each line is a 4-tuple
(patient-id, event-id, timestamp, value). Suppose now our goal is to predict if a patient will have heart failure. We can use the value associated with the event
heartfailure as the label. This value can be either 1.0 (the patient has heart failure) or 0.0 (the patient does not have heart failure). We call a patient with heart failure a positive example or case patient, and a patient without heart failure a negative example or control patient.
For example, in the above snippet we can see that patient
00013D2EFD8E45D1 is a positive example. The file case.csv consists of only positive examples, and the file control.csv consists of only negative examples.
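A line can be parsed into such a 4-tuple with plain Scala; the sample line below (including its timestamp value) is hypothetical:

```scala
// Parse one line "patient-id,event-id,timestamp,value" into a 4-tuple.
def parseLine(line: String): (String, String, String, Double) = {
  val Array(patientId, eventId, timestamp, value) = line.split(",")
  (patientId, eventId, timestamp, value.toDouble)
}

// Hypothetical sample line for the positive example mentioned above.
val sample = "00013D2EFD8E45D1,heartfailure,970,1.0"
val parsed = parseLine(sample)
// parsed == ("00013D2EFD8E45D1", "heartfailure", "970", 1.0)
```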
We will use the values associated with events other than
heartfailure to construct the feature vector for each patient. Specifically, the length of the feature vector is the number of distinct
event-ids, and each coordinate of the vector stores the aggregated value corresponding to a particular
event-id. The values associated with events that do not appear in the file are assumed to be 0. Since each patient typically has only a few hundred records (lines), compared to thousands of distinct events, it is more efficient to use sparse vectors.
Note that each patient can have multiple records with the same
event-id. In this case we sum up the values associated with the same
event-id to obtain the feature value, and use the
event-id as the feature name.
The file input/case.csv consists of only positive examples, and the file input/control.csv consists of only negative examples. We will load them together. Since the data will be used more than once, we call
cache() to avoid reading the file multiple times.
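A sketch of the loading step (the file paths come from the text; parseLine is a hypothetical parser for the 4-tuple line format):

```scala
// Load cases and controls together, parse each line, and cache the
// result since it will be used more than once.
val caseEvents    = sc.textFile("input/case.csv")
val controlEvents = sc.textFile("input/control.csv")
val allEvents     = caseEvents.union(controlEvents).map(parseLine).cache()
```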
One patient's index date, prediction target, etc. are independent of another patient's, so we can group by patient-id to bring everything about one patient together. When we then run a
map operation, Spark parallelizes the computation at the patient level. The
groupBy operation can be illustrated with the example below
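Since Scala collections share the groupBy API with RDDs, the grouping can be illustrated locally; the Event case class and sample data here are hypothetical:

```scala
case class Event(patientId: String, eventId: String, value: Double)

val events = List(
  Event("patientA", "med1", 1.0),
  Event("patientB", "lab1", 2.0),
  Event("patientA", "diag1", 1.0)
)

// Group everything about one patient together, keyed by patient-id.
val grpPatients = events.groupBy(_.patientId)
// grpPatients("patientA") contains the two patientA events.
```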
Recall that
_._2 returns the second field of a tuple; in this case it returns the
List[event] for a given patient. Finally,
grpPatients will be an RDD of (patient-id, events) pairs.
Now we can put our patient-level parallelization into practice. For each patient, we first find the prediction target, which is encoded as an event named
heartfailure; then we identify the index date and keep only the useful events before the index date for feature construction. In feature construction, we aggregate the event values into features using the
sum function and use the event name as the feature name.
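The per-patient logic can be sketched with plain Scala collections, which mirror the RDD API. The Event case class, the day-based timestamps, and the index-date rule (one year before diagnosis for cases, last event date for controls) are assumptions for illustration:

```scala
case class Event(patientId: String, eventId: String, timestamp: Long, value: Double)

val predictionWindow = 365L // one-year window; timestamps assumed to be in days

// For one patient's events: derive the target, locate the index date,
// then sum the values of qualifying events grouped by event name.
def targetAndFeatures(events: List[Event]): (Double, Map[String, Double]) = {
  val target = events.find(_.eventId == "heartfailure").map(_.value).getOrElse(0.0)
  // Index date: one year before the first HF diagnosis for cases,
  // the last event date for controls (an assumption).
  val indexDate =
    if (target == 1.0)
      events.filter(_.eventId == "heartfailure").map(_.timestamp).min - predictionWindow
    else events.map(_.timestamp).max
  val features = events
    .filter(e => e.eventId != "heartfailure" && e.timestamp < indexDate)
    .groupBy(_.eventId)
    .map { case (name, es) => (name, es.map(_.value).sum) }
  (target, features)
}
```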
The construction of the target is relatively simple, but the process of constructing the features is trickier. The example below shows what happens in the main body of the above
map operation, to illustrate how the
features are constructed
filteredFeatureEvents should be of type
RDD[(target, Map[feature-name, feature-value])], and we can verify
that as follows:
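One simple way to spot-check the shape of the result (assuming the variable name from the text):

```scala
// Pull a few elements to the driver and inspect them.
filteredFeatureEvents.take(3).foreach(println)
```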
In the previous step, we computed
RDD[(target, Map[feature-name, feature-value])]. To convert each
feature-name to an integer id, as required by most machine learning modules including MLlib, we need to collect all unique feature names and associate them with integer ids.
Here we use an operation named
flatMap. Below is an example; we can think of
flatMap as a two-step operation: map, then flatten. As a result,
patientTargetAndFeatures.flatMap(_._2.keys) will give all patients' feature names, duplicates included.
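The id assignment can be illustrated locally, since Scala collections also provide flatMap, distinct, and zipWithIndex. The sample data is hypothetical, and the sorting step (for deterministic ids) is an addition:

```scala
// Hypothetical per-patient (target, features) pairs.
val patientTargetAndFeatures = List(
  (1.0, Map("med1" -> 5.0, "lab1" -> 2.0)),
  (0.0, Map("med1" -> 1.0, "diag1" -> 3.0))
)

// flatMap = map then flatten: gather every feature name, duplicates included.
val allNames = patientTargetAndFeatures.flatMap(_._2.keys)

// Sort for deterministic ids, then pair each distinct name with an integer id.
val featureMap = allNames.distinct.sorted.zipWithIndex.toMap
// featureMap == Map("diag1" -> 0, "lab1" -> 1, "med1" -> 2)
```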
Next we visualize the steps. What happens after
collect is not depicted, but what
collect does is bring data from distributed storage to centralized storage on the driver. Here we assume the resulting data matrix is not too big; if it is very big, an alternative approach such as a join may be required.
Note that many common functions like
zipWithIndex have the same name on
RDD and on common local data structures like Array and List.
If you get confused about the result of certain operations, you can break the chain of operation calls apart and print out the result of each step.
In this last step, we transform the
(target, features) pair for each patient into a
LabeledPoint. Basically, we just need to translate each feature name in
features into its feature id, create a feature vector, and then associate the vector with the target.
In the example above, we called
sc.broadcast. As indicated by its name, this function broadcasts data from the driver to the workers, so that workers do not each fetch a copy on demand, which wastes bandwidth and slows down the process. Its usage is very simple: call
val broadcasted = sc.broadcast(object) and use
broadcasted.value to access the original
object. Be aware that such a broadcast object is read-only.
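Putting the pieces together, a sketch of the final transformation; the variable names featureMap and patientTargetAndFeatures are assumptions carried over from the earlier steps:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Ship the feature-name-to-id map to the workers once.
val bFeatureMap = sc.broadcast(featureMap)
val numFeatures = featureMap.size

val points = patientTargetAndFeatures.map { case (target, features) =>
  // Translate names to ids; Vectors.sparse expects sorted indices.
  val indexedValues = features.toSeq
    .map { case (name, value) => (bFeatureMap.value(name), value) }
    .sortBy(_._1)
  val (indices, values) = indexedValues.unzip
  LabeledPoint(target, Vectors.sparse(numFeatures, indices.toArray, values.toArray))
}
```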
With the data readily available as
RDD[LabeledPoint], we can save it in a common format accepted by many machine learning modules: the LibSVM/svmlight format, named after the LibSVM and svmlight packages.
You can achieve this as follows:
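A minimal sketch using MLUtils.saveAsLibSVMFile; here, points is the RDD[LabeledPoint] from the previous step, and the output directory name is hypothetical:

```scala
import org.apache.spark.mllib.util.MLUtils

// Write the RDD[LabeledPoint] out in LibSVM format.
MLUtils.saveAsLibSVMFile(points, "output/samples")
```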