Map-Reduce Examples

From dftwiki3
Jump to: navigation, search

--D. Thiebaut (talk) 09:28, 11 April 2017 (EDT)



This page contains various problems and their solution implemented using the Map-Reduce programming model.



BuildingMapReduce


This is basically the word-count program, but used as an introduction to the mapping and reducing process.

mapper.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    # create tuples of all words in line
    L = [ (word.strip().lower(), 1 ) for word in line.strip().split() ]

    # increase counters
    for word, n in L:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print( '%s\t%d' % (word, n) )


reducer.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

lastWord = None
sum = 0

for line in sys.stdin:
      word, count = line.strip().split('\t', 1)
      count = int(count)
      if lastWord==None:
          lastWord = word
          sum = count
          continue

      if word==lastWord:
          sum += count
      else:
          print( "%s\t%d" % ( lastWord, sum ) )
          sum = count
          lastWord = word

# output last word
if lastWord == word:
    print( '%s\t%s' % (lastWord, sum ) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      word, count = line.strip().split('\t', 1)
      L.append( (word, int(count) ) )

# sort the tuples
L.sort( )

# output the sorted tuples
for (word, count) in L:
    print( '%s\t%d' % (word, count) )


runIt.sh


#! /bin/bash

cat ulysses.txt | ./mapper.py | ./shuffleSort.py | ./reducer.py


WordCount

mapper.py


#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    # remove leading and trailing whitespace
    line = line.strip()

    # split the line into words
    words = line.split()

    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)


reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      # parse the input we got from mapper.py
      word, count = line.split('\t', 1)
      #print( "--getting (%s, %s)" % ( str(word), str(count) ) )

      # convert count (currently a string) to int
      try:
          count = int(count)
      except ValueError:
          # count was not a number, so silently
          # ignore/discard this line
          #print( "--skipping (%s, %s)" % ( str(word), str(count) ) )
          continue

      # this IF-switch only works because Hadoop sorts map output
      # by key (here: word) before it is passed to the reducer
      if current_word == word:
          current_count += count
      else:
          if current_word:
               # write result to STDOUT
               print( '%s\t%s' % (current_word, current_count) )
          current_count = count
          current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print( '%s\t%s' % (current_word, current_count) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

sortNumeric = False
if len( sys.argv ) > 1 and sys.argv[1] == "-n":
   sortNumeric = True

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      word, count = line.split('\t', 1)
      if sortNumeric:
          word = int( word )

      L.append( (word, count) )


L.sort()

for word, count in L:
    print( '%s\t%s' % (word, count) )


wordCount.sh


#! /bin/bash

echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py


findMostFrequentWords

mapper1.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    # create tuples of all words in line
    L = [ (word.strip().lower(), 1 ) for word in line.strip().split() ]

    # increase counters
    for word, n in L:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print( '%s\t%d' % (word, n) )


mapper2.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:

    word, count = line.strip().split('\t', 1)
    count = int(count)
    print( '%d\t%s' % (count, word) )


reducer1.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

lastWord = None
sum = 0

for line in sys.stdin:
      word, count = line.strip().split('\t', 1)
      count = int(count)

      if lastWord==None:
          lastWord = word
          sum = count
          continue

      if word==lastWord:
          sum += count
      else:
          print( "%s\t%d" % ( lastWord, sum ) )
          sum = count
          lastWord = word

# output last word
if lastWord == word:
    print( '%s\t%s' % (lastWord, sum ) )


reducer2.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

mostFreq = []
currentMax = -1

for line in sys.stdin:
      count, word = line.strip().split('\t', 1)
      count = int(count)
      if count > currentMax:
            currentMax = count
            mostFreq = [ word ]
      elif count == currentMax:
            mostFreq.append( word )

# output mostFreq word(s)
for word in mostFreq:
    print( '%s\t%s' % ( word, currentMax ) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      word, count = line.strip().split('\t', 1)
      L.append( (word, int(count) ) )

# sort the tuples
L.sort( )

# output the sorted tuples
for (word, count) in L:
    print( '%s\t%d' % (word, count) )


runIt.sh


#! /bin/bash

cat ulysses.txt | ./mapper1.py | sort | ./reducer1.py | ./mapper2.py | sort -n | ./reducer2.py


Pi

mapper.py


#!/usr/bin/env python
import sys

def f( x ):
    return 4.0 / ( 1.0 + x*x )


# input comes from STDIN (standard input)
for line in sys.stdin:

    # remove leading and trailing whitespace
    line = line.strip()

    # split the line into words
    words = line.split()
    N = int( words[0] )
    deltaX = 1.0 / N

    for i in range( 0, N ):
        print( "1\t%1.10f" % ( f( i * deltaX )*deltaX ) )


reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

sum = 0

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      # parse the input we got from mapper.py
      word, count = line.split('\t', 1)

      # convert count (currently a string) to int
      try:
          count = float(count)
      except ValueError:
          # count was not a number, so silently
          # ignore/discard this line
          #print( "--skipping (%s, %s)" % ( str(word), str(count) ) )
          continue

      sum += count

# do not forget to output the last word if needed!
print( '%1.10f\t0' % sum )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

sortNumeric = False
if len( sys.argv ) > 1 and sys.argv[1] == "-n":
   sortNumeric = True

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      word, count = line.split('\t', 1)
      if sortNumeric:
          word = int( word )

      L.append( (word, count) )


L.sort()

for word, count in L:
    print( '%s\t%s' % (word, count) )


runIt.sh


#! /bin/bash

echo "1000000" | ./mapper.py | sort | ./reducer.py


Grep


mapper.py


#!/usr/bin/env python
# mapper.py
import sys

first = True

# input comes from STDIN (standard input)
for  line in sys.stdin:
    if first:
        first = False
        grepWord = line.strip().lower()
        continue

    if line.lower().find( grepWord ) != -1:
        print '%s\t%s' % ( grepWord, line.strip() )


reducer.py


#!/usr/bin/env python
# reducer.py
# implements identity operator
#
from __future__ import print_function


import sys

# input comes from STDIN
for line in sys.stdin:
      print( line.strip() )


runIt.sh


#! /bin/bash

if [ $# -lt 2 ] ; then
   echo "Syntax: $0 expression text-file [text-file]"
   exit
fi

expression=$1
shift
files=$@
#echo "files = $files"

{ echo $expression ; for file in $files; do cat $file ; done; } | ./mapper.py | sort | ./reducer.py



InvertedIndex

InvertedIndex/mapper.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
import sys

first = True
fileName = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    if first:
        first = False
        fileName = line.strip()
        continue

    # create tuples of all words in line
    L = [ (word.strip().lower(), fileName ) for word in line.strip().split() ]

    # increase counters
    for word, f in L:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print( '%s\t%s' % (word, f) )


InvertedIndex/reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

currentWord = None
fileList = []
word = None

# input comes from STDIN
for line in sys.stdin:
      # parse the input we got from mapper.py
      word, fileName = line.strip().split('\t', 1)

      #print( "--------" )
      #print( "--- line = ", line )
      #print( "--- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )

      # this IF-switch only works because Hadoop sorts map output
      # by key (here: word) before it is passed to the reducer
      if currentWord == None:
            currentWord = word
            fileList = [ fileName ]
            #print( "-1- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )
            continue
      if currentWord == word:
            if fileName not in fileList:
                  fileList.append( fileName )
            #print( "-2- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )
            continue
      if currentWord != word:
            print( "%s\t%s" % ( currentWord, ",".join( fileList ) ) )
            fileList = [ fileName ]
            currentWord = word
            #print( "-3- word = %s, fileName = %s, fileList = %s" % ( word, fileName, str( fileList )  ) )
            continue

# do not forget to output the last word if needed!
if currentWord == word:
    print( '%s\t%s' % (currentWord, ",".join( fileList ) ) )


InvertedIndex/shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

sortNumeric = False
if len( sys.argv ) > 1 and sys.argv[1] == "-n":
   sortNumeric = True

L = []

# input comes from STDIN
for line in sys.stdin:
      # remove leading and trailing whitespace
      line = line.strip()

      word, count = line.split('\t', 1)
      if sortNumeric:
          word = int( word )

      L.append( (word, count) )


L.sort()

for word, count in L:
    print( '%s\t%s' % (word, count) )


InvertedIndex/runIt.sh


#! /bin/bash

for i in voltaire* ; do 
   { echo $i; cat $i; } | ./mapper.py 
done  | sort | ./reducer.py


Game of Life


See this page for solving the 2D Game of Life with Map-Reduce.