Map-Reduce Examples
--D. Thiebaut (talk) 09:28, 11 April 2017 (EDT)
This page presents several problems, each with a solution implemented in the Map-Reduce programming model.
BuildingMapReduce
This is essentially the word-count program, used here as an introduction to the map, shuffle/sort, and reduce stages.
mapper.py
#!/usr/bin/env python
"""Word-count mapper: emit one ``word<TAB>1`` record per word on stdin.

Every whitespace-separated token is lower-cased before it is written, so
differently-capitalised spellings are counted together.  The output is meant
to be sorted by key and then fed to reducer.py.
"""
from __future__ import print_function

import sys


def main():
    """Read text from stdin and print a (word, 1) pair for every token."""
    for text in sys.stdin:
        # split() with no argument already drops surrounding whitespace
        for token in text.split():
            # tab-delimited record; each occurrence contributes a count of 1
            print('%s\t%d' % (token.lower(), 1))


if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
# reducer.py
"""Word-count reducer: add up the counts of consecutive identical words.

Reads ``word<TAB>count`` records from stdin.  The input must already be
grouped by word (shuffleSort.py or ``sort`` does this) because a word's
total is written as soon as a different word appears.  Writes one
``word<TAB>total`` record per distinct word.
"""
from __future__ import print_function

import sys


def reduce_counts(lines):
    """Yield ``word<TAB>total`` strings for each run of equal words in *lines*.

    Blank lines are skipped and empty input yields nothing.  A line without
    a tab, or with a non-integer count, raises ValueError.
    """
    last_word = None
    total = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        word, count = line.split('\t', 1)
        count = int(count)
        if word == last_word:
            total += count
            continue
        # a new word starts: flush the previous run (if there was one)
        if last_word is not None:
            yield '%s\t%d' % (last_word, total)
        last_word = word
        total = count
    # flush the final run; guarded so that empty input produces no output
    if last_word is not None:
        yield '%s\t%d' % (last_word, total)


def main():
    """Reduce the records on stdin and print the per-word totals."""
    for record in reduce_counts(sys.stdin):
        print(record)


if __name__ == "__main__":
    main()
shuffleSort.py
#!/usr/bin/env python
"""Local stand-in for the shuffle/sort phase of Map-Reduce.

Collects every ``word<TAB>count`` record from stdin, sorts the records by
word (ties broken by the integer count), and writes them back out so that
equal words are adjacent when they reach the reducer.
"""
from __future__ import print_function

import sys


def main():
    """Read all records, sort them, and print them in order."""
    pairs = []
    for raw in sys.stdin:
        key, value = raw.strip().split('\t', 1)
        pairs.append((key, int(value)))
    for key, value in sorted(pairs):
        print('%s\t%d' % (key, value))


if __name__ == "__main__":
    main()
runIt.sh
#! /bin/bash
# Run the word-count pipeline locally on ulysses.txt:
# map -> shuffle/sort -> reduce.
input=ulysses.txt
cat "$input" \
    | ./mapper.py \
    | ./shuffleSort.py \
    | ./reducer.py
WordCount
mapper.py
#!/usr/bin/env python
"""Word-count mapper: emit ``word<TAB>1`` for every word read from stdin.

Words are whitespace-separated tokens and are emitted verbatim (case is
preserved).  The print function is used so that the script runs under
both Python 2 and Python 3.
"""
from __future__ import print_function

import sys


def map_words(line):
    """Return the ``word<TAB>1`` records for one input *line*."""
    # split() with no argument also discards leading/trailing whitespace
    return ['%s\t%s' % (word, 1) for word in line.split()]


def main():
    """Map every line of stdin; the output is the input for reducer.py."""
    for line in sys.stdin:
        for record in map_words(line):
            print(record)


if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
"""Word-count reducer: total the counts of consecutive identical words.

Reads ``word<TAB>count`` records from stdin, which must already be sorted
by word, and writes one ``word<TAB>total`` record per distinct word.
"""
from __future__ import print_function

import sys


def reduce_counts(lines):
    """Yield ``word<TAB>total`` for each run of identical words in *lines*.

    Lines that are blank, lack a tab, or have a non-integer count are
    silently discarded.  Empty input yields nothing.
    """
    current_word = None
    current_count = 0
    for line in lines:
        # parse the input we got from mapper.py
        try:
            word, count = line.strip().split('\t', 1)
            count = int(count)
        except ValueError:
            # blank line, missing tab, or non-numeric count: ignore it
            continue
        # this switch only works because Hadoop sorts map output
        # by key (here: word) before it is passed to the reducer
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield '%s\t%s' % (current_word, current_count)
            current_word = word
            current_count = count
    # emit the final run; testing current_word (not the last parsed word)
    # keeps the last group even if the final input line was discarded, and
    # emits nothing at all for empty input
    if current_word is not None:
        yield '%s\t%s' % (current_word, current_count)


def main():
    """Reduce the records on stdin and print the per-word totals."""
    for record in reduce_counts(sys.stdin):
        print(record)


if __name__ == "__main__":
    main()
shuffleSort.py
#!/usr/bin/env python
"""Local stand-in for the Map-Reduce shuffle/sort phase.

Reads ``key<TAB>value`` records from stdin and prints them sorted by key
(ties broken by the value string).  Pass ``-n`` as the first argument to
compare keys as integers instead of as strings.
"""
from __future__ import print_function

import sys


def main():
    """Sort stdin records by key, numerically when ``-n`` is given."""
    numeric = len(sys.argv) > 1 and sys.argv[1] == "-n"
    records = []
    for raw in sys.stdin:
        key, value = raw.strip().split('\t', 1)
        records.append((int(key) if numeric else key, value))
    records.sort()
    for key, value in records:
        print('%s\t%s' % (key, value))


if __name__ == "__main__":
    main()
wordCount.sh
#! /bin/bash
# Tiny end-to-end check of the word-count mapper and reducer:
# map, sort on the first (word) field, then reduce.
text="foo foo quux labs foo bar quux"
echo "$text" \
    | ./mapper.py \
    | sort -k1,1 \
    | ./reducer.py
findMostFrequentWords
mapper1.py
#!/usr/bin/env python
"""First-pass mapper: emit ``word<TAB>1`` for every word on stdin.

Tokens are split on whitespace and lower-cased so that the first reducer
counts differently-capitalised spellings as the same word.
"""
from __future__ import print_function

import sys


def tokens(stream):
    """Yield every lower-cased, whitespace-separated token in *stream*."""
    for line in stream:
        for word in line.split():
            yield word.lower()


if __name__ == "__main__":
    # one tab-delimited (word, 1) record per occurrence
    for word in tokens(sys.stdin):
        print('%s\t%d' % (word, 1))
mapper2.py
#!/usr/bin/env python
"""Second-pass mapper: swap ``word<TAB>count`` records to ``count<TAB>word``.

It consumes the per-word totals written by the first reducer and puts the
count first, so that a numeric sort (``sort -n``) orders records by
frequency before they reach the second reducer.
"""
from __future__ import print_function

import sys


def swap_record(line):
    """Turn a ``word<TAB>count`` line into a ``count<TAB>word`` string.

    Returns None for a blank line so the caller can skip it.  Raises
    ValueError if the line has no tab or the count is not an integer.
    """
    line = line.strip()
    if not line:
        return None
    word, count = line.split('\t', 1)
    return '%d\t%s' % (int(count), word)


def main():
    """Swap every record on stdin and print the result."""
    for line in sys.stdin:
        record = swap_record(line)
        if record is not None:
            print(record)


if __name__ == "__main__":
    main()
reducer1.py
#!/usr/bin/env python
# reducer1.py
"""First-pass reducer: total the counts of consecutive identical words.

Reads ``word<TAB>count`` records from stdin; they must already be grouped
by word (the pipeline sorts them first).  Writes one ``word<TAB>total``
record per distinct word.
"""
from __future__ import print_function

import sys


def reduce_counts(lines):
    """Yield ``word<TAB>total`` strings for each run of equal words in *lines*.

    Blank lines are skipped and empty input yields nothing.  A line without
    a tab, or with a non-integer count, raises ValueError.
    """
    last_word = None
    total = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue
        word, count = line.split('\t', 1)
        count = int(count)
        if word == last_word:
            total += count
            continue
        # a new word starts: flush the previous run (if there was one)
        if last_word is not None:
            yield '%s\t%d' % (last_word, total)
        last_word = word
        total = count
    # flush the final run; guarded so that empty input produces no output
    if last_word is not None:
        yield '%s\t%d' % (last_word, total)


def main():
    """Reduce the records on stdin and print the per-word totals."""
    for record in reduce_counts(sys.stdin):
        print(record)


if __name__ == "__main__":
    main()
reducer2.py
#!/usr/bin/env python
# reducer2.py
"""Second-pass reducer: report the most frequent word(s).

Reads ``count<TAB>word`` records from stdin and remembers every word that
shares the highest count seen so far.  When the input is exhausted, it
prints each of those words as ``word<TAB>count`` in the order they were
read.  Empty input produces no output.
"""
from __future__ import print_function

import sys


def main():
    """Track the maximum count and the words that reach it."""
    best = -1
    leaders = []
    for raw in sys.stdin:
        count_text, word = raw.strip().split('\t', 1)
        count = int(count_text)
        if count < best:
            continue
        if count > best:
            # strictly higher count: previous leaders are discarded
            best = count
            leaders = []
        leaders.append(word)
    for word in leaders:
        print('%s\t%s' % (word, best))


if __name__ == "__main__":
    main()
shuffleSort.py
#!/usr/bin/env python
"""Simulate the Map-Reduce shuffle/sort step on a single machine.

Every ``word<TAB>count`` line from stdin is parsed into a (word, int count)
pair.  The pairs are sorted (by word, then count) and printed back as
tab-delimited records.
"""
from __future__ import print_function

import sys


def main():
    """Sort the parsed stdin records and re-emit them."""
    records = sorted(
        (key, int(value))
        for key, value in (raw.strip().split('\t', 1) for raw in sys.stdin)
    )
    for key, value in records:
        print('%s\t%d' % (key, value))


if __name__ == "__main__":
    main()
runIt.sh
#! /bin/bash
# Two chained Map-Reduce jobs on ulysses.txt:
#   job 1 (mapper1 -> sort -> reducer1) counts every word;
#   job 2 (mapper2 -> sort -n -> reducer2) keeps the most frequent word(s).
cat ulysses.txt \
    | ./mapper1.py | sort | ./reducer1.py \
    | ./mapper2.py | sort -n | ./reducer2.py
Pi
mapper.py
#!/usr/bin/env python
"""Pi mapper: emit the terms of a Riemann sum for the integral of 4/(1+x^2).

Each non-blank input line starts with an integer N, the number of
rectangles.  For that line the mapper prints N records ``1<TAB>term``,
where term i is f(i/N) / N for i = 0 .. N-1.  Because the integral of f
over [0, 1] is pi, the reducer's sum of all terms approximates pi.
"""
from __future__ import print_function

import sys


def f(x):
    """Integrand 4 / (1 + x^2); its integral over [0, 1] equals pi."""
    return 4.0 / (1.0 + x * x)


def pi_terms(n):
    """Yield the *n* ``1<TAB>term`` records of the left Riemann sum for pi.

    Every record uses the same key ``1`` so they all reach one reducer.
    Yields nothing when n <= 0 (avoids dividing by zero).
    """
    if n <= 0:
        return
    delta_x = 1.0 / n
    for i in range(n):
        yield "1\t%1.10f" % (f(i * delta_x) * delta_x)


def main():
    """Read rectangle counts from stdin and print all the sum's terms."""
    for line in sys.stdin:
        words = line.split()
        if not words:
            # skip blank lines instead of failing with IndexError
            continue
        for record in pi_terms(int(words[0])):
            print(record)


if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
"""Pi reducer: add up the Riemann-sum terms emitted by the mapper.

Reads ``key<TAB>value`` records from stdin, sums the float values, and
prints the total as ``total<TAB>0``.
"""
from __future__ import print_function

import math
import sys


def parse_value(line):
    """Return the float after the first tab in *line*, or None if unparsable."""
    try:
        _, value = line.strip().split('\t', 1)
        return float(value)
    except ValueError:
        # blank line, missing tab, or non-numeric value: silently ignored
        return None


def sum_terms(lines):
    """Return the sum of the parsable float values in *lines*.

    math.fsum tracks partial sums exactly, so adding up a very large number
    of small terms does not accumulate floating-point rounding error.
    """
    values = (parse_value(line) for line in lines)
    return math.fsum(v for v in values if v is not None)


def main():
    """Sum every term on stdin and print the approximation of pi."""
    print('%1.10f\t0' % sum_terms(sys.stdin))


if __name__ == "__main__":
    main()
shuffleSort.py
#!/usr/bin/env python
"""Simulate the Map-Reduce shuffle/sort step on a single machine.

Records are ``key<TAB>value`` lines read from stdin.  They are sorted by key
and printed back unchanged.  With ``-n`` as the first command-line argument,
keys are converted to integers so that they sort numerically.
"""
from __future__ import print_function

import sys


def main():
    """Parse, sort, and re-emit the stdin records."""
    use_ints = len(sys.argv) > 1 and sys.argv[1] == "-n"
    convert = int if use_ints else (lambda key: key)
    rows = []
    for raw in sys.stdin:
        key, value = raw.strip().split('\t', 1)
        rows.append((convert(key), value))
    for key, value in sorted(rows):
        print('%s\t%s' % (key, value))


if __name__ == "__main__":
    main()
runIt.sh
#! /bin/bash
# Approximate pi with 1000000 rectangles: the single input line is the
# rectangle count, the mapper emits one term per rectangle, and the reducer
# adds the terms up.
echo "1000000" \
    | ./mapper.py \
    | sort \
    | ./reducer.py
Grep
mapper.py
#!/usr/bin/env python
# mapper.py
"""Grep mapper: emit the input lines that contain a search word.

The first line read from stdin is the search word (stripped and
lower-cased).  Every following line that contains it, compared
case-insensitively, is written as ``word<TAB>line`` with the line's
surrounding whitespace removed.  Uses the print function so the script
runs under both Python 2 and Python 3.
"""
from __future__ import print_function

import sys


def grep_lines(lines):
    """Yield ``word<TAB>line`` records for the lines that contain the word.

    *lines* is an iterable of strings whose first item is the search word.
    Yields nothing for empty input.  An empty search word matches every
    line.
    """
    iterator = iter(lines)
    try:
        grep_word = next(iterator).strip().lower()
    except StopIteration:
        return
    for line in iterator:
        if grep_word in line.lower():
            yield '%s\t%s' % (grep_word, line.strip())


def main():
    """Filter stdin and print the matching records."""
    for record in grep_lines(sys.stdin):
        print(record)


if __name__ == "__main__":
    main()
reducer.py
#!/usr/bin/env python
# reducer.py
"""Grep reducer: the identity operation.

Each record arriving on stdin is echoed to stdout with its surrounding
whitespace stripped.
"""
from __future__ import print_function

import sys


def main():
    """Copy every stdin record to stdout, whitespace-trimmed."""
    for record in sys.stdin:
        print(record.strip())


if __name__ == "__main__":
    main()
runIt.sh
#! /bin/bash
# Map-Reduce grep: print every line of the given text file(s) that contains
# the expression.  The expression is sent as the first line of the mapper's
# input, followed by the contents of every file.
if [ $# -lt 2 ] ; then
    # usage goes to stderr and the script fails with a non-zero status
    echo "Syntax: $0 expression text-file [text-file]" >&2
    exit 1
fi
expression=$1
shift
# printf (not echo) so an expression like "-n" or "-e" is not taken as an
# option; quoting keeps whitespace and glob characters in the expression
# intact, and "$@" keeps file names that contain spaces whole.
{ printf '%s\n' "$expression" ; cat -- "$@" ; } | ./mapper.py | sort | ./reducer.py