Sorting an entire data set by key is a common need in the world of big data. Those familiar with Hive or relational databases know that this can easily be done with a simple SQL statement. For example, sorting an entire data set by “first_name” would look something like this:

SELECT * FROM my_table ORDER BY first_name

Achieving the same result in MapReduce is unfortunately a little more complicated. The basic idea is to use the TotalOrderPartitioner to partition the data so that each reducer receives only a specific, non-overlapping range of keys. For example, to sort a data set whose keys are evenly distributed between 1 and 10 using two reducers, we would expect the first reducer to receive the records with keys 1-5 and the second reducer to receive the records with keys 6-10.
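The 1-10 example can be sketched in plain Java. This is only an illustration of range partitioning, not Hadoop's own code: the class name RangePartitionSketch and the method partitionFor are made up for this post, and the split point 6 is assumed to match the example above.

```java
import java.util.Arrays;

// Illustrative only: shows the idea of range partitioning with split points.
// The class and method names are hypothetical, not part of Hadoop.
class RangePartitionSketch {

    // splitPoints must be sorted ascending and distinct; n reducers need n-1 split points.
    // A key equal to a split point goes to the reducer to the right of that boundary.
    static int partitionFor(int key, int[] splitPoints) {
        int pos = Arrays.binarySearch(splitPoints, key);
        // binarySearch returns (-(insertionPoint) - 1) when the key is not found
        return pos >= 0 ? pos + 1 : -pos - 1;
    }

    public static void main(String[] args) {
        int[] splitPoints = {6}; // two reducers: keys below 6, and keys 6 and above
        for (int key = 1; key <= 10; key++) {
            System.out.println("key " + key + " -> reducer " + partitionFor(key, splitPoints));
        }
    }
}
```

With a single split point of 6, keys 1-5 map to reducer 0 and keys 6-10 map to reducer 1, which is the split described above.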

To determine the range of keys sent to each reducer, the input data is sampled to obtain a general idea of the cardinality and frequency of the keys in the data set. Once the data is sampled, a “partitioner” file is created that lists the key boundaries between reducers. The keys sent to each reducer are sorted during the “shuffle and sort” phase of MapReduce, and each reducer writes its output in that same order. Reading the reducer output files in order therefore yields a data set globally sorted by key.
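To make the sampling step more concrete, here is a minimal sketch of how boundaries could be picked from a sample: sort the sampled keys and take evenly spaced entries. It is a simplification under stated assumptions, not the Hadoop implementation: SplitPointSketch and splitPoints are hypothetical names, the sample is an in-memory list of strings, and duplicate boundaries are not handled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: derives reducer boundaries from a sample of keys.
// Names are hypothetical; duplicate keys in the sample are not treated specially.
class SplitPointSketch {

    // Returns numReducers - 1 boundaries taken at evenly spaced positions of the sorted sample.
    static List<String> splitPoints(List<String> sample, int numReducers) {
        if (numReducers < 1 || sample.size() < numReducers) {
            throw new IllegalArgumentException("need at least one sampled key per reducer");
        }
        List<String> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);

        List<String> boundaries = new ArrayList<>();
        double step = sorted.size() / (double) numReducers;
        for (int i = 1; i < numReducers; i++) {
            boundaries.add(sorted.get((int) Math.round(step * i)));
        }
        return boundaries;
    }

    public static void main(String[] args) {
        List<String> sample = List.of("mia", "bob", "zoe", "al", "kim", "eve", "tom", "dan", "ann", "sam");
        // 5 reducers need 4 boundaries
        System.out.println(splitPoints(sample, 5));
    }
}
```

The quality of the boundaries depends on how well the sample reflects the real key distribution; a skewed sample leads to reducers that receive very different amounts of data.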

Below is an example of doing Total Order Sorting in Hadoop MapReduce:

package com.bigdatums.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;


public class TotalOrderSortingExample extends Configured implements Tool {

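    //map-only step: emits the second tab-separated field of each line as the sort key
    public static class ExtractSortedValueMap extends Mapper<Object, Text, Text, NullWritable> {
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            //skip malformed lines that have no second field
            if (fields.length < 2) {
                return;
            }
            word.set(fields[1]);
            context.write(word, NullWritable.get());
        }
    }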

    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();

        //input and output paths passed from cli
        Path inputPath = new Path(args[0]);
        Path stageOutputPath = new Path(args[1]);
        Path partitionerFile = new Path(args[2]);
        Path sortedOutputPath = new Path(args[3]);

        //create job to extract values to be sorted
        Job samplingJob = Job.getInstance(conf, "Total Order Sorting Example - Extract Job");
        samplingJob.setJarByClass(TotalOrderSortingExample.class);

        //set mapper to extract value
        //first job has no reducer
        samplingJob.setMapperClass(ExtractSortedValueMap.class);
        samplingJob.setNumReduceTasks(0);

        //set key and value classes
        samplingJob.setOutputKeyClass(Text.class);
        samplingJob.setOutputValueClass(NullWritable.class);

        //set input format and path
        FileInputFormat.addInputPath(samplingJob, inputPath);
        samplingJob.setInputFormatClass(TextInputFormat.class);

        //set output format and path
        FileOutputFormat.setOutputPath(samplingJob, stageOutputPath);
        samplingJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        //run job return status
        int code = samplingJob.waitForCompletion(true) ? 0 : 1;

        //run total ordering job if first job successful
        if(code == 0) {
            //create total ordering job
            Job sortingJob = Job.getInstance(conf, "Total Order Sorting Example - Partition and Sort Job");
            sortingJob.setJarByClass(TotalOrderSortingExample.class);

            //input and output formats
            sortingJob.setInputFormatClass(SequenceFileInputFormat.class);
            SequenceFileInputFormat.setInputPaths(sortingJob, stageOutputPath);
            TextOutputFormat.setOutputPath(sortingJob, sortedOutputPath);

            //mapper settings
            sortingJob.setMapperClass(Mapper.class);
            sortingJob.setMapOutputKeyClass(Text.class);
            sortingJob.setMapOutputValueClass(NullWritable.class);

            //reducer settings
            sortingJob.setNumReduceTasks(5);
            sortingJob.setReducerClass(Reducer.class);
            sortingJob.setOutputFormatClass(TextOutputFormat.class);

            //create total order partitioner file
            TotalOrderPartitioner.setPartitionFile(sortingJob.getConfiguration(), partitionerFile);
            //sample keys: 1% selection probability, up to 1000 samples, from at most 100 input splits
            InputSampler.Sampler<Text, NullWritable> inputSampler = new InputSampler.RandomSampler<Text, NullWritable>(0.01, 1000, 100);
            InputSampler.writePartitionFile(sortingJob, inputSampler);
            sortingJob.setPartitionerClass(TotalOrderPartitioner.class);

            //run total ordering job
            code = sortingJob.waitForCompletion(true) ? 0 : 2;
        }

        //delete first job's output and the partitioner file, using the same configuration as the jobs
        FileSystem fs = FileSystem.get(conf);
        fs.delete(partitionerFile, false);
        fs.delete(stageOutputPath, true);

        return code;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TotalOrderSortingExample(), args);
        System.exit(res);
    }
}

A quick explanation of the globally sorting MapReduce example above:

Job 1:
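The first job simply preps the data to be sorted. This job does not necessarily need to be run; output from previously run MapReduce jobs may be used instead. However, in this example the input data for Job #2 must be stored as SequenceFiles (read with SequenceFileInputFormat) whose keys are the values to sort by, because the sampler reads its keys directly from Job #2's input.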

Job 2:
This job contains most of the logic for total order sorting. First we set up our Mapper and Reducer classes. In the example above, no changes are made to the input data, so an identity mapper and an identity reducer are used.

The number of reducers must be set before creating the partitioner file. This is because for n reducers, there will be n-1 boundaries in the partitioner file.

Next, we sample the input data for this job to determine the boundaries for the partitioner file. There are a variety of methods for sampling the input data. In this example RandomSampler is used.
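Besides RandomSampler, Hadoop's InputSampler also provides SplitSampler (takes the first records of each split) and IntervalSampler (takes records at regular intervals). The plain-Java sketch below is only meant to show why the choice matters when the input is already partly ordered. It is a simplified imitation of those two ideas on an in-memory list, with hypothetical names (SamplingSketch, firstN, interval), and it is not Hadoop's code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: two simplified sampling strategies over an in-memory list of keys.
// Names are hypothetical and the logic is a sketch, not Hadoop's InputSampler.
class SamplingSketch {

    // Keeps the first maxSamples keys, in the spirit of a "first N records" sampler.
    static List<Integer> firstN(List<Integer> keys, int maxSamples) {
        return new ArrayList<>(keys.subList(0, Math.min(maxSamples, keys.size())));
    }

    // Keeps a key whenever the kept/seen ratio falls below freq,
    // which spreads the samples evenly over the whole input.
    static List<Integer> interval(List<Integer> keys, double freq) {
        List<Integer> samples = new ArrayList<>();
        long seen = 0;
        for (Integer key : keys) {
            seen++;
            if ((double) samples.size() / seen < freq) {
                samples.add(key);
            }
        }
        return samples;
    }

    public static void main(String[] args) {
        List<Integer> keys = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            keys.add(i); // input that happens to be sorted already
        }
        System.out.println("first N:  " + firstN(keys, 10));
        System.out.println("interval: " + interval(keys, 0.1));
    }
}
```

On sorted input, the "first N" sample only sees the smallest keys, so boundaries derived from it would send almost everything to the last reducer. The interval sample covers the full key range. Random sampling is the general-purpose choice when nothing is known about the input order.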

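Note that in the code the partition file path is set on TotalOrderPartitioner before sampling, because InputSampler.writePartitionFile writes the boundaries to that path. Finally, we set TotalOrderPartitioner as the partitioner of this job.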

The job runs, and there will be 5 output files because we specified 5 reducers. Reading these output files in order will result in a data set globally sorted by key.
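As a rough way to check the result, the sketch below concatenates per-reducer outputs in file order and verifies that the combined list is sorted. The in-memory lists stand in for the contents of part-r-00000, part-r-00001 and so on; the class OrderedReadSketch, its methods, and the sample keys are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: each inner list stands in for one reducer output file.
// Names and sample data are hypothetical.
class OrderedReadSketch {

    // Appends the parts one after another, in the order given.
    static List<String> concat(List<List<String>> parts) {
        List<String> all = new ArrayList<>();
        for (List<String> part : parts) {
            all.addAll(part);
        }
        return all;
    }

    // True when no key is smaller than the key before it.
    static boolean isSorted(List<String> keys) {
        for (int i = 1; i < keys.size(); i++) {
            if (keys.get(i - 1).compareTo(keys.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> part0 = Arrays.asList("al", "ann", "bob"); // stands in for part-r-00000
        List<String> part1 = Arrays.asList("dan", "eve", "kim"); // stands in for part-r-00001

        System.out.println(isSorted(concat(Arrays.asList(part0, part1)))); // files read in order
        System.out.println(isSorted(concat(Arrays.asList(part1, part0)))); // files read out of order
    }
}
```

Each file is sorted on its own either way; the global order only holds when the files are read in reducer order.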
