Hadoop Tutorial 2 -- Running WordCount in Python


--D. Thiebaut 16:00, 18 April 2010 (UTC)





This tutorial is the continuation of Hadoop Tutorial 1 -- Running WordCount. It is based on Michael Noll's excellent tutorial, "Writing an Hadoop MapReduce Program in Python"[1].


The Setup

[Figure: dataflow of information between the streaming process and the TaskTracker processes. Image taken from [2].]

All we have to do is write a mapper and a reducer in Python, and make sure they exchange tuples with the outside world through stdin and stdout. Furthermore, the tuples must be exchanged as strings, with the key and the value separated by a tab character.
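For example, given a hypothetical input line such as

  the cat sat on the mat

the mapper writes one tab-delimited pair per word to stdout:

  the	1
  cat	1
  sat	1
  on	1
  the	1
  mat	1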

Python Map and Reduce functions

Mapper

The mapper code is shown below. It is stored in a file called mapper.py, and does not even contain a function. All it needs to do is read data from stdin and write its tuples to stdout.



#!/usr/bin/env python
import sys
 
#--- get all lines from stdin ---
for line in sys.stdin:
    #--- remove leading and trailing whitespace---
    line = line.strip()

    #--- split the line into words ---
    words = line.split()

    #--- output tuples [word, 1] in tab-delimited format---
    for word in words: 
        print('%s\t%s' % (word, 1))




Make sure you make the program executable:

  chmod +x mapper.py

Reducer

The reducer receives the tuples generated by the mapper after the shuffle and sort phases, so all the tuples sharing the same word arrive on consecutive lines.
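Continuing the hypothetical example above, the reducer's stdin would look like this after sorting:

  cat	1
  mat	1
  on	1
  sat	1
  the	1
  the	1

Both occurrences of "the" now sit on consecutive lines; that grouping is exactly what the shuffle and sort phases guarantee.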

The code is stored in a file called reducer.py.

#!/usr/bin/env python
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        continue

    try:
        word2count[word] = word2count[word] + count
    except KeyError:
        word2count[word] = count
 
# write the tuples to stdout
# Note: they are unsorted
for word in word2count.keys():
    print('%s\t%s' % (word, word2count[word]))





Make sure the file is executable:

 chmod +x reducer.py
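
Because its input is sorted, the reducer above never actually needs to keep the whole word2count dictionary in memory: once the current word changes, its count is final. Here is a minimal alternative sketch that exploits this with itertools.groupby; it is not part of the original tutorial, just another way to honor the same contract:

#!/usr/bin/env python
# Alternative reducer sketch -- NOT part of the original tutorial.
# It assumes its input is sorted by word, which is what the shuffle
# and sort phases guarantee, so it never needs a dictionary.
import sys
from itertools import groupby

def parse(stream):
    # yield (word, count-string) pairs from tab-delimited lines
    for line in stream:
        word, _, count = line.strip().partition('\t')
        yield word, count

# group consecutive lines sharing the same word and sum their counts
for word, pairs in groupby(parse(sys.stdin), key=lambda pair: pair[0]):
    try:
        total = sum(int(count) for _, count in pairs)
        print('%s\t%d' % (word, total))
    except ValueError:
        pass   # ignore lines whose count field was not a number

On a large corpus this version runs in constant memory, whereas the dictionary version grows with the number of distinct words.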


Testing

  • Make sure your two programs work. Here is a simple series of tests you can run:
  cat mapper.py | ./mapper.py
This will make mapper.py output all the words that make up its own code.
  cat mapper.py | ./mapper.py | sort | ./reducer.py
This will generate the (unsorted) frequencies of all the unique words (punctuated or not) in mapper.py.
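You can also feed the pipeline a hypothetical line of text and check the counts by hand:

  echo "the cat sat on the mat" | ./mapper.py | sort | ./reducer.py

Every word should come back with a count of 1, except "the", which appears twice.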

Running on the Hadoop Cluster

  • Let's run the Python code on the ulysses.txt file.
  • We'll assume that the Python code is stored in ~hadoop/352/dft/python
  • We'll assume that the streaming java library is in ~hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar
  • We'll also assume that ulysses.txt is in dft and that we want the output in dft-output:
cd
cd 352/dft/python
hadoop dfs -rmr dft-output
hadoop jar /home/hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar -file ./mapper.py \
        -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py  -input dft -output dft-output
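
Once the job completes, the counts end up in the dft-output directory on HDFS. With a single reducer (the default), the result is one file, part-00000, which you can inspect with:

  hadoop dfs -cat dft-output/part-00000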

Changing the number of Reducers

  • To change the number of reducers, simply add the switch -jobconf mapred.reduce.tasks=16 to the command line (the effect on the output directory is shown below):
cd
cd 352/dft/python
hadoop dfs -rmr dft-output
hadoop jar /home/hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar \
        -jobconf mapred.reduce.tasks=16 \
        -file ./mapper.py \
        -mapper ./mapper.py \
        -file ./reducer.py \
        -reducer ./reducer.py \
        -input dft -output dft-output
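
With 16 reduce tasks the job writes 16 output files, part-00000 through part-00015, each holding the counts for the subset of words that hashed to that reducer. You can verify this by listing the output directory:

  hadoop dfs -ls dft-output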


References

  1. Michael Noll, "Writing an Hadoop MapReduce Program in Python", www.michael-noll.com.
  2. Tom White, Hadoop: The Definitive Guide, O'Reilly Media, June 2009, ISBN 0596521979. The book's web site is http://www.hadoopbook.com/