Friday, February 25, 2011

Hadoop Pig and Java Map Reduce - A maven example

Recently I have been involved with the Hadoop family and, as always, would like to share :-) I am hoping to provide anyone interested in evaluating Map-Reduce and Apache Pig with a starter project for doing exactly that.

At the core of Hadoop lie HDFS and the ability to perform Map-Reduce operations on data. Leaning on my previous example of Cassandra Map Reduce, this blog demonstrates how Map-Reduce on Hadoop can be achieved using plain ole Java or its exotic cousin, Apache Pig.

Before getting started with the example, you will need Hadoop running in at least pseudo-distributed mode. As an Ubuntu user, I found the blog by Rahul Patodi to be a great start for installing Cloudera's Hadoop distribution. Alternatively, you can do the same by following the instructions on the Cloudera wiki.

The example in this blog starts with a file of "comments", written to HDFS in JSON format, and then demonstrates how Map-Reduce jobs over it can be executed either in Java or in Pig. The jobs themselves check for certain keywords of interest within the posted comments, think web bot here :-). An example comments file could look like:
{"commenterId":"donaldduck","comment":"The world is a cave. James bond lives in a Cave.","country":"TANZANIA"}
{"commenterId":"factorypilot","comment":"Only a cave man could do this","country":"JAPAN"}
{"commenterId":"nemesis","comment":"Felix Lighter and James Bond work well together as they are cave men","country":"BRAZIL"}
{"commenterId":"jamesbond","comment":"James Bond would be dead without Q to help him.","country":"GERMANY"}

Java Map-Reduce
For the Java version, one writes a Mapper that extracts the comment value, checks it for occurrences of the words of interest and emits a count of one for each hit, while a Reducer in turn totals the results, as shown below:

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// JSON parsing via the json-simple library
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class CommentWordMapReduce {
  /**
   * Mapper
   */
  public static class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);

    private Text word = new Text();

    private static final String COMMENT_KEY = "comment";
    private static final Set<<String> WORDS_TO_GET_COUNT_OF = new HashSet<String>(Arrays
        .asList(new String[] { "james", "2012", "cave", "walther", "bond" }));

    private final JSONParser parser = new JSONParser();
    
    
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException,
      InterruptedException {
      JSONObject jsonObj;
      try {
        jsonObj = (JSONObject) parser.parse(value.toString());
      }
      catch (ParseException e) {
        // Hmm unable to Parse the JSON, off to next record, better log though :-)
        e.printStackTrace();
        return;
      }
      String comment = jsonObj.get(COMMENT_KEY).toString();
      StringTokenizer tokenizer = new StringTokenizer(comment);

      while (tokenizer.hasMoreTokens()) {
        String token = tokenizer.nextToken().toLowerCase();
        if (WORDS_TO_GET_COUNT_OF.contains(token)) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  /**
   * Reducer
   */
  public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
      InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  /**
   * @param inputPath The input file location from HDFS
   * @param outputPath Where to store results of the Map-Reduce
   */
  public boolean mapred(String inputPath, String outputPath) throws IOException,
    InterruptedException,
    ClassNotFoundException {
    Configuration conf = new Configuration();

    Job job = new Job(conf, "process word count");
    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(WordMap.class);
    job.setReducerClass(WordReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    
    job.setNumReduceTasks(1);
    
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    
    job.waitForCompletion(true);

    return job.isSuccessful();
  }
}
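One tweak worth noting, though not applied above: because WordReducer merely sums its values, it could also be registered as a combiner (job.setCombinerClass(WordReducer.class)) to cut down the data shuffled between the map and reduce phases.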
The provided unit test simply ensures that the expected count for each word of interest in fact matches the actual count, as shown below:
@Test
public void javaMapReduce() throws Exception {
  assertTrue(new CommentWordMapReduce().mapred(INPUT_HDFS_FILE, OUTPUT_HDFS_DIR));
  validateResult();
}
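The validateResult() helper is not listed in this post; a minimal sketch, assuming TextOutputFormat's tab-separated key/value layout and JUnit's assertEquals, could read the reducer output back from HDFS like so:

// Sketch only: assumes org.junit.Assert.assertEquals is statically imported and
// the java.io/java.util/org.apache.hadoop imports are in place in the test class
private void validateResult() throws IOException {
  FileSystem fs = FileSystem.get(new Configuration());
  BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(
      OUTPUT_HDFS_DIR + "/part-r-00000"))));
  Map<String, Integer> counts = new HashMap<String, Integer>();
  String line;
  while ((line = reader.readLine()) != null) {
    // TextOutputFormat writes each record as "word<TAB>count"
    String[] parts = line.split("\t");
    counts.put(parts[0], Integer.valueOf(parts[1]));
  }
  reader.close();
  // Expected values correspond to the sample run shown below
  assertEquals(Integer.valueOf(4490), counts.get("bond"));
  assertEquals(Integer.valueOf(2769), counts.get("cave"));
}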
If you wish to see the output of the map-reduce job, execute:
>hadoop fs -cat /users/junittest/wcresults/part-r-00000
2012 1828
bond 4490
cave 2769
james 3631
walther 921
Pig Map-Reduce:

For the Pig map-reduce equivalent of the example, there are two helper classes that I define: a custom LoadFunc (load function) for the JSON file and a FilterFunc (filter) to only include the words of interest. The custom JSON loader is courtesy of Kim Vogt from GitHub, and the LikeFilter is a non-comprehensive version that I defined as follows:

import java.io.IOException;
import java.util.List;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

public class LikeFilter extends FilterFunc {

  @Override
  public Boolean exec(Tuple input) throws IOException {
    if (input == null || input.size() < 2) {
      // Need the input word plus at least one filter word, else filter the record out
      return Boolean.FALSE;
    }

    List<Object> elems = input.getAll();

    // First element is the word presented, for example "foo", "bar" or "bond"
    Object expected = elems.get(0);

    // Subsequent elements are the words to filter on
    List<Object> comparators = elems.subList(1, elems.size());

    return comparators.contains(expected);
  }
}
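As a quick sanity check (my addition here, not part of the original project), LikeFilter can be exercised directly with a hand-built tuple via Pig's TupleFactory:

// Hypothetical smoke test; exec throws IOException, so declare or handle it.
// "bond" appears among the filter words, so the filter returns true.
Tuple input = TupleFactory.getInstance().newTuple(
    Arrays.asList((Object) "bond", "2012", "bond", "cave"));
assertTrue(new LikeFilter().exec(input));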
Using the two classes, the Pig Script for the same map-reduce task looks like:
comments = LOAD '/users/junittest/comments' USING com.welflex.hadoop.pig.load.PigJsonLoader() AS (commentMap:map[]);
words = FOREACH comments GENERATE FLATTEN(TOKENIZE(LOWER(commentMap#'comment')));
filter_words = FILTER words BY com.welflex.hadoop.pig.func.LikeFilter($0, '2012', 'bond', 'cave', 'james', 'walther');
grouped = GROUP filter_words BY $0;
counts = FOREACH grouped GENERATE group,COUNT(filter_words);
STORE counts INTO '/users/junittest/wcresults';
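The same script can, of course, be run outside of a unit test; assuming pig is on your PATH and configured to see the cluster, something along the lines of the following should do:
>pig -x mapreduce wordcount.pig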
The unit test for Pig simply registers related jars and executes the script as follows:
public void pigMapReduce() throws Exception {
  // Get the jars required for the map-reduce, including custom functions
  Set<String> jars = getJarsForPig();

  // Use ExecType.LOCAL instead if you do not want to run against the cluster
  PigServer pigServer = new PigServer(ExecType.MAPREDUCE);

  for (String jar : jars) {
    // Register the jars for Pig
    pigServer.registerJar(jar);
  }

  // Execute the pig script
  pigServer.registerScript(WordCountMapReduceTest.class
      .getResource("/wordcount.pig").toURI().toURL().getFile());

  // Post validation to make sure the results of the map-reduce are correct
  validateResult();
}
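The getJarsForPig() helper is likewise not listed here; one plausible sketch, assuming the jars Pig needs to ship (pig-funcs, json-simple and friends) are already on the test classpath, is to simply harvest the jar entries from java.class.path:

// Sketch only: collects every jar on the JVM classpath for registration with Pig
private Set<String> getJarsForPig() {
  Set<String> jars = new HashSet<String>();
  for (String entry : System.getProperty("java.class.path").split(File.pathSeparator)) {
    if (entry.endsWith(".jar")) {
      jars.add(entry);
    }
  }
  return jars;
}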
The goal of the above demonstration is to be able to write the Pig script in a single location in your maven project and to run a unit test of it without having to re-write the script or hand-register the custom jars. From the above example, one can see that the Pig script is far less complicated and less verbose than its Java counterpart, and quite performant from an execution perspective as well. The example is organized as follows:
hadoop-example
|-- hadoop <-- Demonstrate the map - reduce of both Java and Pig versions
|   |-- pom.xml
|   `-- src
|       |-- main
|       |   |-- java
|       |   |   `-- com
|       |   |       `-- welflex
|       |   |           `-- hadoop
|       |   |               `-- hdfs
|       |   |                   |-- HdfsServiceImpl.java
|       |   |                   `-- HdfsService.java
|       |   |-- pig
|       |   |   `-- wordcount.pig
|       |   `-- resources
|       |       |-- core-site.xml
|       |       `-- log4j.properties
|       `-- test
|           |-- java
|           |   `-- com
|           |       `-- welflex
|           |           `-- hadoop
|           |               `-- mapred
|           |                   |-- CommentWordMapReduce.java
|           |                   `-- WordCountMapReduceTest.java
|           `-- resources
|               `-- comments
|-- pig-funcs <----- Contains the custom Pig artifacts
|   |-- pom.xml
|   `-- src
|       `-- main
|           `-- java
|               `-- com
|                   `-- welflex
|                       `-- hadoop
|                           `-- pig
|                               |-- func
|                               |   `-- LikeFilter.java
|                               `-- load
|                                   |-- JsonLineParser.java
|                                   `-- PigJsonLoader.java
`-- pom.xml
Simply execute:
>mvn test
to witness both the Java and Pig map-reduce versions in action, or import the project into your favorite IDE and do the same. You can easily change the example to count comments by country instead, or augment the size of the file and try a non-local mode of map-reduce. I must state that when working with Hadoop and family, one needs to be careful about the versions one is working with. The above example works with Cloudera's Hadoop version 0.20.2-CDH3B4.
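For reference, pulling that build into the maven project amounts to a dependency along these lines (a sketch; verify the exact coordinates and repository against Cloudera's documentation):

<!-- Coordinates assumed from the CDH3 beta naming; check before use -->
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>0.20.2-CDH3B4</version>
</dependency>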

Download the example here and happy Pigging. Oink out, or should I say Grunt out ;-)