In the previous section we put the input data into the Hadoop Distributed File System (HDFS). Now, let's learn how to write a distributed computing program using the Hadoop MapReduce paradigm.
MapReduce works by breaking the processing into two phases: map and reduce. Each phase uses key-value pairs as input and output, the types of which can be chosen by the user. An overview of the MapReduce paradigm is shown below.
The input files are split and fed to mappers on different machines. Each mapper processes its input split line by line and writes the resulting key-value pairs to its local disk. Hadoop then performs a shuffle, in which the key-value pairs output by the mappers are sorted and transferred across machines so that all pairs with the same key are grouped together and sent to the same reducer. Each reducer then processes one group at a time, combining the values that share a key into output key-value pairs (in the frequency example below, a single pair per key). Since Hadoop carries out the shuffle automatically, the user only needs to define the map and reduce operations.
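To make this data flow concrete, here is a minimal single-process sketch of the three phases in plain Java. It is only an illustration under simplifying assumptions: there are no input splits, machines, or disks; a TreeMap stands in for Hadoop's sort-and-group step; and the class name LocalMapReduce is hypothetical, not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical, in-memory simulation of map -> shuffle -> reduce.
// This is NOT Hadoop: everything runs in one JVM on one list of records.
class LocalMapReduce {

    static <I, K extends Comparable<K>, V, R> Map<K, R> run(
            List<I> records,
            Function<I, List<Map.Entry<K, V>>> mapper,
            BiFunction<K, List<V>, R> reducer) {

        // Map phase: each input record may emit zero or more key-value pairs.
        List<Map.Entry<K, V>> intermediate = new ArrayList<>();
        for (I record : records) {
            intermediate.addAll(mapper.apply(record));
        }

        // Shuffle phase: group values by key. The TreeMap keeps keys sorted,
        // loosely mirroring the sort Hadoop performs before reducing.
        Map<K, List<V>> groups = new TreeMap<>();
        for (Map.Entry<K, V> pair : intermediate) {
            groups.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // Reduce phase: one reducer call per distinct key.
        Map<K, R> output = new TreeMap<>();
        for (Map.Entry<K, List<V>> group : groups.entrySet()) {
            output.put(group.getKey(), reducer.apply(group.getKey(), group.getValue()));
        }
        return output;
    }
}
```

For instance, passing a mapper that emits (word, 1) for every word of a line and a reducer that sums its list turns run into a word counter: the lines "a b a" and "b c" yield {a=2, b=2, c=1}. In real Hadoop the grouped values reach the reducer as an Iterable rather than a List.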
Let's write a simple MapReduce program in Java that counts how many times each event-id appears in our case.csv file (described in the sample data section).
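A MapReduce program consists of three parts:
A Mapper class that implements the map operation.
A Reducer class that implements the reduce operation.
A main function that tells Hadoop to use the classes we created.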
Create a Java file FrequencyMapper.java. The FrequencyMapper class extends the predefined Mapper class and overrides its map method.
The map function is illustrated below.
The four type parameters <LongWritable, Text, Text, IntWritable> specify that the input key-value pair has type <LongWritable, Text> and the output key-value pair has type <Text, IntWritable>.
Since the input files are plain text, the input key-value pair has type <LongWritable, Text>. The key is the byte offset of the start of each line within the file, which is not used here. The value is the text of the line.
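The plain-Java sketch below reproduces the kind of (key, value) pairs the mapper receives for a text file: the key is the byte offset at which a line begins and the value is the line's text. It is an illustration, not Hadoop's record reader: the class LineOffsets is hypothetical, it assumes UTF-8 text with '\n' line endings only, and it ignores input splits.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: maps each line's starting byte offset to its text,
// imitating the <LongWritable, Text> pairs a plain-text input produces.
// Assumptions: UTF-8 encoding, '\n' line endings, whole file in one "split".
class LineOffsets {

    static Map<Long, String> offsetsAndLines(String fileContents) {
        Map<Long, String> records = new LinkedHashMap<>(); // keeps file order
        long offset = 0;
        int start = 0;
        while (start < fileContents.length()) {
            int end = fileContents.indexOf('\n', start);
            if (end < 0) {
                end = fileContents.length(); // last line without a newline
            }
            String line = fileContents.substring(start, end);
            records.put(offset, line);
            // Offsets count bytes, not characters: add the line's UTF-8
            // length plus one byte for the '\n' terminator.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
            start = end + 1;
        }
        return records;
    }
}
```

For the three lines "a", "bb", and "ccc" the keys are 0, 2, and 5. Because offsets are measured in bytes, a line that follows a multi-byte character such as "é" starts one position later than a character count would suggest.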
We use toString() to convert the Hadoop Text object into a familiar Java String and extract only the second field of the line (recall that each line has the form patient-id, event-id, timestamp, value). We then call context.write to emit the output. Each line is thus mapped to a pair (event-id, 1), where 1 is an IntWritable. Since 1 is a constant, we store it in a static variable instead of creating a new object for every line.
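As a hedged illustration of this logic without the Hadoop types, the plain-Java sketch below extracts the second comma-separated field and pairs it with the constant 1. The class name FrequencyMapLogic is hypothetical; the real FrequencyMapper works with Text and IntWritable and emits its pair through context.write instead of returning it.

```java
import java.util.AbstractMap;
import java.util.Map;

// Hypothetical plain-Java stand-in for FrequencyMapper.map().
class FrequencyMapLogic {

    // Mirrors the static IntWritable constant: the value emitted for every line.
    private static final int ONE = 1;

    // offset mirrors the LongWritable key and is deliberately unused.
    // Assumes the line format patient-id,event-id,timestamp,value
    // and performs no validation of malformed lines.
    static Map.Entry<String, Integer> map(long offset, String line) {
        String[] fields = line.split(",");
        String eventId = fields[1]; // second field
        return new AbstractMap.SimpleEntry<>(eventId, ONE);
    }
}
```

For a made-up line such as patient-1,EVENT-A,2014-01-01,1.0, map returns the pair (EVENT-A, 1). A line with fewer than two fields would throw an ArrayIndexOutOfBoundsException in this sketch.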
Hadoop's internal shuffle ensures that all mapper output pairs with the same key (the same event-id in our case) go to the same reducer. A reducer therefore receives a key together with a collection of its values (a Java Iterable object). In our case, the reducer input is (event-id, [1, 1, ..., 1]).
This can be illustrated with the following example.
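The grouping can also be pictured with the plain-Java sketch below: it assigns each mapper output pair to a reducer and groups the values by key within that reducer, so every key ends up in exactly one reducer as (event-id, [1, 1, ..., 1]). The assignment uses the same formula as Hadoop's default HashPartitioner, (hashCode & Integer.MAX_VALUE) % numReduceTasks, but applies it to String.hashCode() rather than Text.hashCode(), so the actual reducer numbers on a cluster will differ. ShuffleSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the shuffle guarantee: same key -> same reducer.
class ShuffleSketch {

    // HashPartitioner-style formula, using String.hashCode() as a stand-in.
    // Masking the sign bit keeps the index non-negative.
    static int partitionFor(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Returns one sorted key -> values map per reducer.
    static List<Map<String, List<Integer>>> shuffle(
            List<Map.Entry<String, Integer>> mapOutput, int numReducers) {
        List<Map<String, List<Integer>>> reducers = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            reducers.add(new TreeMap<>());
        }
        for (Map.Entry<String, Integer> pair : mapOutput) {
            Map<String, List<Integer>> target = reducers.get(partitionFor(pair.getKey(), numReducers));
            target.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return reducers;
    }
}
```

With the pairs (EVENT-A, 1), (EVENT-B, 1), (EVENT-A, 1) and three reducers, the reducer chosen for EVENT-A receives (EVENT-A, [1, 1]) and no other reducer sees EVENT-A at all.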
Create a Java file FrequencyReducer.java. The FrequencyReducer class extends the predefined Reducer class and overrides its reduce method.
The type parameters <Text, IntWritable, Text, IntWritable> specify the types of the input and output key-value pairs.
Note that the reducer's input type (<Text, IntWritable>) must match the mapper's output type.
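The reduce step for our example simply adds up the values for each key. The plain-Java sketch below shows that logic in isolation; FrequencyReduceLogic is a hypothetical name, and the real FrequencyReducer receives Text and IntWritable objects and emits (event-id, total) through context.write.

```java
// Hypothetical plain-Java stand-in for FrequencyReducer.reduce().
class FrequencyReduceLogic {

    // Sums all values received for one key, e.g. ("EVENT-A", [1, 1, 1]) -> 3.
    // eventId mirrors the Text key; it is not needed to compute the sum.
    static int reduce(String eventId, Iterable<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return sum;
    }
}
```

Summing the values, rather than counting how many there are, keeps the function correct even if some incoming values are partial counts greater than 1.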
Finally, write a Java file Frequency.java containing the main function that configures and runs the MapReduce job.
Compile and Run
All of the source code is in the sample/hadoop folder. Navigate to that folder first, then compile the code and create the jar before running it.
Compile the three Java files with javac:
Here, hadoop classpath outputs the class path required to compile a Hadoop program, and -d classes puts the generated class files into the classes directory. After running this command, you will see three class files in the classes directory.
Let's create a jar named Frequency.jar from the classes we just compiled.
In real-world application development, you will not need to compile files manually and create the jar yourself. Build tools such as Maven, Gradle, or SBT can handle this process instead.
You can run the jar file you just created with the command:
where Frequency.jar is the jar file we just created and Frequency is the Java class to run. input/case.csv and output are the input and output arguments passed to the Frequency class we implemented. Note that both are paths in HDFS, not in the local file system. Here we specify the single file input/case.csv as input, but we could instead pass just input as the input argument to process all files in the HDFS input folder (in this case both input/case.csv and input/control.csv). In either case, you don't need to create the output folder yourself, because the Hadoop framework creates it for you.
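In the log output you may see messages mentioning 'uber mode'. When uber mode is enabled, Hadoop runs all the map and reduce tasks of a small job inside a single YARN container (the one hosting the job's ApplicationMaster) instead of requesting a separate container for each task; the log reports whether the job is running in uber mode.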
While the program is running, you will see many log messages. After the job finishes, you can check the results in the output directory (created by Hadoop) with the commands:
You will get results like:
Note that the order of the output may differ from the above.
If the output files are not too large, you can copy and merge all of them into a single local file with the command:
If you run the job again, you will see an error message saying that the output directory already exists. This check prevents you from accidentally overwriting earlier results. You can remove the directory with the command: