Hadoop API for .NET
=====================

Introduction
----------
Hadoop Streaming is a facility for writing map-reduce jobs in the language of your choice. Hadoop API for .NET is a wrapper around Streaming that provides a convenient experience for .NET developers. An understanding of the concepts and general functionality provided by Hadoop Streaming is necessary for successful use of this API; see http://hadoop.apache.org/common/docs/r0.20.0/streaming.html for this background information.

The main facilities provided by this API are:

1. Abstraction of job execution to avoid manual construction of streaming command-line.

2. Mapper, Reducer, Combiner base classes and runtime wrappers that provide helpful abstractions. For example, the ReducerCombinerBase class provides input through (string key, IEnumerable<string> values) groups.

3. Detection of .NET dependencies and automatic inclusion in streaming job.

4. Local unit-testing support for map/combine/reduce classes via the StreamingUnit class.

5. Support for JSON I/O and strongly typed mapper/combiner/reducer via Json* classes. The pattern used by the JSON classes can be used to create other serialization wrappers.

When jobs are submitted via the API a Hadoop Streaming command is generated and executed. The command is displayed on the console and can be used for direct invocation if required.
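For illustration, a generated command typically resembles the following sketch (the jar path, driver arguments and file list are placeholders, not the exact output):

```shell
hadoop jar hadoop-streaming.jar \
    -input "input/SqrtJob" \
    -output "output/SqrtJob" \
    -mapper "..." \
    -file "..."
```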

Input & Output formats
--------------------

The input/output format supported is line-oriented, tab-separated records, staged in a Hadoop-supported file system such as HDFS or Azure Blob Storage. The input may comprise many files, but each should have a consistent format: records delimited by newlines (`\r\n` or `\n`), columns delimited by `\t`.

When a job comprises both a mapper and a reducer, the key values emitted by the mapper must be plain text that sorts correctly under an ordinal text comparison such as .NET's `StringComparison.Ordinal`.

In all other cases the record fields may comprise formatted text such as JSON or another text representation of structured data. The API includes support for JSON fields via the classes in the `Microsoft.Hadoop.MapReduce.Json` namespace.

If data is in a binary format or document-oriented format (such as a folder full of .docx files), the input to a map-reduce job will typically be files that list the path to each real file, one path per line. The mapper can then look up the files using whatever API is appropriate.
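A sketch of such a path-list mapper, with a hypothetical class name and the file handling elided:

```csharp
using Microsoft.Hadoop.MapReduce;

public class DocxPathMapper : MapperBase
{
    public override void Map(string inputLine, MapperContext context)
    {
        // inputLine is a path such as "input/docs/report1.docx".
        // Open the file with whatever API is appropriate, then emit
        // key/value pairs derived from its contents.
    }
}
```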


Example Map-Reduce program
------------------------

A .NET map-reduce 'program' comprises a number of parts:

1. A job definition. This declares the `MapperType`, `ReducerType`, `CombinerType` and configuration settings

2. Mapper, Reducer and Combiner classes

3. Input data. This is typically staged to HDFS or Azure Storage prior to job execution, most commonly via the Hadoop file-system utility. For example,

> hadoop fs -copyFromLocal localFile input/folder/file

4. A job-executor. This can either be the `MRRunner.exe` that is part of the API distribution, or a Main() function in your .NET application that invokes `HadoopJobExecutor`.

To create a mapper-only job:

1. Create a new C# project and reference `Microsoft.Hadoop.MapReduce.DLL`.

2. Create a class that derives from `HadoopJob<FirstMapper>`.

3. Create a class called `FirstMapper` that derives from `MapperBase`.

For example, the following is a complete map-reduce 'program' that consumes files containing integers and produces output that includes `sqrt(x)` for each input value.

    public class FirstJob : HadoopJob<SqrtMapper>
    {
        public override HadoopJobConfiguration Configure(ExecutorContext context)
        {
            HadoopJobConfiguration config = new HadoopJobConfiguration();
            config.InputPath = "input/SqrtJob";
            config.OutputFolder = "output/SqrtJob";
            return config;
        }
    }

    public class SqrtMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            int inputValue = int.Parse(inputLine);

            // Perform the work.
            double sqrt = Math.Sqrt((double)inputValue);

            // Write output data.
            context.EmitKeyValue(inputValue.ToString(), sqrt.ToString());
        }
    }

To run this program, stage some data in HDFS:

1. Create a text file called input.txt that has one integer per line.

2. Import that text file to HDFS via

> hadoop fs -copyFromLocal input.txt input/SqrtJob/input.txt

3. Compile your .NET code to a DLL called FirstJob.dll and run it via

> MRRunner -dll FirstJob.dll

When this runs, the console will display the complete Hadoop Streaming command issued and then the normal console output from the hadoop streaming command itself.

To see detailed information about the execution of current and past jobs, use the Hadoop MapReduce web front-end, typically accessible at http://localhost:50030.

To explore the HDFS filesystem, use the HDFS web front-end, typically accessible at http://localhost:50070.

When the job completes, output will be available in HDFS under the configured output folder, e.g. `output/SqrtJob` relative to your HDFS home directory (`/user/<username>/output/SqrtJob`).

HadoopJob class
-------------

The `HadoopJob<>` class defines the user's components that are included in a map-reduce program. Generic parameters declare the Mapper, Combiner and Reducer classes that should be used by the job. Configuration settings are supplied by overriding the method

    public abstract HadoopJobConfiguration Configure(ExecutorContext context);

To implement this method, instantiate a `HadoopJobConfiguration` object, set its members and return it.


HadoopJobConfiguration class
--------------------------

The configuration for a job is a strongly typed bag of settings that are largely passed directly to the hadoop streaming command-line. Some settings translate into non-trivial command-line arguments, but most are straightforward. Only a subset of Hadoop Streaming settings is directly exposed through the Configuration object; all settings remain usable through the catch-all facilities `config.AdditionalStreamingArguments` and `config.AdditionalGenericArguments`.
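As a sketch, extra settings can be passed through the catch-all facilities like this; the property names come from the text above, but their exact collection types, and the particular streaming/generic arguments shown, are assumptions for illustration:

```csharp
using Microsoft.Hadoop.MapReduce;

HadoopJobConfiguration config = new HadoopJobConfiguration();
config.InputPath = "input/MyJob";
config.OutputFolder = "output/MyJob";

// Settings without a dedicated property can be passed through verbatim.
config.AdditionalStreamingArguments.Add("-numReduceTasks 4");
config.AdditionalGenericArguments.Add("-D mapred.task.timeout=600000");
```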

HadoopJobExecutor class
-----------------------

HadoopJobExecutor handles the creation and execution of a complete Hadoop Streaming command-line. It can be called in various ways. The first is to use the MRRunner.exe utility, which will invoke HadoopJobExecutor on your behalf:

> MRRunner -dll MyMRProgram.dll {-class jobClass} {-- job-class options}

The second is to invoke the executor directly and ask it to execute a `HadoopJob<>` implementation:

    HadoopJobExecutor.Execute<JobType>(arguments)

A third approach is to bypass the job type and invoke the executor directly, supplying the MapperType (and, optionally, Combiner and Reducer types) plus a Configuration object:

    HadoopJobExecutor.Execute<TMapper,..>(configuration)
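Using the second approach, a Main() method might look like the following sketch (the overload shape is indicative; consult the API for exact signatures):

```csharp
using Microsoft.Hadoop.MapReduce;

public static class Program
{
    public static void Main(string[] args)
    {
        // Execute a declared HadoopJob<> implementation.
        // (FirstJob is the example job defined earlier.)
        HadoopJobExecutor.Execute<FirstJob>(args);
    }
}
```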

MRRunner
------
MRRunner is a command-line utility used to execute a map-reduce program written against the Hadoop API for .NET. To get started, you need an assembly (a .NET DLL or EXE) that defines at least one implementation of `HadoopJob<>`.

If MyDll contains only one implementation of `HadoopJob<>`, you can run the job with

> MRRunner -dll MyDll

If MyDll contains multiple implementations of HadoopJob<>, indicate the one you wish to run

> MRRunner -dll MyDll -class MyClass

To supply options to your job, pass them as trailing arguments on the command-line, after a double-hyphen

> MRRunner -dll MyDll -class MyClass -- extraArg1 extraArg2

These additional arguments are provided to your job via a context object that is available to all methods of `HadoopJob<>`.

MapperBase
--------

A MapperBase implementation describes how to perform the Map function. The input to Map will be a subset of the rows of the input. On each call to the `Map(string inputLine, MapperContext context)` method, a single line is provided as input. The Map method can use the context object to look up relevant settings, emit output lines, and emit log messages and counter updates.

For example:

    public class MyMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            context.Log("mapper called. input=" + inputLine);
            context.IncrementCounter("mapInputs");

            // Emit the first tab-separated field as the key, the second as the value.
            string[] parts = inputLine.Split('\t');
            string key = parts[0];
            string value = parts.Length > 1 ? parts[1] : string.Empty;
            context.EmitKeyValue(key, value);
        }
    }
   

The MapperBase class also provides overridable methods to be run at the start/end of each batch. These methods can be used to perform set up and teardown such as initialising a component.


ReducerCombinerBase
-----------------

A ReducerCombinerBase implementation describes how to perform a reduce and/or combine operation. In each case the operation takes a group and emits key/value pairs that typically represent an aggregated representation of the group. For example, the input to a reducer may be: key = 'a', values = 1,2,3 and the output might be {'a',6}. To implement `ReducerCombinerBase`, override the `Reduce(key, values, context)` method and use the context object to emit key/value pairs as necessary.
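For example, a minimal summing reducer in this style (the name of the context parameter's type is assumed here to be `ReducerCombinerContext`):

```csharp
using System.Collections.Generic;
using Microsoft.Hadoop.MapReduce;

public class SumReducer : ReducerCombinerBase
{
    public override void Reduce(string key, IEnumerable<string> values,
                                ReducerCombinerContext context)
    {
        // e.g. key = 'a', values = 1,2,3  ->  emits {'a', 6}
        long sum = 0;
        foreach (string value in values)
        {
            sum += long.Parse(value);
        }
        context.EmitKeyValue(key, sum.ToString());
    }
}
```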

A common requirement for a map-reduce program is to reuse one reduce function as both the reducer and the combiner. This is achieved by referencing the same reducer class for both type parameters when declaring a HadoopJob class:

    public class MyJob : HadoopJob<MyMapper, MyReducer, MyReducer>
    {
        ...
    }


Json support
----------

The primary data format for Hadoop Streaming is line-oriented text and so the normal currency of Map and Reduce implementations is System.String. It can often be convenient to transform the strings to/from .NET objects and this requires a serialization mechanism. A set of classes that use Json.NET as the serialization engine are provided in the Microsoft.Hadoop.MapReduce.Json namespace. As an example of their use, consider input data that has JSON format values:
    {"ID": 2, "Name": "Alan"}
    {"ID": 3, "Name": "Bob"}

Further, assume that a class definition that can represent the values is

    public class Employee
    {
        public int ID { get; set; }
        public string Name { get; set; }
    }
   

The JSON mapper classes can perform the deserialization and transformation to Employee instances that is required for convenient processing. Let's assume the output of the mapper will be simple strings; in this case the appropriate mapper type to use is JsonInMapperBase<>. For example:

    public class MyMapper : JsonInMapperBase<Employee> {
        public override void Map(Employee value, MapperContext context)
        {
            // Process the deserialized Employee instance here.
        }
    }
   

JsonInMapperBase performs the deserialization of the input lines and the instantiation of Employee objects. The Map function that must be implemented can deal with Employee inputs rather than strings.

Other classes in `Microsoft.Hadoop.MapReduce.Json` support transferring object-representations between mapper and reducer and as the output of the reducer.

Last edited Oct 25, 2012 at 8:09 PM by mikelid, version 2
