Difference between revisions of "Map-Reduce Examples"
(→BuildingMapReduce/*.sh) |
|||
(13 intermediate revisions by the same user not shown) | |||
Line 2: | Line 2: | ||
---- | ---- | ||
<br /> | <br /> | ||
+ | <bluebox> | ||
+ | This page contains various problems and their solutions implemented using the Map-Reduce programming model. | ||
+ | </bluebox> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | |||
= BuildingMapReduce = | = BuildingMapReduce = | ||
− | == | + | <br /> |
+ | This is basically the word-count program, but used as an introduction to the mapping and reducing process. | ||
+ | <br /> | ||
+ | == mapper.py == | ||
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 31: | Line 40: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | ==reducer.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 53: | Line 62: | ||
sum += count | sum += count | ||
else: | else: | ||
− | print( "%s\t%d" % ( | + | print( "%s\t%d" % ( lastWord, sum ) ) |
sum = count | sum = count | ||
lastWord = word | lastWord = word | ||
Line 63: | Line 72: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | |
+ | == shuffleSort.py == | ||
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 88: | Line 98: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | ==runIt.sh == |
<br /> | <br /> | ||
::<source lang="bash"> | ::<source lang="bash"> | ||
Line 99: | Line 109: | ||
= WordCount = | = WordCount = | ||
− | == | + | ==mapper.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 125: | Line 135: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | |
+ | == reducer.py == | ||
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 172: | Line 183: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == shuffleSort.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 203: | Line 214: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == wordCount.sh == |
<br /> | <br /> | ||
::<source lang="bash"> | ::<source lang="bash"> | ||
Line 213: | Line 224: | ||
<br /> | <br /> | ||
= findMostFrequentWords = | = findMostFrequentWords = | ||
− | == | + | == mapper1.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 241: | Line 252: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | ==mapper2.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 261: | Line 272: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == reducer1.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 275: | Line 286: | ||
word, count = line.strip().split('\t', 1) | word, count = line.strip().split('\t', 1) | ||
count = int(count) | count = int(count) | ||
+ | |||
if lastWord==None: | if lastWord==None: | ||
lastWord = word | lastWord = word | ||
Line 283: | Line 295: | ||
sum += count | sum += count | ||
else: | else: | ||
− | print( "%s\t%d" % ( | + | print( "%s\t%d" % ( lastWord, sum ) ) |
sum = count | sum = count | ||
lastWord = word | lastWord = word | ||
Line 293: | Line 305: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | |
+ | == reducer2.py == | ||
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 319: | Line 332: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == shuffleSort.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 344: | Line 357: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == runIt.sh == |
<br /> | <br /> | ||
::<source lang="bash"> | ::<source lang="bash"> | ||
− | + | #! /bin/bash | |
+ | cat ulysses.txt | ./mapper1.py | sort | ./reducer1.py | ./mapper2.py | sort -n | ./reducer2.py | ||
</source> | </source> | ||
<br /> | <br /> | ||
+ | |||
= Pi = | = Pi = | ||
− | == | + | ==mapper.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 378: | Line 393: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == reducer.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 413: | Line 428: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | ==shuffleSort.py == |
<br /> | <br /> | ||
::<source lang="python"> | ::<source lang="python"> | ||
Line 444: | Line 459: | ||
</source> | </source> | ||
<br /> | <br /> | ||
− | == | + | == runIt.sh == |
<br /> | <br /> | ||
::<source lang="bash"> | ::<source lang="bash"> | ||
Line 453: | Line 468: | ||
</source> | </source> | ||
+ | <br /> | ||
+ | =Grep= | ||
+ | <br /> | ||
+ | ==mapper.py== | ||
+ | <br /> | ||
+ | <source lang="python"> | ||
+ | #!/usr/bin/env python | ||
+ | # mapper.py | ||
+ | import sys | ||
+ | |||
+ | first = True | ||
+ | |||
+ | # input comes from STDIN (standard input) | ||
+ | for line in sys.stdin: | ||
+ | if first: | ||
+ | first = False | ||
+ | grepWord = line.strip().lower() | ||
+ | continue | ||
+ | |||
+ | if line.lower().find( grepWord ) != -1: | ||
+ | print '%s\t%s' % ( grepWord, line.strip() ) | ||
+ | |||
+ | |||
+ | </source> | ||
+ | <br /> | ||
+ | ==reducer.py== | ||
+ | <br /> | ||
+ | <source lang="python"> | ||
+ | #!/usr/bin/env python | ||
+ | # reducer.py | ||
+ | # implements identity operator | ||
+ | # | ||
+ | from __future__ import print_function | ||
+ | |||
+ | |||
+ | import sys | ||
+ | |||
+ | # input comes from STDIN | ||
+ | for line in sys.stdin: | ||
+ | print( line.strip() ) | ||
+ | |||
+ | </source> | ||
+ | <br /> | ||
+ | ==runIt.sh== | ||
+ | <br /> | ||
+ | <source lang="bash"> | ||
+ | #! /bin/bash | ||
+ | |||
+ | if [ $# -lt 2 ] ; then | ||
+ | echo "Syntax: $0 expression text-file [text-file]" | ||
+ | exit | ||
+ | fi | ||
+ | |||
+ | expression=$1 | ||
+ | shift | ||
+ | files=$@ | ||
+ | #echo "files = $files" | ||
+ | |||
+ | { echo $expression ; for file in $files; do cat $file ; done; } | ./mapper.py | sort | ./reducer.py | ||
+ | |||
+ | </source> | ||
+ | <br /> | ||
+ | |||
<br /> | <br /> | ||
= InvertedIndex = | = InvertedIndex = | ||
Line 580: | Line 658: | ||
</source> | </source> | ||
<br /> | <br /> | ||
+ | =Game of Life= | ||
+ | <br /> | ||
+ | See [[CSC352 Game of Life in Map-Reduce| this page]] for solving the 2D Game of Life with Map-Reduce. | ||
+ | <br /> | ||
+ | <br /> | ||
+ | |||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | [[category:MapReduce]][[Category:Hadoop]] |
Latest revision as of 08:57, 21 April 2017
--D. Thiebaut (talk) 09:28, 11 April 2017 (EDT)
This page contains various problems and their solutions implemented using the Map-Reduce programming model.
Contents
BuildingMapReduce
This is basically the word-count program, but used as an introduction to the mapping and reducing process.
mapper.py
#!/usr/bin/env python
# mapper.py
#
# Word-count mapper: reads text from standard input and, for every
# whitespace-separated token, writes one tab-delimited record
# "<lower-cased token>\t1" to standard output.  These records are the
# input of the shuffle/sort and reduce steps.
from __future__ import print_function
import sys

for text in sys.stdin:
    # split() without arguments already drops leading/trailing blanks
    # and never produces empty tokens.
    for token in text.split():
        print('%s\t%d' % (token.lower(), 1))
reducer.py
#!/usr/bin/env python
# reducer.py
#
# Word-count reducer: reads "word\tcount" records from standard input and
# writes one "word\ttotal" record per run of consecutive identical words.
# The input must already be grouped by word (that is the job of the
# shuffle/sort step); identical words that are not adjacent are reported
# as separate runs.
from __future__ import print_function
import sys


def reduce_counts(lines):
    """Yield (word, total) for each run of consecutive equal words.

    lines -- iterable of "word<TAB>count" strings; count must parse as int.

    Raises ValueError when a line has no tab or a non-integer count.
    Yields nothing for empty input.
    """
    last_word = None
    total = 0
    for line in lines:
        word, count = line.strip().split('\t', 1)
        count = int(count)
        if word == last_word:
            total += count
            continue
        # A new word starts: flush the finished run, if there is one.
        if last_word is not None:
            yield last_word, total
        last_word, total = word, count
    # Flush the final run; skipped when no record was read at all.
    if last_word is not None:
        yield last_word, total


if __name__ == "__main__":
    for word, total in reduce_counts(sys.stdin):
        print('%s\t%d' % (word, total))
shuffleSort.py
#!/usr/bin/env python
# shuffleSort.py
#
# Local stand-in for the shuffle/sort phase: reads every "word\tcount"
# record from standard input, sorts the (word, count) pairs, and writes
# them back out in sorted order.
from __future__ import print_function
import sys


def parse(record):
    # Turn one "word\tcount" line into a (word, int) pair.
    key, value = record.strip().split('\t', 1)
    return key, int(value)


# All input is parsed before anything is printed, then sorted as tuples.
for key, value in sorted([parse(record) for record in sys.stdin]):
    print('%s\t%d' % (key, value))
runIt.sh
#! /bin/bash
# Run the word-count pipeline locally: map, shuffle/sort, then reduce.
cat ulysses.txt |
    ./mapper.py |
    ./shuffleSort.py |
    ./reducer.py
WordCount
mapper.py
#!/usr/bin/env python
# mapper.py
#
# Word-count mapper: for every whitespace-separated word read from
# standard input, writes a tab-delimited "word\t1" record to standard
# output.  Words are emitted as-is (no case folding).
from __future__ import print_function
import sys


def map_words(lines):
    """Yield (word, 1) for every whitespace-separated word in lines.

    lines -- iterable of text lines; blank lines produce nothing.
    """
    for line in lines:
        for word in line.split():
            yield word, 1


if __name__ == "__main__":
    # The print function (not the Python 2 statement) keeps this script
    # runnable under both Python 2 and Python 3.
    for word, count in map_words(sys.stdin):
        print('%s\t%s' % (word, count))
reducer.py
#!/usr/bin/env python
# reducer.py
#
# Word-count reducer: reads "word\tcount" records from standard input and
# writes one "word\ttotal" record per run of consecutive identical words.
# This only works because the records arrive sorted by key (Hadoop, or
# `sort` in the local pipeline, does that before the reduce step).
from __future__ import print_function
import sys


def reduce_counts(lines):
    """Yield (word, total) for each run of consecutive equal words.

    lines -- iterable of "word<TAB>count" strings.

    Records whose count is not an integer are silently skipped.
    Raises ValueError when a line contains no tab.
    Yields nothing for empty input.
    """
    current_word = None
    current_count = 0
    for line in lines:
        word, count = line.strip().split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            # count was not a number: discard this record
            continue
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield current_word, current_count
            current_word, current_count = word, count
    # Flush the last run.  Testing against None (rather than against the
    # last word read) avoids printing "None\t0" on empty input and avoids
    # losing the run when the final record was skipped.
    if current_word is not None:
        yield current_word, current_count


if __name__ == "__main__":
    for word, total in reduce_counts(sys.stdin):
        print('%s\t%s' % (word, total))
shuffleSort.py
#!/usr/bin/env python
# shuffleSort.py
#
# Local stand-in for the shuffle/sort phase: reads "key\tvalue" records
# from standard input and writes them back sorted by (key, value).
# With "-n" as the first command-line argument the key is converted to an
# integer before sorting, giving a numeric instead of a textual order.
from __future__ import print_function
import sys

numeric = sys.argv[1:2] == ["-n"]
convert = int if numeric else (lambda key: key)

pairs = []
for record in sys.stdin:
    key, value = record.strip().split('\t', 1)
    pairs.append((convert(key), value))

for key, value in sorted(pairs):
    print('%s\t%s' % (key, value))
wordCount.sh
#! /bin/bash
# Smoke-test the word-count mapper and reducer on a fixed sentence,
# using `sort` on the first field as the shuffle/sort step.
echo "foo foo quux labs foo bar quux" |
    ./mapper.py |
    sort -k1,1 |
    ./reducer.py
findMostFrequentWords
mapper1.py
#!/usr/bin/env python
# mapper1.py
#
# First-stage mapper: reads text from standard input and, for every
# whitespace-separated token, writes one tab-delimited record
# "<lower-cased token>\t1" to standard output.
from __future__ import print_function
import sys

for text in sys.stdin:
    # split() without arguments already drops leading/trailing blanks
    # and never produces empty tokens.
    for token in text.split():
        print('%s\t%d' % (token.lower(), 1))
mapper2.py
#!/usr/bin/env python
# mapper2.py
#
# Second-stage mapper: reads "word\tcount" records from standard input
# and writes them back with the two fields swapped, as "count\tword",
# so that the following sort step orders the records by count.
from __future__ import print_function
import sys

for record in sys.stdin:
    word, count_text = record.strip().split('\t', 1)
    print('%d\t%s' % (int(count_text), word))
reducer1.py
#!/usr/bin/env python
# reducer1.py
#
# First-stage reducer: reads "word\tcount" records from standard input
# and writes one "word\ttotal" record per run of consecutive identical
# words.  The input must already be sorted/grouped by word.
from __future__ import print_function
import sys


def reduce_counts(lines):
    """Yield (word, total) for each run of consecutive equal words.

    lines -- iterable of "word<TAB>count" strings; count must parse as int.

    Raises ValueError when a line has no tab or a non-integer count.
    Yields nothing for empty input.
    """
    last_word = None
    total = 0
    for line in lines:
        word, count = line.strip().split('\t', 1)
        count = int(count)
        if word == last_word:
            total += count
            continue
        # A new word starts: flush the finished run, if there is one.
        if last_word is not None:
            yield last_word, total
        last_word, total = word, count
    # Flush the final run; skipped when no record was read at all.
    if last_word is not None:
        yield last_word, total


if __name__ == "__main__":
    for word, total in reduce_counts(sys.stdin):
        print('%s\t%d' % (word, total))
reducer2.py
#!/usr/bin/env python
# reducer2.py
#
# Second-stage reducer: reads "count\tword" records from standard input,
# keeps the word(s) carrying the largest count seen, and finally writes
# one "word\tcount" record for each of them, in input order.
from __future__ import print_function
import sys

best = -1
winners = []

for record in sys.stdin:
    count_text, word = record.strip().split('\t', 1)
    count = int(count_text)
    if count < best:
        continue
    if count > best:
        # strictly better: forget the previous winners
        best = count
        winners = []
    winners.append(word)

for word in winners:
    print('%s\t%s' % (word, best))
shuffleSort.py
#!/usr/bin/env python
# shuffleSort.py
#
# Local stand-in for the shuffle/sort phase: reads every "word\tcount"
# record from standard input, sorts the (word, count) pairs, and writes
# them back out in sorted order.
from __future__ import print_function
import sys


def parse(record):
    # Turn one "word\tcount" line into a (word, int) pair.
    key, value = record.strip().split('\t', 1)
    return key, int(value)


# All input is parsed before anything is printed, then sorted as tuples.
for key, value in sorted([parse(record) for record in sys.stdin]):
    print('%s\t%d' % (key, value))
runIt.sh
#! /bin/bash
# Two chained map/sort/reduce passes:
#   pass 1 counts every word, pass 2 swaps to (count, word), sorts
#   numerically and keeps the most frequent word(s).
cat ulysses.txt |
    ./mapper1.py |
    sort |
    ./reducer1.py |
    ./mapper2.py |
    sort -n |
    ./reducer2.py
Pi
mapper.py
#!/usr/bin/env python
# mapper.py
#
# Pi mapper: each non-blank input line starts with an integer N.  The
# interval [0, 1] is cut into N slices and, for every slice, the area of
# the rectangle under 4 / (1 + x*x) at the slice's left edge is written
# as a "1\t<area>" record.  Summing all areas (the reducer's job)
# approximates pi.
from __future__ import print_function
import sys


def f(x):
    """Return 4 / (1 + x*x), the function whose integral on [0, 1] is pi."""
    return 4.0 / (1.0 + x * x)


def riemann_terms(n):
    """Yield the n left-endpoint rectangle areas of f over [0, 1].

    n -- number of slices.  n == 0 raises ZeroDivisionError, as before.
    """
    delta_x = 1.0 / n
    for i in range(n):
        yield f(i * delta_x) * delta_x


if __name__ == "__main__":
    for line in sys.stdin:
        fields = line.split()
        if not fields:
            # blank line: nothing to compute (previously an IndexError)
            continue
        # All records share the key "1" so that they reach one reducer.
        for area in riemann_terms(int(fields[0])):
            print("1\t%1.10f" % area)
reducer.py
#!/usr/bin/env python
# reducer.py
#
# Pi reducer: adds up the numeric second field of every "key\tvalue"
# record read from standard input and writes the grand total, formatted
# with 10 decimals, followed by a tab and a literal 0.
from __future__ import print_function
from operator import itemgetter
import sys

total = 0

for record in sys.stdin:
    _key, value_text = record.strip().split('\t', 1)
    try:
        value = float(value_text)
    except ValueError:
        # value was not a number: ignore this record
        continue
    total += value

# a single output record carrying the accumulated sum
print('%1.10f\t0' % total)
shuffleSort.py
#!/usr/bin/env python
# shuffleSort.py
#
# Local stand-in for the shuffle/sort phase: reads "key\tvalue" records
# from standard input and writes them back sorted by (key, value).
# With "-n" as the first command-line argument the key is converted to an
# integer before sorting, giving a numeric instead of a textual order.
from __future__ import print_function
import sys

numeric = sys.argv[1:2] == ["-n"]
convert = int if numeric else (lambda key: key)

pairs = []
for record in sys.stdin:
    key, value = record.strip().split('\t', 1)
    pairs.append((convert(key), value))

for key, value in sorted(pairs):
    print('%s\t%s' % (key, value))
runIt.sh
#! /bin/bash
# Approximate pi with 1,000,000 slices: the mapper emits one area per
# slice, the reducer adds them up.
echo "1000000" |
    ./mapper.py |
    sort |
    ./reducer.py
Grep
mapper.py
#!/usr/bin/env python
# mapper.py
#
# Grep mapper: the FIRST line of standard input is the expression to look
# for; every following line that contains it (case-insensitive substring
# match) is written out as "<expression>\t<line>".
from __future__ import print_function
import sys


def grep_lines(lines):
    """Yield (pattern, line) for every line containing the pattern.

    lines -- iterable of text lines whose first item is the pattern.

    The pattern and the comparison are lower-cased; the yielded line is
    the original text stripped of surrounding whitespace.  Yields nothing
    when lines is empty.
    """
    lines = iter(lines)
    try:
        pattern = next(lines).strip().lower()
    except StopIteration:
        # no input at all, not even a pattern
        return
    for line in lines:
        if pattern in line.lower():
            yield pattern, line.strip()


if __name__ == "__main__":
    # The print function (not the Python 2 statement) keeps this script
    # runnable under both Python 2 and Python 3.
    for pattern, text in grep_lines(sys.stdin):
        print('%s\t%s' % (pattern, text))
reducer.py
#!/usr/bin/env python
# reducer.py
#
# Identity reducer: echoes every record read from standard input,
# stripped of leading and trailing whitespace, one per output line.
from __future__ import print_function

import sys

for record in sys.stdin:
    cleaned = record.strip()
    print(cleaned)
runIt.sh
#! /bin/bash
# Map-Reduce grep: prints the lines of the given text files that contain
# the given expression.
#
# Usage: runIt.sh expression text-file [text-file ...]
#
# The expression is sent to the mapper as the first input line, followed
# by the contents of all the files.

if [ $# -lt 2 ] ; then
    echo "Syntax: $0 expression text-file [text-file]" >&2
    exit 1      # report the usage error to the caller
fi

expression=$1
shift

# "$@" keeps every remaining argument as one word, so file names with
# spaces or glob characters survive; printf prints the expression
# verbatim (echo would mangle whitespace and option-like values).
{ printf '%s\n' "$expression" ; cat -- "$@" ; } | ./mapper.py | sort | ./reducer.py
InvertedIndex
InvertedIndex/mapper.py
#!/usr/bin/env python
# mapper.py
#
# Inverted-index mapper: the FIRST line of standard input is the name of
# the file being indexed; for every whitespace-separated token on the
# remaining lines a "<lower-cased token>\t<file name>" record is written
# to standard output.
import sys

records = iter(sys.stdin)

# The first line carries the file name; empty input yields no output.
source_name = next(records, "").strip()

for text in records:
    # split() without arguments already drops leading/trailing blanks
    # and never produces empty tokens.
    for token in text.split():
        print( '%s\t%s' % (token.lower(), source_name) )
InvertedIndex/reducer.py
#!/usr/bin/env python
# reducer.py
#
# Inverted-index reducer: reads "word\tfileName" records from standard
# input and writes, for each run of consecutive identical words, one
# "word\tfile1,file2,..." record listing the distinct files in order of
# first appearance.  This only works because the records arrive sorted
# by key (Hadoop, or `sort` in the local pipeline, does that).
from __future__ import print_function
import sys


def build_index(lines):
    """Yield (word, [file names]) for each run of consecutive equal words.

    lines -- iterable of "word<TAB>fileName" strings.

    Each file name appears once per word, in order of first appearance.
    Raises ValueError when a line contains no tab.
    Yields nothing for empty input.
    """
    current_word = None
    file_list = []
    for line in lines:
        word, file_name = line.strip().split('\t', 1)
        if word != current_word:
            # A new word starts: flush the finished run, if there is one.
            if current_word is not None:
                yield current_word, file_list
            current_word, file_list = word, [file_name]
        elif file_name not in file_list:
            file_list.append(file_name)
    # Flush the last run.  Testing against None avoids the spurious
    # "None\t" record that used to be printed for empty input.
    if current_word is not None:
        yield current_word, file_list


if __name__ == "__main__":
    for word, files in build_index(sys.stdin):
        print('%s\t%s' % (word, ",".join(files)))
InvertedIndex/shuffleSort.py
#!/usr/bin/env python
# shuffleSort.py
#
# Local stand-in for the shuffle/sort phase: reads "key\tvalue" records
# from standard input and writes them back sorted by (key, value).
# With "-n" as the first command-line argument the key is converted to an
# integer before sorting, giving a numeric instead of a textual order.
from __future__ import print_function
import sys

numeric = sys.argv[1:2] == ["-n"]
convert = int if numeric else (lambda key: key)

pairs = []
for record in sys.stdin:
    key, value = record.strip().split('\t', 1)
    pairs.append((convert(key), value))

for key, value in sorted(pairs):
    print('%s\t%s' % (key, value))
InvertedIndex/runIt.sh
#! /bin/bash
# Build an inverted index of all voltaire* files in the current directory.
# Each file is fed to its own mapper run, preceded by a line holding the
# file name; the combined mapper output is sorted and reduced once.
for i in voltaire* ; do
    # Quote "$i" so file names with spaces or glob characters are passed
    # intact; printf prints the name verbatim.
    { printf '%s\n' "$i" ; cat -- "$i" ; } | ./mapper.py
done | sort | ./reducer.py
Game of Life
See this page for solving the 2D Game of Life with Map-Reduce.