When running MapReduce jobs, it is often convenient to set configuration parameters from the command line rather than hard-coding settings such as the number of mappers, the number of reducers, or the maximum split size. Parsing options from the command line is easy when the job class implements the Tool interface and extends Configured.

Below is a simple example. Note that there are a fair number of differences between the code below and this other simple MapReduce Example, even though the input and output are the same. Notice that the job is launched through ToolRunner's static run() method.

package com.bigdatums.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class ToolMapReduceExample extends Configured implements Tool {

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                String line = value.toString();
                String[] fields = line.split("\t");
                String firstName = fields[1];
                word.set(firstName);
                context.write(word, one);
            }
            catch (Exception e) {
                // skip lines that cannot be parsed
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {
            try {
                int sum = 0;
                for(IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
            catch (Exception e) {
                // ignore records that fail to write
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        //input and output paths passed from cli
        Path hdfsInputPath = new Path(args[0]);
        Path hdfsOutputPath = new Path(args[1]);

        //create job
        Job job = Job.getInstance(conf, "Tools Job Example");
        job.setJarByClass(ToolMapReduceExample.class);

        //set mapper and reducer
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        //set output key and value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //set input format and path
        FileInputFormat.addInputPath(job, hdfsInputPath);
        job.setInputFormatClass(TextInputFormat.class);

        //set output format and path
        FileOutputFormat.setOutputPath(job, hdfsOutputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        //run job return status
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToolMapReduceExample(), args);
        System.exit(res);
    }
}

Here is an example of running the job above and setting the mapred.max.split.size configuration parameter on the command line:

hadoop jar bigdatums-hadoop-1.0-SNAPSHOT.jar \
  com.bigdatums.hadoop.mapreduce.ToolMapReduceExample \
  -D mapred.max.split.size=12345 /inputDir /outputDir
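Any property set with -D is parsed by ToolRunner (via GenericOptionsParser) before run() is called, so it ends up in the Configuration returned by getConf() and is also visible to mappers and reducers through context.getConfiguration(). As a rough sketch, the Map class above could read a custom property in setup(); the property name bd.name.column below is made up for illustration and is not used by the job in this post:

    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private int nameColumn = 1;

        @Override
        protected void setup(Context context) {
            // bd.name.column is a hypothetical property; it could be passed
            // on the command line with -D bd.name.column=2
            nameColumn = context.getConfiguration().getInt("bd.name.column", 1);
        }

        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                String[] fields = value.toString().split("\t");
                word.set(fields[nameColumn]);
                context.write(word, one);
            }
            catch (Exception e) {
                // skip lines that cannot be parsed
            }
        }
    }

With a mapper like this, the column to count could be changed per run with -D bd.name.column=2, without recompiling the job.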

Notice that the HDFS input and output directories are still passed as arguments. The list of supported command line options is shown below:

GENERIC_OPTION                                    Description
-conf <configuration file>                        Specify an application configuration file.
-D <property=value>                               Use value for given property.
-fs <local|namenode:port>                         Specify a namenode.
-jt <local|jobtracker:port>                       Specify a job tracker. Applies only to job.
-files <comma separated list of files>            Specify comma separated files to be copied to the map reduce cluster. Applies only to job.
-libjars <comma separated list of jars>           Specify comma separated jar files to include in the classpath. Applies only to job.
-archives <comma separated list of archives>      Specify comma separated archives to be unarchived on the compute machines. Applies only to job.
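Several generic options can be combined in a single invocation. For example (the configuration file and extra jar names below are placeholders, not files from this example):

hadoop jar bigdatums-hadoop-1.0-SNAPSHOT.jar \
  com.bigdatums.hadoop.mapreduce.ToolMapReduceExample \
  -conf job-site.xml \
  -libjars extra-lib.jar \
  -D mapred.max.split.size=12345 \
  /inputDir /outputDir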
