Extension methods that accept .NET UDFs as parameters

  • Map
  • MapMany
  • Reduce
  • ReduceMany
  • ClusterBy
These functions can be integrated into a Linq to Hive query and are translated into Hive streaming commands. For example, you can implement Word Count in Linq to Hive with the following code:
  var counts = db.Wonderland
                     .MapMany(item => item.Paragraph.Split(_punctuationChars).Select(w => new { Word = w }))
                     .GroupBy(item => item.Word)
                     .Select(g => new { Word = g.Key, Count = g.Count() });

  • Wonderland is a table that represents the contents of a text file. It has a single column, Paragraph.
  • MapMany is a custom mapping operation that executes .NET code on the cluster as part of the query pipeline. The code splits the text into multiple words (hence the Many in the MapMany name) using .NET Framework calls. The lambda function provided can contain any .NET Framework code, but closures over the outside environment are not supported.
  • The rest of the sample uses regular Hive aggregation query operators to count the words.
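Outside the cluster, the same pipeline can be checked against an in-memory collection with plain LINQ to Objects, where MapMany corresponds to SelectMany. The sketch below is illustrative only: the punctuation set and the CountWords helper are assumptions, not part of the SDK sample.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class WordCountSketch
{
    // Assumed punctuation set; the SDK sample defines its own _punctuationChars.
    static readonly char[] _punctuationChars = { ' ', '.', ',', ';', ':', '!', '?' };

    // In-memory stand-in for the Wonderland table: one Paragraph value per row.
    public static IEnumerable<(string Word, int Count)> CountWords(IEnumerable<string> paragraphs)
    {
        return paragraphs
            // MapMany corresponds to SelectMany: one paragraph fans out to many words.
            .SelectMany(p => p.Split(_punctuationChars, StringSplitOptions.RemoveEmptyEntries))
            .GroupBy(w => w)
            .Select(g => (Word: g.Key, Count: g.Count()));
    }

    static void Main()
    {
        foreach (var row in CountWords(new[] { "the cat and the hat" }))
            Console.WriteLine($"{row.Word}\t{row.Count}");
    }
}
```

On the cluster, only the SelectMany-equivalent step runs inside the streamed .NET process; the GroupBy and Count are executed by Hive itself.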
When executed, this query is translated into the corresponding Hive query:
ADD FILE ${jobFolder}/HiveDriver.exe;
ADD FILE ${jobFolder}/LinqToHiveMapReduceSample.exe;
SELECT t0.Word, COUNT(*) AS Count
FROM (
  MAP t1.Paragraph
  USING './HiveDriver.exe MAPMANY LinqToHiveMapReduceSample.exe LinqToHiveMapReduceSample.LinqToHiveWordCount PFJ1blNhbXBsZT5iX18w'
  AS Word
  FROM Wonderland t1
) t0
GROUP BY t0.Word
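The last argument of the USING command is a Base64 token that Linq to Hive passes to HiveDriver.exe to identify which compiled lambda to invoke. Decoding the token from the query above shows it names a compiler-generated lambda method:

```csharp
using System;
using System.Text;

class TokenDecode
{
    static void Main()
    {
        // Token copied verbatim from the generated Hive query above.
        var token = "PFJ1blNhbXBsZT5iX18w";
        var decoded = Encoding.UTF8.GetString(Convert.FromBase64String(token));
        Console.WriteLine(decoded); // <RunSample>b__0 : the C# compiler's name for a lambda defined in RunSample
    }
}
```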

Before this query is submitted to the cluster, Linq to Hive uploads the assemblies that implement the custom functions to the Storage Account. Hive then makes them available on each data node (through the ADD FILE directives) so they can be called from Hive streaming commands (the MAP command in the example above).
During execution of the query, at the appropriate phase of the query pipeline, Hive launches HiveDriver.exe and streams data into it over standard input/output. On input, HiveDriver materializes the .NET objects expected by the MapMany lambda; it then calls the function and serializes the output objects into the string format expected by the streaming interface.
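The Hive streaming protocol exchanges rows as tab-separated lines over stdin/stdout. The loop below sketches what a MAPMANY driver pass looks like under that protocol; the method names and shapes are illustrative, not the actual HiveDriver.exe API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class StreamingMapperSketch
{
    // Illustrative MAPMANY loop: each input line is one row of tab-separated
    // columns; the user lambda fans a row out into zero or more output rows,
    // which are written back as tab-separated lines.
    public static IEnumerable<string> RunMapMany(
        IEnumerable<string> inputLines,
        Func<string[], IEnumerable<string[]>> mapMany)
    {
        foreach (var line in inputLines)
            foreach (var outputRow in mapMany(line.Split('\t')))
                yield return string.Join("\t", outputRow);
    }

    static void Main()
    {
        // Word-count shaped mapper: input column 0 is Paragraph, output column is Word.
        var output = RunMapMany(
            new[] { "the cat" },
            row => row[0].Split(' ').Select(w => new[] { w }));
        foreach (var line in output)
            Console.WriteLine(line);
    }
}
```

In the real driver, the step from `string[]` to the typed object expected by the lambda (and back) is the materialization and serialization described above.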

The complete sample is available in the source code of the SDK here.

Web Site Log Processing Sample

This sample illustrates a custom aggregation function. It implements a typical scenario of multi-step processing of log files.
  var hitsByIP = db.WebSiteLog
                       .Where(item => item.RequestMethod != "s-ip") // filter out header
                       .GroupBy(item => item.ClientIP)
                       .Select(g => new { ClientIP = g.Key, Count = g.Count() });

  var hitsByLocation = hitsByIP.Map(item =>
                       {
                           var location = LookupLocation(item.ClientIP);
                           var areaLocation = LookupArea(location);
                           return new { item.ClientIP, location.Longtitude, location.Latitude,
                                        AreaLongtitude = areaLocation.Longtitude, AreaLatitude = areaLocation.Latitude,
                                        item.Count };
                       });

  var hitsByArea = hitsByLocation
                       .ClusterBy(item => new { item.AreaLongtitude, item.AreaLatitude })
                       .Reduce(g =>
                       {
                           var items = g.ToArray();
                           var center = CalculateWeightedCenter(items.Select(item => new HitsByLocation
                           {
                               Location = new Location { Longtitude = item.Longtitude, Latitude = item.Latitude },
                               Count = item.Count
                           }));
                           return new {
                               AreaLongtitude = center.Longtitude,
                               AreaLatitude   = center.Latitude,
                               Count = items.Sum(item => item.Count)
                           };
                       });

  var results = hitsByArea.ToList();

In this example, the Reduce function processes groups of rows keyed by AreaLongtitude and AreaLatitude, as specified by the ClusterBy operation. Each group is run through a .NET calculation algorithm, and a single aggregated row is output as the result.
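CalculateWeightedCenter is a helper defined in the sample's source. A plausible implementation, shown here purely as an assumption (not the SDK's actual code), is a hit-count-weighted average of the coordinates; the Location and HitsByLocation types mirror the ones used above, keeping the sample's Longtitude spelling.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Types mirror the ones referenced in the sample above.
class Location { public double Longtitude; public double Latitude; }
class HitsByLocation { public Location Location; public long Count; }

static class Geo
{
    // Hypothetical CalculateWeightedCenter: averages coordinates weighted by hit
    // count. Adequate for points clustered in one small area; this is a flat
    // average, not a great-circle centroid.
    public static Location CalculateWeightedCenter(IEnumerable<HitsByLocation> hits)
    {
        var items = hits.ToList();
        double total = items.Sum(h => (double)h.Count);
        return new Location
        {
            Longtitude = items.Sum(h => h.Location.Longtitude * h.Count) / total,
            Latitude   = items.Sum(h => h.Location.Latitude * h.Count) / total
        };
    }
}
```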

The complete sample is available in the source code of the SDK here.

Last edited Jul 12, 2013 at 11:11 PM by maxluk, version 4


jddunlap Jun 11, 2014 at 12:29 AM 
Is there a way to use this to stream data from the reducers directly to another Hive table? I.e. auto-generate an "INSERT OVERWRITE dest_table SELECT ... FROM (MAP ...)" statement?