
Example Map-Reduce program

A .NET map-reduce 'program' comprises a number of parts:

1. A job definition. This declares the `MapperType`, `ReducerType`, `CombinerType`, and configuration settings.

2. Mapper, Reducer and Combiner classes

3. Input data. Typically staged to HDFS or Azure Storage prior to job execution. The most common approach is via the Hadoop file-system utility. For example,

> hadoop fs -copyFromLocal localFile input/folder/file

4. A job-executor. This can either be the `MRRunner.exe` utility that is part of the API distribution, or a Main() function in your own .NET application that invokes `HadoopJobExecutor`.

To create a mapper-only job:

1. Create a new C# project and reference `Microsoft.Hadoop.MapReduce.DLL`.

2. Create a class that derives from `HadoopJob<FirstMapper>`.

3. Create a class called `FirstMapper` that derives from `MapperBase`.

For example, the following is a complete map-reduce 'program' that consumes files containing integers and produces output that includes `sqrt(x)` for each input value.

    public class FirstJob : HadoopJob<SqrtMapper>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            HadoopJobConfiguration config = new HadoopJobConfiguration();
            config.InputPath = "input/SqrtJob";
            config.OutputFolder = "output/SqrtJob";
            return config;
        }
    }

    public class SqrtMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            int inputValue = int.Parse(inputLine);

            // Perform the work.
            double sqrt = Math.Sqrt((double)inputValue);

            // Write output data.
            context.EmitKeyValue(inputValue.ToString(), sqrt.ToString());
        }
    }

To run this program, stage some data in HDFS:

1. Create a text file called input.txt that has one integer per line.

2. Import that text file to HDFS via:

> hadoop fs -copyFromLocal input.txt input/SqrtJob/input.txt

3. Compile your .NET code to a DLL called FirstJob.dll and run it via:

> MRRunner -dll FirstJob.dll

When this runs, the console will display the complete Hadoop Streaming command that was issued, followed by the normal console output from the Hadoop Streaming command itself.

To see detailed information about the execution of current and past jobs, use the Hadoop JobTracker web front-end, typically accessible at http://localhost:50030.

To explore the HDFS filesystem, use the HDFS web front-end that is typically accessible at http://localhost:50080.

When the job completes, output will be available in HDFS at `/user/[user]/output/SqrtJob`.
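
To inspect the results from the command line, you can cat the output part files with the Hadoop file-system utility (a sketch; the exact part-file name, typically `part-00000`, may vary):

> hadoop fs -cat output/SqrtJob/part-00000

Each line of output is a tab-separated key/value pair, for example `16` and `4` for an input value of 16.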

HadoopJob class

The `HadoopJob<>` class defines the user components that are included in a map-reduce program. Generic parameters declare the Mapper, Combiner and Reducer classes that should be used by the job. Configuration parameters are supplied by overriding the method

    public abstract HadoopJobConfiguration Configure(ExecutorContext context);

To implement this method, instantiate a `HadoopJobConfiguration` object, set its members, and return it.

HadoopJobConfiguration class

The configuration for a job is a strongly typed bag of settings, most of which are passed directly to the Hadoop Streaming command line. A few settings translate into non-trivial combinations of streaming arguments, but most are straightforward. Only a subset of Hadoop Streaming settings is directly exposed through the configuration object; any other setting can be supplied through the catch-all facilities `config.AdditionalStreamingArguments` and `config.AdditionalGenericArguments`.
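
As a sketch (it assumes the catch-all properties are string collections with an `Add` method; check the actual types in your version of the API), a `Configure` override that mixes directly exposed settings with a raw streaming argument might look like this:

    public override HadoopJobConfiguration Configure(ExecutorContext context)
    {
        HadoopJobConfiguration config = new HadoopJobConfiguration();

        // Directly exposed settings.
        config.InputPath = "input/SqrtJob";
        config.OutputFolder = "output/SqrtJob";

        // Catch-all: pass a raw Hadoop Streaming argument that is not directly exposed.
        // (Assumes AdditionalStreamingArguments is a collection of strings.)
        config.AdditionalStreamingArguments.Add("-numReduceTasks 0");

        return config;
    }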

MapReduceJob.ExecuteJob method

MapReduceJob is a property on the Hadoop connection object. It provides access to an implementation of the IStreamingJobExecutor interface, which handles the creation and execution of Hadoop Streaming jobs. Depending on how the connection object was created, it can execute jobs locally using the Hadoop command line or against a remote cluster using the WebHCat client library. The job executor can be invoked in several ways. The first is to use the MRRunner.exe utility, which invokes the LocalStreamingJobExecutor implementation on your behalf:

> MRRunner -dll MyMRProgram.dll {-class jobClass} {-- job-class options}

The second is to invoke the executor directly from your own code and ask it to execute a job:

      var hadoop = Hadoop.Connect();
      hadoop.MapReduceJob.ExecuteJob<JobType>(arguments);
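
For example, a console application's entry point for the FirstJob class defined earlier might look like the following sketch (it assumes the types live in the Microsoft.Hadoop.MapReduce namespace; error handling is omitted):

    using Microsoft.Hadoop.MapReduce;

    public class Program
    {
        public static void Main(string[] args)
        {
            // Connect to the cluster and run the job, passing any trailing
            // command-line arguments through to the job class.
            var hadoop = Hadoop.Connect();
            hadoop.MapReduceJob.ExecuteJob<FirstJob>(args);
        }
    }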


A third approach avoids the job type altogether: invoke the executor directly, specifying the mapper (and, optionally, combiner and reducer) types along with a configuration object:

      var hadoop = Hadoop.Connect();
      hadoop.MapReduceJob.Execute<TMapper, ...>(configuration);
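
For the mapper-only SqrtMapper above, that might look like the following sketch (the single-type-parameter `Execute<TMapper>` overload shown here is an assumption; check which overloads your version of the API provides):

      var hadoop = Hadoop.Connect();

      var config = new HadoopJobConfiguration();
      config.InputPath = "input/SqrtJob";
      config.OutputFolder = "output/SqrtJob";

      // Hypothetical mapper-only overload; other overloads accept combiner/reducer types as well.
      hadoop.MapReduceJob.Execute<SqrtMapper>(config);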

MRRunner

MRRunner is a command-line utility used to execute a map-reduce program written against the Hadoop for .NET API. To get started, you should have an assembly (a .NET DLL or EXE) that defines at least one implementation of HadoopJob<>.

If MyDll contains only one implementation of HadoopJob<>, you can run the job with:

> MRRunner -dll MyDll

If MyDll contains multiple implementations of HadoopJob<>, indicate the one you wish to run:

> MRRunner -dll MyDll -class MyClass

To supply options to your job, pass them as trailing arguments on the command line, after a double hyphen:

> MRRunner -dll MyDll -class MyClass -- extraArg1 extraArg2

These additional arguments are provided to your job via a context object that is available to all methods on HadoopJob<>.
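
For example, a job might read a trailing argument in its Configure override, as in the following sketch (the `Arguments` property on `ExecutorContext` is an assumption here; consult the API for the actual member that exposes them):

    public class MyJob : HadoopJob<MyMapper>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            // Hypothetical: the first trailing argument supplied after "--" on the
            // MRRunner command line is used as the input path.
            string inputPath = context.Arguments[0];

            HadoopJobConfiguration config = new HadoopJobConfiguration();
            config.InputPath = inputPath;
            config.OutputFolder = "output/MyJob";
            return config;
        }
    }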

MapperBase

A MapperBase implementation describes how to perform the Map function. The input to Map is a subset of the rows of the overall input; on each call to the Mapper.Map(string input, MapperContext context) method, a single line is provided as input. The Map method can use the context object to look up relevant settings, emit output lines, and emit log messages and counter updates.

For example:

    public class MyMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            context.Log("mapper called. input=" + inputLine);
            context.IncrementCounter("mapInputs");

            // Emit the whole input line as the key with a constant value of "1".
            context.EmitKeyValue(inputLine, "1");
        }
    }

The MapperBase class also provides overridable methods that run at the start and end of each batch. These can be used to perform setup and teardown, such as initialising a component.
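
For example, a mapper that tracks per-batch state might use those hooks as in the sketch below (the override names `Initialize` and `Cleanup` are assumptions and may differ in your version of the API):

    public class LineCountingMapper : MapperBase
    {
        private int _lineCount;

        // Assumed start-of-batch hook: reset per-batch state.
        public override void Initialize(MapperContext context)
        {
            _lineCount = 0;
        }

        public override void Map(string inputLine, MapperContext context)
        {
            _lineCount++;
            context.EmitKeyValue(inputLine, inputLine.Length.ToString());
        }

        // Assumed end-of-batch hook: report how many lines this batch processed.
        public override void Cleanup(MapperContext context)
        {
            context.Log("lines processed in this batch: " + _lineCount);
        }
    }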

ReducerCombinerBase

A ReducerCombinerBase implementation describes how to perform a reduce and/or combine operation. In each case the operation takes a group of values that share a key and emits key/value pairs that typically represent an aggregation of the group. For example, the input to a reducer may be key = 'a', values = [1,2,3] and the output might be {'a', 6}. To implement `ReducerCombinerBase`, override the `Reduce(key, values, context)` method and use the context object to emit key/value pairs as necessary.
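
For example, a reducer that sums the values in each group (matching the 'a' → 6 example above) might look like the following sketch; the exact parameter types of `Reduce` shown here should be checked against your version of the API:

    using System.Collections.Generic;

    public class SumReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values,
                                    ReducerCombinerContext context)
        {
            // Sum the values in the group and emit one aggregated pair per key.
            long sum = 0;
            foreach (string value in values)
            {
                sum += long.Parse(value);
            }
            context.EmitKeyValue(key, sum.ToString());
        }
    }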

A common requirement for a map-reduce program is to reuse one reduce function as both the reducer and the combiner. This is achieved by referencing the same reducer class in both positions when declaring the HadoopJob class:

    public class MyHadoopJob : HadoopJob<MyMapper, MyReducer, MyReducer>
    {
        ...
    }



