Difference between revisions of "Map-Reduce Examples"

From dftwiki3
Jump to: navigation, search
(BuildingMapReduce/*.sh)
 
(13 intermediate revisions by the same user not shown)
Line 2: Line 2:
 
----
 
----
 
<br />
 
<br />
 +
<bluebox>
 +
This page contains various problems and their solution implemented using the Map-Reduce programming model.
 +
</bluebox>
 +
<br />
 +
<br />
 +
 
= BuildingMapReduce =
 
= BuildingMapReduce =
== BuildingMapReduce/mapper.py ==
+
<br />
 +
This is basically the word-count program, but used as an introduction to the mapping and reducing process.
 +
<br />
 +
== mapper.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 31: Line 40:
 
</source>
 
</source>
 
<br />
 
<br />
== BuildingMapReduce/reducer.py ==
+
==reducer.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 53: Line 62:
 
           sum += count
 
           sum += count
 
       else:
 
       else:
           print( "%s\t%d" % ( word, sum ) )
+
           print( "%s\t%d" % ( lastWord, sum ) )
 
           sum = count
 
           sum = count
 
           lastWord = word
 
           lastWord = word
Line 63: Line 72:
 
</source>
 
</source>
 
<br />
 
<br />
== BuildingMapReduce/shuffleSort.py ==
+
 
 +
== shuffleSort.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 88: Line 98:
 
</source>
 
</source>
 
<br />
 
<br />
== BuildingMapReduce/runIt.sh ==
+
==runIt.sh ==
 
<br />
 
<br />
 
::<source lang="bash">
 
::<source lang="bash">
Line 99: Line 109:
  
 
= WordCount =
 
= WordCount =
== WordCount/mapper.py ==
+
==mapper.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 125: Line 135:
 
</source>
 
</source>
 
<br />
 
<br />
== WordCount/reducer.py ==
+
 
 +
== reducer.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 172: Line 183:
 
</source>
 
</source>
 
<br />
 
<br />
== WordCount/shuffleSort.py ==
+
== shuffleSort.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 203: Line 214:
 
</source>
 
</source>
 
<br />
 
<br />
== WordCount/wordCount.sh ==
+
== wordCount.sh ==
 
<br />
 
<br />
 
::<source lang="bash">
 
::<source lang="bash">
Line 213: Line 224:
 
<br />
 
<br />
 
= findMostFrequentWords =
 
= findMostFrequentWords =
== findMostFrequentWords/mapper1.py ==
+
== mapper1.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 241: Line 252:
 
</source>
 
</source>
 
<br />
 
<br />
== findMostFrequentWords/mapper2.py ==
+
==mapper2.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 261: Line 272:
 
</source>
 
</source>
 
<br />
 
<br />
== findMostFrequentWords/reducer1.py ==
+
== reducer1.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 275: Line 286:
 
       word, count = line.strip().split('\t', 1)
 
       word, count = line.strip().split('\t', 1)
 
       count = int(count)
 
       count = int(count)
 +
 
       if lastWord==None:
 
       if lastWord==None:
 
           lastWord = word
 
           lastWord = word
Line 283: Line 295:
 
           sum += count
 
           sum += count
 
       else:
 
       else:
           print( "%s\t%d" % ( word, sum ) )
+
           print( "%s\t%d" % ( lastWord, sum ) )
 
           sum = count
 
           sum = count
 
           lastWord = word
 
           lastWord = word
Line 293: Line 305:
 
</source>
 
</source>
 
<br />
 
<br />
== findMostFrequentWords/reducer2.py ==
+
 
 +
== reducer2.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 319: Line 332:
 
</source>
 
</source>
 
<br />
 
<br />
== findMostFrequentWords/shuffleSort.py ==
+
== shuffleSort.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 344: Line 357:
 
</source>
 
</source>
 
<br />
 
<br />
== findMostFrequentWords/*.sh ==
+
== runIt.sh ==
 
<br />
 
<br />
 
::<source lang="bash">
 
::<source lang="bash">
cat: findMostFrequentWords/*.sh: No such file or directory
+
#! /bin/bash
  
 +
cat ulysses.txt | ./mapper1.py | sort | ./reducer1.py | ./mapper2.py | sort -n | ./reducer2.py
 
</source>
 
</source>
 
<br />
 
<br />
 +
 
= Pi =
 
= Pi =
== Pi/mapper.py ==
+
==mapper.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 378: Line 393:
 
</source>
 
</source>
 
<br />
 
<br />
== Pi/reducer.py ==
+
== reducer.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 413: Line 428:
 
</source>
 
</source>
 
<br />
 
<br />
== Pi/shuffleSort.py ==
+
==shuffleSort.py ==
 
<br />
 
<br />
 
::<source lang="python">
 
::<source lang="python">
Line 444: Line 459:
 
</source>
 
</source>
 
<br />
 
<br />
== Pi/runIt.sh ==
+
== runIt.sh ==
 
<br />
 
<br />
 
::<source lang="bash">
 
::<source lang="bash">
Line 453: Line 468:
  
 
</source>
 
</source>
 +
<br />
 +
=Grep=
 +
<br />
 +
==mapper.py==
 +
<br />
 +
<source lang="python">
 +
#!/usr/bin/env python
 +
# mapper.py
 +
import sys
 +
 +
first = True
 +
 +
# input comes from STDIN (standard input)
 +
for  line in sys.stdin:
 +
    if first:
 +
        first = False
 +
        grepWord = line.strip().lower()
 +
        continue
 +
 +
    if line.lower().find( grepWord ) != -1:
 +
        print '%s\t%s' % ( grepWord, line.strip() )
 +
 +
       
 +
</source>
 +
<br />
 +
==reducer.py==
 +
<br />
 +
<source lang="python">
 +
#!/usr/bin/env python
 +
# reducer.py
 +
# implements identity operator
 +
#
 +
from __future__ import print_function
 +
 +
 +
import sys
 +
 +
# input comes from STDIN
 +
for line in sys.stdin:
 +
      print( line.strip() )
 +
 +
</source>
 +
<br />
 +
==runIt.sh==
 +
<br />
 +
<source lang="bash">
 +
#! /bin/bash
 +
 +
if [ $# -lt 2 ] ; then
 +
  echo "Syntax: $0 expression text-file [text-file]"
 +
  exit
 +
fi
 +
 +
expression=$1
 +
shift
 +
files=$@
 +
#echo "files = $files"
 +
 +
{ echo $expression ; for file in $files; do cat $file ; done; } | ./mapper.py | sort | ./reducer.py
 +
 +
</source>
 +
<br />
 +
 
<br />
 
<br />
 
= InvertedIndex =
 
= InvertedIndex =
Line 580: Line 658:
 
</source>
 
</source>
 
<br />
 
<br />
 +
=Game of Life=
 +
<br />
 +
See [[CSC352 Game of Life in Map-Reduce| this page]] for solving the 2D Game of Life with Map-Reduce.
 +
<br />
 +
<br />
 +
 +
<br />
 +
<br />
 +
<br />
 +
<br />
 +
<br />
 +
<br />
 +
[[category:MapReduce]][[Category:Hadoop]]

Latest revision as of 08:57, 21 April 2017

--D. Thiebaut (talk) 09:28, 11 April 2017 (EDT)



This page contains various problems and their solution implemented using the Map-Reduce programming model.



BuildingMapReduce


This is basically the word-count program, but used as an introduction to the mapping and reducing process.

mapper.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    # create tuples of all words in line
    L = [ (word.strip().lower(), 1 ) for word in line.strip().split() ]

    # increase counters
    for word, n in L:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print( '%s\t%d' % (word, n) )


reducer.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

lastWord = None
sum = 0

for line in sys.stdin:
      word, count = line.strip().split('\t', 1)
      count = int(count)
      if lastWord==None:
          lastWord = word
          sum = count
          continue

      if word==lastWord:
          sum += count
      else:
          print( "%s\t%d" % ( lastWord, sum ) )
          sum = count
          lastWord = word

# output last word
if lastWord == word:
    print( '%s\t%s' % (lastWord, sum ) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      word, count = line.strip().split('\t', 1)
      L.append( (word, int(count) ) )

# sort the tuples
L.sort( )

# output the sorted tuples
for (word, count) in L:
    print( '%s\t%d' % (word, count) )


runIt.sh


#! /bin/bash

cat ulysses.txt | ./mapper.py | ./shuffleSort.py | ./reducer.py


WordCount

mapper.py


#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    # remove leading and trailing whitespace
    line = line.strip()

    # split the line into words
    words = line.split()

    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)


reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      # parse the input we got from mapper.py
      word, count = line.split('\t', 1)
      #print( "--getting (%s, %s)" % ( str(word), str(count) ) )

      # convert count (currently a string) to int
      try:
          count = int(count)
      except ValueError:
          # count was not a number, so silently
          # ignore/discard this line
          #print( "--skipping (%s, %s)" % ( str(word), str(count) ) )
          continue

      # this IF-switch only works because Hadoop sorts map output
      # by key (here: word) before it is passed to the reducer
      if current_word == word:
          current_count += count
      else:
          if current_word:
               # write result to STDOUT
               print( '%s\t%s' % (current_word, current_count) )
          current_count = count
          current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print( '%s\t%s' % (current_word, current_count) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

sortNumeric = False
if len( sys.argv ) > 1 and sys.argv[1] == "-n":
   sortNumeric = True

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      word, count = line.split('\t', 1)
      if sortNumeric:
          word = int( word )

      L.append( (word, count) )


L.sort()

for word, count in L:
    print( '%s\t%s' % (word, count) )


wordCount.sh


#! /bin/bash

echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py


findMostFrequentWords

mapper1.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    # create tuples of all words in line
    L = [ (word.strip().lower(), 1 ) for word in line.strip().split() ]

    # increase counters
    for word, n in L:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print( '%s\t%d' % (word, n) )


mapper2.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    word, count = line.strip().split('\t', 1)
    count = int(count)
    print( '%d\t%s' % (count, word) )


reducer1.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

lastWord = None
sum = 0

for line in sys.stdin:
      word, count = line.strip().split('\t', 1)
      count = int(count)

      if lastWord==None:
          lastWord = word
          sum = count
          continue

      if word==lastWord:
          sum += count
      else:
          print( "%s\t%d" % ( lastWord, sum ) )
          sum = count
          lastWord = word

# output last word
if lastWord == word:
    print( '%s\t%s' % (lastWord, sum ) )


reducer2.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

mostFreq = []
currentMax = -1

for line in sys.stdin:
      count, word = line.strip().split('\t', 1)
      count = int(count)
      if count > currentMax:
            currentMax = count
            mostFreq = [ word ]
      elif count == currentMax:
            mostFreq.append( word )

# output mostFreq word(s)
for word in mostFreq:
    print( '%s\t%s' % ( word, currentMax ) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      word, count = line.strip().split('\t', 1)
      L.append( (word, int(count) ) )

# sort the tuples
L.sort( )

# output the sorted tuples
for (word, count) in L:
    print( '%s\t%d' % (word, count) )


runIt.sh


#! /bin/bash

cat ulysses.txt | ./mapper1.py | sort | ./reducer1.py | ./mapper2.py | sort -n | ./reducer2.py


Pi

mapper.py


#!/usr/bin/env python
import sys

def f( x ):
    return 4.0 / ( 1.0 + x*x )


# input comes from STDIN (standard input)
for line in sys.stdin:

    # remove leading and trailing whitespace
    line = line.strip()

    # split the line into words
    words = line.split()
    N = int( words[0] )
    deltaX = 1.0 / N

    for i in range( 0, N ):
        print( "1\t%1.10f" % ( f( i * deltaX )*deltaX ) )


reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

sum = 0

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      # parse the input we got from mapper.py
      word, count = line.split('\t', 1)

      # convert count (currently a string) to int
      try:
          count = float(count)
      except ValueError:
          # count was not a number, so silently
          # ignore/discard this line
          #print( "--skipping (%s, %s)" % ( str(word), str(count) ) )
          continue

      sum += count

# do not forget to output the last word if needed!
print( '%1.10f\t0' % sum )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

sortNumeric = False
if len( sys.argv ) > 1 and sys.argv[1] == "-n":
   sortNumeric = True

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      word, count = line.split('\t', 1)
      if sortNumeric:
          word = int( word )

      L.append( (word, count) )


L.sort()

for word, count in L:
    print( '%s\t%s' % (word, count) )


runIt.sh


#! /bin/bash

echo "1000000" | ./mapper.py | sort | ./reducer.py


Grep


mapper.py


#!/usr/bin/env python
# mapper.py
import sys

first = True

# input comes from STDIN (standard input)
for  line in sys.stdin:
    if first:
        first = False
        grepWord = line.strip().lower()
        continue

    if line.lower().find( grepWord ) != -1:
        print '%s\t%s' % ( grepWord, line.strip() )


reducer.py


#!/usr/bin/env python
# reducer.py
# implements identity operator
#
from __future__ import print_function


import sys

# input comes from STDIN
for line in sys.stdin:
      print( line.strip() )


runIt.sh


#! /bin/bash

if [ $# -lt 2 ] ; then
   echo "Syntax: $0 expression text-file [text-file]"
   exit
fi

expression=$1
shift
files=$@
#echo "files = $files"

{ echo $expression ; for file in $files; do cat $file ; done; } | ./mapper.py | sort | ./reducer.py



InvertedIndex

InvertedIndex/mapper.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
import sys

first = True
fileName = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    if first:
        first = False
        fileName = line.strip()
        continue

    # create tuples of all words in line
    L = [ (word.strip().lower(), fileName ) for word in line.strip().split() ]

    # increase counters
    for word, f in L:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print( '%s\t%s' % (word, f) )


InvertedIndex/reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

currentWord = None
fileList = []
word = None

# input comes from STDIN
for line in sys.stdin:
      # parse the input we got from mapper.py
      word, fileName = line.strip().split('\t', 1)

      #print( "--------" )
      #print( "--- line = ", line )
      #print( "--- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )

      # this IF-switch only works because Hadoop sorts map output
      # by key (here: word) before it is passed to the reducer
      if currentWord == None:
            currentWord = word
            fileList = [ fileName ]
            #print( "-1- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )
            continue
      if currentWord == word:
            if fileName not in fileList:
                  fileList.append( fileName )
            #print( "-2- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )
            continue
      if currentWord != word:
            print( "%s\t%s" % ( currentWord, ",".join( fileList ) ) )
            fileList = [ fileName ]
            currentWord = word
            #print( "-3- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )
            continue

# do not forget to output the last word if needed!
if currentWord == word:
    print( '%s\t%s' % (currentWord, ",".join( fileList ) ) )


InvertedIndex/shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

sortNumeric = False
if len( sys.argv ) > 1 and sys.argv[1] == "-n":
   sortNumeric = True

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      word, count = line.split('\t', 1)
      if sortNumeric:
          word = int( word )

      L.append( (word, count) )


L.sort()

for word, count in L:
    print( '%s\t%s' % (word, count) )


InvertedIndex/runIt.sh


#! /bin/bash

for i in voltaire* ; do 
   { echo $i; cat $i; } | ./mapper.py 
done  | sort | ./reducer.py


Game of Life


See this page for solving the 2D Game of Life with Map-Reduce.