CSE 124: Networked Services

Writing a MapReduce

Introduction

MapReduce comes from two standard features found in many functional programming languages.

The Map function takes a {key, value}, performs some computation on it, then outputs a {key, value}.

Map: (key, value) -> (key, value)

The Reduce function takes a {key, value[]}, performs some computation on them, then outputs a {key, value}[].

Reduce: (key, value[]) -> (key, value)[]

Map and Reduce functions emit their {key, value} pairs by calling output.collect(key, value).
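
Below is a rough sketch of what these two functions look like in practice, assuming the old org.apache.hadoop.mapred API that this handout uses; the WordCount-style class names and logic are illustrative assumptions, not code from the course files:

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Each public class would normally go in its own .java file (or be made a
    // static nested class of the driver).

    // Map: (LongWritable offset, Text line) -> (Text word, IntWritable 1)
    public class WordMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            for (String word : value.toString().split("\\s+")) {
                if (word.length() > 0) {
                    output.collect(new Text(word), ONE);   // emit a {key, value} pair
                }
            }
        }
    }

    // Reduce: (Text word, Iterator<IntWritable> counts) -> (Text word, IntWritable sum)
    public class SumReducer extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                           OutputCollector<Text, IntWritable> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));     // one pair per key
        }
    }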

Writing a MapReduce is as simple as writing these two functions (or using already-provided functions), then telling a job object which functions you want to use.

Trivial Identity Example

Trivial.java

This may seem like a lot of code to set up, but it's pretty simple and straightforward. Save this code to Trivial.java in a directory named example.
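
The linked Trivial.java is not reproduced in this text; for orientation, an identity pass-through job built from Hadoop's already-provided IdentityMapper and IdentityReducer looks roughly like the sketch below. The structure, types, and old-API path setters here are assumptions, not necessarily what the handout file contains:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.IdentityReducer;

    public class Trivial {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(Trivial.class);   // use the main class to locate the jar
            conf.setJobName("trivial-identity");

            // Identity pass-through: Hadoop's provided classes re-emit each {key, value} as-is.
            conf.setMapperClass(IdentityMapper.class);
            conf.setReducerClass(IdentityReducer.class);

            // With TextInputFormat the keys are byte offsets (LongWritable) and values are lines (Text).
            conf.setOutputKeyClass(LongWritable.class);
            conf.setOutputValueClass(Text.class);
            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            // Input/output locations come from the command line; these are the old-API
            // setters on JobConf (later Hadoop versions use FileInputFormat/FileOutputFormat helpers).
            conf.setInputPath(new Path(args[0]));
            conf.setOutputPath(new Path(args[1]));

            JobClient.runJob(conf);
        }
    }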

Compiling your MapReduce Program

In this example, we will assume that the above program resides in a directory called example in your home directory. Copy this Makefile and put it into the example directory. Edit the Makefile so that JAVA_HOME and HADOOP_PATH contain the correct paths. Now do:

    $ cd example
    $ make
    

At this point you should have a file called build.jar under the example directory. You should now be able to launch your first Hadoop job!

If you are running this example on your local machine, you will have to configure your Hadoop installation. First, edit conf/hadoop-env.sh and define the JAVA_HOME variable. Then save this file as conf/hadoop-site.xml to configure a single-node "cluster".

You should now be ready to set up the Hadoop distributed file system:

    $ /path/to/hadoop/bin/hadoop namenode -format

Now start all the Hadoop processes:

    $ /path/to/hadoop/bin/start-all.sh

Copy a file (any text file) into the file system:

    $ /path/to/hadoop/bin/hadoop dfs -put /path/to/localfile input-filename

Now you can actually run your job:

    $ cd ~
    $ /path/to/hadoop/bin/hadoop jar example/build.jar Trivial input-filename output-dir
    

Test this with a small input file first. Hadoop should report the progress of your job on standard output. For a more detailed status report, you can check the status pages as described here. Once the job finishes, the output will be available under output-dir inside your DFS, so you will have to do a dfs -get to retrieve the output to your local file system if you want to view or analyze it directly.

JobConf

There are many options for JobConf objects. For complete details see JobConf on the Hadoop API page. I shall cover what I have found to be the most useful options here.

  • JobConf(Class main_class);

    This initializes the configuration. Pass the class of your main program; Hadoop uses it to locate the jar containing your job and to assist with logging.

    For those of you not familiar with reflection, you can use ClassName.class to specify a class statically (which is mostly what you will be doing).

  • conf.setCombinerClass(Class combiner);

    combiner implements Reducer

    The Combiner class is like a Reducer; in fact, it is often the same class as the Reducer. It operates after all the Maps have been done on a single node, but before gathering up the keys from the other nodes. This is an optimization to make use of all the data already in memory on one machine. It is most useful in examples like WordCount, in which the Map simply outputs (key, 1) pairs: the Combiner can easily reduce the amount of data sent to the Reduce phase by pre-summing the pairs. (See the configuration sketch after this list.)

  • conf.setOutputKeyClass(Class obj);

    obj implements WritableComparable

    This is the type of the keys output by the Reduce function. Unless setMapOutputKeyClass is used, it is also the type of the keys emitted by Map.

  • conf.setOutputValueClass(Class obj);

    obj implements Writable

    This is the type of the values output by the Reduce function. Unless setMapOutputValueClass is used, it is also the type of the values emitted by Map.

  • conf.setInputFormat(Class format);

    format extends InputFormatBase

    This class provides methods for reading in custom input formats. This is done using a (usually inner or anonymous) subclass of RecordReader. (See InputFormatBase.)

    You may optionally specify how files are to be split if they are too big, as well as which files within a directory are to be used as input, instead of all of them.

  • conf.setNumReduceTasks(1);

    This controls the number of reduce tasks, and consequently the number of output files.

    You usually want to specify this in the conf/hadoop-site.xml config file, but sometimes it only makes sense to have a single output file, as when sorting. In other cases, setting this in code should be avoided, so as to make the best use of cluster resources.

  • conf.set(String property_name, String value);
    conf.get(String property_name, String default_value);

    In theory, value can be an Object, but when it is, it is just .toString'd.

    Use these methods to pass other parameters to your Map and Reduce classes (see the sketch after this list, and configure() below).
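
Putting several of these options together, the configuration portion of a driver might look roughly like the following sketch. It reuses the WordMapper and SumReducer classes sketched in the introduction, and the property name wordcount.min.length is a made-up example:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(WordCountDriver.class);
            conf.setJobName("wordcount-sketch");

            // WordMapper and SumReducer are the classes sketched in the introduction.
            conf.setMapperClass(WordMapper.class);
            conf.setReducerClass(SumReducer.class);

            // The reduce function here is associative and commutative, so it doubles
            // as a Combiner: (word, 1) pairs are pre-summed on each node before the shuffle.
            conf.setCombinerClass(SumReducer.class);

            // Types of the {key, value} pairs emitted by Reduce (and, since no
            // setMapOutputKeyClass/setMapOutputValueClass call is made, by Map as well).
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            // Force a single reduce task, and hence a single output file.
            conf.setNumReduceTasks(1);

            // Pass an extra parameter (hypothetical property name) to the Map/Reduce tasks;
            // it can be read back with conf.get(...) inside configure().
            conf.set("wordcount.min.length", "3");

            conf.setInputPath(new Path(args[0]));    // old-API path setters, as above
            conf.setOutputPath(new Path(args[1]));

            JobClient.runJob(conf);
        }
    }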

Mapper & Reducer

While map is the only method you must implement in a Mapper class, and reduce is the only method you must implement in a Reducer class, there are a few other useful ones inherited from MapReduceBase. For complete details see MapReduceBase on the Hadoop API page. I shall cover what I have found to be the most useful options here.

  • configure(JobConf job)

    Use this method to initialize your Mapper or Reducer object. It is where you have access to the JobConf object for the current job, so you must extract any parameters (set with conf.set) here. It is called once, before any calls to map or reduce. (See the sketch after this list.)

  • close()

    This method is called once after all calls to map or reduce have completed. Use it to clean up anything you initialized in configure().
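
As an example of both methods, here is a sketch of a Mapper that reads the made-up wordcount.min.length property from the earlier configuration sketch inside configure(), under the same old-API assumptions as before:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class FilteringWordMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);
        private int minLength;

        // Called once per task, before any call to map(): pull parameters out of the JobConf.
        public void configure(JobConf job) {
            minLength = Integer.parseInt(job.get("wordcount.min.length", "1"));
        }

        public void map(LongWritable key, Text value,
                        OutputCollector<Text, IntWritable> output,
                        Reporter reporter) throws IOException {
            for (String word : value.toString().split("\\s+")) {
                if (word.length() >= minLength) {
                    output.collect(new Text(word), ONE);
                }
            }
        }

        // Called once after the last call to map(): release anything set up in configure().
        public void close() throws IOException {
            // Nothing to clean up in this sketch; a real Mapper might close files opened in configure().
        }
    }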
