CSE 223B Labs

Lab 1: Running MapReduce on Hadoop
Due: 11am, Thursday, April 11th, 2013

Hadoop 101

Hadoop Software

Hadoop is an open source implementation of Google's MapReduce that we covered in the class lecture. In order to run a MapReduce application using the Hadoop framework we'll have to implement a Mapper class and a Reducer class and then run it on the Hadoop cluster.

The Mapper class contains a map function, which is called once for each input and outputs any number of intermediate (key, value) pairs. What code you put in the map function depends on the problem you are trying to solve. After the Map phase produces the intermediate (key, value) pairs they are efficiently and automatically grouped by key in preparation for the Reduce phase (this grouping is known as the Shuffle phase of a MapReduce).
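For concreteness, here is a minimal sketch of what a word count Mapper might look like using Hadoop's org.apache.hadoop.mapred API (the same API style used by the Trivial.java skeleton later in this lab). The class name WordCountMapper is only illustrative; this is not the code shipped with the examples jar.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch: emits the intermediate pair (word, 1) for every token in a line.
public class WordCountMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      output.collect(word, ONE);   // one intermediate (key, value) pair per word
    }
  }
}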

The Reducer class contains a reduce function, which is then called once for each key. Each reduce instance looks at all the values for that key and outputs a "summary" value for that key in the final output. How the keys are summarized again depends on the problem we're trying to solve.
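And here is the matching Reducer sketch: for each word it receives all the 1s emitted by the mappers and outputs their sum as the "summary" value. Again, the class name WordCountReducer is only illustrative.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch: called once per word; sums the counts emitted by the mappers.
public class WordCountReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));   // one summary value per key
  }
}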

Hadoop Services

The Hadoop infrastructure allows the map and reduce tasks to run in parallel on a large number of machines, abstracting the details for replication and fault-tolerance from the user. The user only needs to implement the Mapper and Reducer classes and Hadoop takes care of communication between the different machines in the cluster, moving data across them and combining intermediate results to generate the final response.

At a very high level, this is how Hadoop manages a MapReduce job:
  1. Tasks are submitted to a JobTracker daemon, which runs once per cluster. The JobTracker does some preprocessing, then divides up the job and passes smaller tasks on to a number of TaskTracker daemons on other (or the same) machines in the cluster. The TaskTrackers continually update the JobTracker on their progress and, when they finish, report their results.
  2. A distributed filesystem is available to all the Hadoop nodes. This filesystem is run by a NameNode (another daemon, usually run on the same machine as the JobTracker). Files are broken up into Blocks, which are replicated and spread out among many DataNodes. The NameNode handles requests for files and locates which DataNodes hold the appropriate Blocks for each request; it then directs those DataNodes to provide the requested Blocks.

Hadoop Resources

Hadoop is a very popular platform for running MapReduce applications, so you should be able to find plenty of resources on the Web. Here is a small list of resources with Hadoop documentation.


Getting Started

While we have the Hadoop cluster running for you, we highly recommend that you also run a single-node cluster on your own personal machine to test your MapReduce implementation locally on a smaller input. Running your own Hadoop cluster is optional for this lab, but doing so will give you much better insight into how the Hadoop services work. It is also useful because other students in this class are sharing the cluster with you; depending on the workload, sharing may significantly slow down the processing of your individual Hadoop job. With your own cluster you will not have to worry about sharing conflicts, and you can debug your MapReduce applications in a simpler setup. There's a tutorial available on the Hadoop Web page that describes how to set up your own cluster on a single machine. You can download the stable hadoop-1.0.4.tar.gz tarball from any of the mirror sites listed on the Apache website.

Since Hadoop is natively implemented in Java, it's easiest to run applications written in Java on Hadoop, and we recommend that you write your applications in Java as well. However, if you're a seasoned Hadoop user and can run your C++/Python code on Hadoop, feel free to use those for the assignment.

Running the Word Count Example:

For starters, we will run the famous word count example on our Hadoop cluster. Since the Hadoop services are already running, all we need to do is populate our data set in HDFS and then run the jar file that has the Mapper and Reducer methods implemented.

SSH into any node in the cluster and add the following lines to your ~/.bashrc file.
export HADOOP_PATH=/classes/cse223b/sp13/labs/lab1/hadoop
export INPUT_PATH=/classes/cse223b/sp13/labs/lab1/input
export USER=`whoami`
unalias fs &> /dev/null
alias fs="hadoop fs"
To make sure the paths are set correctly before you run the following steps, run source ~/.bashrc and echo the values of the HADOOP_PATH, INPUT_PATH and USER environment variables.

STEP 1: Populate data in HDFS:

Run the following command to populate the data onto Hadoop's File System.

$HADOOP_PATH/bin/hadoop dfs -copyFromLocal $INPUT_PATH/wc_input /user/$USER/wc_input
$HADOOP_PATH/bin/hadoop dfs -lsr /user/$USER/wc_input
These commands will populate the data in Hadoop's distributed file system in a format that lets Hadoop efficiently move it around the different nodes in the cluster. If the two commands ran successfully, you should be able to see some .txt files listed under the /user/$USER/wc_input folder.

STEP 2: Run the MapReduce Job:

To launch the MapReduce job, run the following command. It will spawn the map and reduce tasks for counting the words in the documents listed under the wc_input folder and save the results to the wc_output folder.

$HADOOP_PATH/bin/hadoop jar $HADOOP_PATH/hadoop*examples*jar wordcount /user/$USER/wc_input /user/$USER/wc_output
If the command ran successfully, your output should look something like this:
Warning: $HADOOP_HOME is deprecated.

13/04/04 00:35:27 INFO input.FileInputFormat: Total input paths to process : 11
13/04/04 00:35:27 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/04/04 00:35:27 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/04 00:35:28 INFO mapred.JobClient: Running job: job_201304032320_0011
13/04/04 00:35:29 INFO mapred.JobClient:  map 0% reduce 0%
13/04/04 00:36:01 INFO mapred.JobClient:  map 36% reduce 0%
13/04/04 00:36:04 INFO mapred.JobClient:  map 54% reduce 0%
13/04/04 00:36:10 INFO mapred.JobClient:  map 63% reduce 0%
13/04/04 00:36:13 INFO mapred.JobClient:  map 100% reduce 0%
13/04/04 00:36:19 INFO mapred.JobClient:  map 100% reduce 21%
13/04/04 00:36:28 INFO mapred.JobClient:  map 100% reduce 100%
13/04/04 00:36:33 INFO mapred.JobClient: Job complete: job_201304032320_0011
13/04/04 00:36:33 INFO mapred.JobClient: Counters: 30
13/04/04 00:36:33 INFO mapred.JobClient:   Job Counters
13/04/04 00:36:33 INFO mapred.JobClient:     Launched reduce tasks=1
...snip..

RESULTS: If your job completed correctly, then you can see the contents of the output folder:

$HADOOP_PATH/bin/hadoop dfs -lsr /user/$USER/wc_output
Right now the results for your job are in the DFS. You can copy them to local storage with the following command.
$HADOOP_PATH/bin/hadoop dfs -getmerge /user/$USER/wc_output ~/wc_output

NOTE: Please email the TA ASAP if you cannot SSH into the machines or if any of the steps fail for you. You will not be able to run your own jobs if the WordCount example is failing.

When running your Hadoop jobs, always specify the output directory to be under the /user/$USER/ root directory. You have write access to this directory in the DFS; if you specify another path, your job will fail due to insufficient permissions.


Writing your own MapReduce Application:

Writing a MapReduce application is as simple as writing the Map and Reduce functions (or using already-provided functions) and then telling a job object which functions you want to use. To run your MapReduce application, all you need to do is repeat Step 1 and Step 2 from the word count example, replacing the hadoop*examples*jar file with the jar file of your own MapReduce implementation and supplying the right command line arguments.

We are providing skeleton code with the simplest MapReduce functions: Trivial.java.

Save this code as Trivial.java in a directory named example under your home directory. Copy this Makefile to the example directory. In our Trivial.java example, in addition to the Mapper and Reducer classes, we've also implemented the driver program (with the main method) that starts the job on the cluster by invoking the runJob method on the JobConf object (conf). You can read more about the JobConf object on the Hadoop API page.
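If you have not seen a JobConf-based driver before, the sketch below shows the general shape: it wires a Mapper and Reducer (here the illustrative WordCountMapper/WordCountReducer classes sketched earlier) into a JobConf, sets the input and output paths from the command line, and submits the job with JobClient.runJob. Trivial.java's driver is structured along these lines, but refer to the provided file for the exact code.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Illustrative driver sketch: configures the job and submits it to the JobTracker.
public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(WordCountMapper.class);
    conf.setReducerClass(WordCountReducer.class);

    // Input and output directories come from the command line, as in the examples.
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);   // blocks until the job completes
  }
}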

Make sure that JAVA_HOME and HADOOP_PATH are set correctly and run the following commands.

cd ~/example
make
At this point you should have a file called build.jar under the example directory, and you should be able to launch your own Hadoop job! To launch your MapReduce job, run the following commands.
cd ~/example
$HADOOP_PATH/bin/hadoop dfs -put $INPUT_PATH/trivial_test.txt /user/$USER/trivial_test.txt
$HADOOP_PATH/bin/hadoop jar ~/example/build.jar Trivial /user/$USER/trivial_test.txt /user/$USER/trivial_output
$HADOOP_PATH/bin/hadoop dfs -getmerge /user/$USER/trivial_output ~/trivial_output
Once the job finishes, the output will be available under the output directory (/user/$USER/trivial_output in this example) in the DFS, so you will have to do a dfs -get (or dfs -getmerge, as above) to retrieve the output to your local file system if you want to view or analyze it directly.

Generating an Inverted Index:

For this lab, you will build an inverted index of words to the documents which contain them. Your end result will be something of the form: (word, docid[]).
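To make the target output concrete, here is a minimal sketch of what the reduce side could look like, assuming your map phase emits (word, docid) pairs; your actual design (key types, output format, deduplication strategy) is up to you.

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch: collects the distinct document ids seen for each word.
public class InvertedIndexReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, Text> {

  public void reduce(Text word, Iterator<Text> docIds,
                     OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    Set<String> docs = new HashSet<String>();
    while (docIds.hasNext()) {
      docs.add(docIds.next().toString());
    }
    output.collect(word, new Text(docs.toString()));   // e.g. (word, [doc1.txt, doc2.txt])
  }
}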

We have more test data in the $INPUT_PATH/classics/ folder. You can copy this data to your DFS folder (/user/$USER/) and run your MapReduce job on the input data.

One challenge here is that the default input format doesn't provide the name of the file to your map function. You will have to figure out some way around this (such as putting the name of the file on each line of the file or, much better, writing a new InputFormat class). This will get you a little more intimate with the workings of Hadoop.
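As a hint of what is possible (not a requirement of the lab), one other workaround some Hadoop programs use is to recover the file name from the input split inside the map function, roughly as sketched below with the old mapred API. The class name IndexMapper and this particular split-based approach are our own illustration, not part of the provided code.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Illustrative sketch: emits (word, filename) pairs, recovering the file name from the split.
public class IndexMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    String docId = ((FileSplit) reporter.getInputSplit()).getPath().getName();
    Text docIdText = new Text(docId);
    for (String token : value.toString().split("\\s+")) {
      if (token.length() > 0) {
        output.collect(new Text(token), docIdText);
      }
    }
  }
}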

You'll also have to do a little more than simply tokenize the texts on whitespace. Make sure that punctuation and case also get stripped, but ensure that contractions don't change meaning (like "it's" becoming the possessive "its").
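As a sketch of one possible normalization (an assumption on our part, not the required approach), the helper below lowercases a token and strips punctuation while keeping internal apostrophes, so "it's" stays distinct from "its":

// Illustrative sketch of token normalization for the inverted index.
public class Tokenizer {
  // Lowercase and drop punctuation, but keep internal apostrophes so the
  // contraction "it's" does not collapse into the possessive "its".
  public static String normalize(String token) {
    String t = token.toLowerCase().replaceAll("[^a-z']", "");
    return t.replaceAll("^'+", "").replaceAll("'+$", "");
  }

  public static void main(String[] args) {
    System.out.println(normalize("It's"));    // prints: it's
    System.out.println(normalize("Whale,"));  // prints: whale
  }
}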

Update: Your .jar file should take two command line arguments: one specifying the input directory and the other the output directory (similar to the word count example).


Deliverables

Turnin: tarball of:

Last updated: Tue Apr 30 13:22:18 -0700 2013