Often when running MapReduce jobs, people prefer setting configuration parameters from the command line. This helps avoid the need to hard-code settings such as the number of mappers, the number of reducers, or the max split size. Parsing options from the command line can be done easily by implementing Tool and extending Configured.

Below is a simple example. Note that there are a fair number of differences between the code below and this other simple MapReduce Example, even though the input and output are the same. Notice below that the job is executed by ToolRunner's static run() method.
package com.bigdatums.hadoop.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ToolMapReduceExample extends Configured implements Tool {
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                //input is tab-delimited; the first name is the second field
                String line = value.toString();
                String[] fields = line.split("\t");
                String firstName = fields[1];

                word.set(firstName);
                context.write(word, one);
            }
            catch (Exception e) {
                //silently skip records that cannot be parsed
            }
        }
    }

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) {
            try {
                //sum the counts for each first name
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new IntWritable(sum));
            }
            catch (Exception e) {
                //silently skip keys that cannot be written
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        //input and output paths passed from cli
        Path hdfsInputPath = new Path(args[0]);
        Path hdfsOutputPath = new Path(args[1]);

        //create job
        Job job = new Job(conf, "Tools Job Example");
        job.setJarByClass(ToolMapReduceExample.class);

        //set mapper and reducer
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        //set output key and value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //set input format and path
        FileInputFormat.addInputPath(job, hdfsInputPath);
        job.setInputFormatClass(TextInputFormat.class);

        //set output format and path
        FileOutputFormat.setOutputPath(job, hdfsOutputPath);
        job.setOutputFormatClass(TextOutputFormat.class);

        //run job and return status
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ToolMapReduceExample(), args);
        System.exit(res);
    }
}
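One practical payoff of the Tool/Configured wiring is that custom properties can be passed the same way as the built-in ones and read back inside the mappers and reducers. As a rough sketch, here is a variant of the Map class above whose field index comes from the job configuration (the property name bigdatums.name.field.index is made up for this illustration, not something Hadoop defines):

    public static class ConfigurableFieldMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private int fieldIndex;

        @Override
        protected void setup(Context context) {
            //read the (hypothetical) field index from the job configuration,
            //defaulting to the second column when the property is not set
            fieldIndex = context.getConfiguration().getInt("bigdatums.name.field.index", 1);
        }

        @Override
        public void map(LongWritable key, Text value, Context context) {
            try {
                String[] fields = value.toString().split("\t");
                word.set(fields[fieldIndex]);
                context.write(word, one);
            }
            catch (Exception e) {
                //silently skip records that cannot be parsed
            }
        }
    }

With this in place, launching the job with -D bigdatums.name.field.index=2 would count a different column without recompiling.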
Here is an example of running the job above and setting the mapred.max.split.size configuration parameter on the command line (on newer Hadoop versions the equivalent property is mapreduce.input.fileinputformat.split.maxsize):
hadoop jar bigdatums-hadoop-1.0-SNAPSHOT.jar \
com.bigdatums.hadoop.mapreduce.ToolMapReduceExample \
-D mapred.max.split.size=12345 /inputDir /outputDir
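The reason this works is that ToolRunner runs the arguments through GenericOptionsParser before calling run(), so anything set with -D is already present in the Configuration returned by getConf(). A stripped-down sketch that just echoes the property back (the class name here is made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PrintSplitSizeExample extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        //prints 12345 when launched with -D mapred.max.split.size=12345
        System.out.println(getConf().get("mapred.max.split.size"));
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new PrintSplitSizeExample(), args));
    }
}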
Notice that the HDFS input and output directories are still passed as arguments. The list of supported command line options is shown below:
GENERIC_OPTION | Description |
---|---|
-conf <configuration file> | Specify an application configuration file. |
-D <property=value> | Use value for given property. |
-fs <local|namenode:port> | Specify a namenode. |
-jt <local|jobtracker:port> | Specify a job tracker. Applies only to job. |
-files <comma separated list of files> | Specify comma separated files to be copied to the map reduce cluster. Applies only to job. |
-libjars <comma separated list of jars> | Specify comma separated jar files to include in the classpath. Applies only to job. |
-archives <comma separated list of archives> | Specify comma separated archives to be unarchived on the compute machines. Applies only to job. |
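For example, a run of the job above that ships an extra jar and a small lookup file along with it might look like this (the jar and file paths are placeholders):

hadoop jar bigdatums-hadoop-1.0-SNAPSHOT.jar \
com.bigdatums.hadoop.mapreduce.ToolMapReduceExample \
-libjars /path/to/extra-lib.jar \
-files /path/to/lookup.txt \
/inputDir /outputDir

As with -D, these generic options must come before the job's own arguments.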