Hadoop Tutorial 2.3 -- Running WordCount in Python on AWS
--D. Thiebaut 16:00, 18 April 2010 (UTC)
Updated --D. Thiebaut (talk) 08:35, 13 April 2017 (EDT)
This tutorial is the continuation of Hadoop Tutorial 1 -- Running WordCount. It is based on the excellent tutorial by Michael Noll, "Writing an Hadoop MapReduce Program in Python".[1]
The Setup
Figure: Dataflow of information between the streaming process and the TaskTracker processes (image taken from [2]).
All we have to do is write a mapper and a reducer function in Python, and make sure they exchange tuples with the outside world through stdin and stdout. Furthermore, the data in the tuples must be passed around as strings.
Python Map and Reduce functions
Mapper
The mapper code is shown below. It is stored in a file called mapper.py, and does not even contain a function. All it needs to do is receive data on its stdin input and output data on its stdout.
#!/usr/bin/env python
# mapper.py
import sys

#--- get all lines from stdin ---
for line in sys.stdin:
    #--- remove leading and trailing whitespace ---
    line = line.strip()

    #--- split the line into words ---
    words = line.split()

    #--- output tuples [word, 1] in tab-delimited format ---
    for word in words:
        print '%s\t%s' % (word, "1")
Make sure you make the program executable:
chmod +x mapper.py
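For example, the command below feeds one line to mapper.py on stdin; it should print one tab-delimited tuple per word on stdout:

echo "hello world hello" | ./mapper.py
hello	1
world	1
hello	1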
Reducer Code
Typically the reducer gets the tuples generated by the mapper, after the shuffle and sort phases.
The code is stored in a file called reducer.py.
#!/usr/bin/env python
# reducer.py
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        continue

    # add the count to the running total for this word
    try:
        word2count[word] = word2count[word] + count
    except KeyError:
        word2count[word] = count

# write the tuples to stdout
# Note: they are unsorted
for word in word2count.keys():
    print '%s\t%s' % (word, word2count[word])
Make sure the file is executable:
chmod +x reducer.py
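Because Hadoop sorts the mapper output by key during the shuffle and sort phases, the reducer does not actually have to keep a dictionary of all the words in memory. As an aside (this is not part of the code above, and the file name reducer2.py is just an illustrative choice), a streaming-style reducer could simply emit a count each time the incoming word changes:

#!/usr/bin/env python
# reducer2.py -- a reducer sketch that relies on Hadoop's sorted input
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    line = line.strip()
    word, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue

    if word == current_word:
        current_count += count
    else:
        # input is sorted by word, so a new word means the previous
        # one is complete and its count can be written out
        if current_word is not None:
            print '%s\t%s' % (current_word, current_count)
        current_word = word
        current_count = count

# don't forget the last word
if current_word is not None:
    print '%s\t%s' % (current_word, current_count)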
Testing
- Make sure your two programs work. Here is a simple series of tests you can run:
cat mapper.py | ./mapper.py
- This will make mapper.py output a (word, 1) tuple for every word in its own source code.
cat mapper.py | ./mapper.py | sort | ./reducer.py
- This will generate the (unsorted) frequencies of all the unique words (punctuated or not) in mapper.py.
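You can also pipe a short sentence of your own through the same pipeline and verify the counts by hand (the sample text below is arbitrary):

echo "the cat sat on the mat the end" | ./mapper.py | sort | ./reducer.py | sort

which should output:

cat	1
end	1
mat	1
on	1
sat	1
the	3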
Running on an AWS Hadoop Cluster
- Please refer to this tutorial for starting a Hadoop cluster on AWS. Upload a few books (from Gutenberg.org or some other sites) to HDFS.
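For example, assuming two plain-text books were downloaded to the master node as book1.txt and book2.txt (the file names here are just placeholders), they could be copied into a books directory on HDFS with:

sgeadmin@master:~$ hadoop dfs -mkdir books
sgeadmin@master:~$ hadoop dfs -put book1.txt book2.txt books
sgeadmin@master:~$ hadoop dfs -ls books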
- Find the streaming Java library:
sgeadmin@master:~$ find /usr/lib -name "*streaming*" -print
/usr/lib/ruby/1.9.1/psych/streaming.rb
/usr/lib/hadoop-0.20/contrib/streaming
/usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar
- We'll also assume that the book text files are in a directory on HDFS called books.
- We will store the results in an HDFS directory called output3.
sgeadmin@master:~$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar \
       -file ./mapper.py  -mapper ./mapper.py \
       -file ./reducer.py -reducer ./reducer.py \
       -input books -output output3

packageJobJar: [./mapper.py, ./reducer.py, /mnt/hadoop/hadoop-sgeadmin/hadoop-unjar3673698552640043326/] [] /tmp/streamjob919890011433239198.jar tmpDir=null
17/04/13 12:33:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/04/13 12:33:16 WARN snappy.LoadSnappy: Snappy native library not loaded
17/04/13 12:33:16 INFO mapred.FileInputFormat: Total input paths to process : 2
17/04/13 12:33:16 INFO streaming.StreamJob: getLocalDirs(): [/mnt/hadoop/hadoop-sgeadmin/mapred/local]
17/04/13 12:33:16 INFO streaming.StreamJob: Running job: job_201704131133_0003
17/04/13 12:33:16 INFO streaming.StreamJob: To kill this job, run:
17/04/13 12:33:16 INFO streaming.StreamJob: /usr/lib/hadoop-0.20/bin/hadoop job -Dmapred.job.tracker=master:54311 -kill job_201704131133_0003
17/04/13 12:33:16 INFO streaming.StreamJob: Tracking URL: http://master:50030/jobdetails.jsp?jobid=job_201704131133_0003
17/04/13 12:33:17 INFO streaming.StreamJob: map 0% reduce 0%
17/04/13 12:33:36 INFO streaming.StreamJob: map 33% reduce 0%
17/04/13 12:33:37 INFO streaming.StreamJob: map 67% reduce 0%
17/04/13 12:33:49 INFO streaming.StreamJob: map 100% reduce 0%
17/04/13 12:33:57 INFO streaming.StreamJob: map 100% reduce 100%
17/04/13 12:34:02 INFO streaming.StreamJob: Job complete: job_201704131133_0003
17/04/13 12:34:02 INFO streaming.StreamJob: Output: output3
- Get the output file part-00000 and verify that it contains word frequencies.
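One way to do this, assuming the results are in output3 as above (wordfreq.txt is just an illustrative local file name):

sgeadmin@master:~$ hadoop dfs -ls output3
sgeadmin@master:~$ hadoop dfs -cat output3/part-00000 | head
sgeadmin@master:~$ hadoop dfs -get output3/part-00000 wordfreq.txt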
Changing the number of Reducers
- To change the number of reducers, simply add this switch -jobconf mapred.reduce.tasks=16 to the command line:
sgeadmin@master:~$ hadoop dfs -rmr output3

sgeadmin@master:~$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u5.jar \
       -file ./mapper.py  -mapper ./mapper.py \
       -file ./reducer.py -reducer ./reducer.py \
       -jobconf mapred.reduce.tasks=16 \
       -input books -output output3
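With 16 reducers, the job should now leave 16 part files (part-00000 through part-00015) in output3, each holding a share of the word frequencies. This can be checked with:

sgeadmin@master:~$ hadoop dfs -ls output3
sgeadmin@master:~$ hadoop dfs -cat output3/part-00000 | head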
References
- ↑ Michael Noll, Writing an Hadoop MapReduce Program in Python, www.michael-noll.com.
- ↑ Tom White, Hadoop: The Definitive Guide, O'Reilly Media, June 2009, ISBN 0596521979. The Web site for the book is http://www.hadoopbook.com/