Map-Reduce Examples

From dftwiki3
Revision as of 19:33, 18 April 2017 by Thiebaut (talk | contribs) (runIt.sh)
Jump to: navigation, search

--D. Thiebaut (talk) 09:28, 11 April 2017 (EDT)



This page contains various problems and their solutions, implemented using the Map-Reduce programming model.



BuildingMapReduce


This is basically the word-count program, but used as an introduction to the mapping and reducing process.

mapper.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# Read text from STDIN (standard input) one line at a time and write a
# "word<TAB>1" record to STDOUT (standard output) for every word found.
# These records are the input of the Reduce step, i.e. of reducer.py.
for text in sys.stdin:

    # split() without arguments cuts on any run of whitespace
    for token in text.strip().split():
        key = token.strip().lower()

        # tab-delimited; the trivial word count is 1
        print( '%s\t%d' % (key, 1) )


reducer.py


#!/usr/bin/env python
# reducer.py
from __future__ import print_function
import sys

def reduce_counts( lines ):
    """Sum the counts of consecutive records that share the same word.

    lines is any iterable of "word<TAB>count" strings that is already
    sorted by word (the job of the shuffle/sort step).  Yields one
    (word, total) tuple per distinct word, in input order.  Yields
    nothing when lines is empty.

    Raises ValueError if a line has no tab or its count is not an integer.
    """
    lastWord = None
    total = 0

    for line in lines:
        word, count = line.strip().split('\t', 1)
        count = int(count)

        if word == lastWord:
            total += count
            continue

        # the key changed: flush the finished word, unless this is
        # the very first record and there is nothing to flush yet
        if lastWord is not None:
            yield lastWord, total
        lastWord = word
        total = count

    # output last word; guarded so that empty input produces no output
    if lastWord is not None:
        yield lastWord, total


if __name__ == "__main__":
    # input comes from STDIN, results go to STDOUT, tab-delimited
    for word, total in reduce_counts( sys.stdin ):
        print( "%s\t%d" % ( word, total ) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

# Collect every "word<TAB>count" record from STDIN as a (word, count)
# tuple, with the count converted to an integer.
pairs = []
for raw in sys.stdin:
      key, value = raw.strip().split('\t', 1)
      pairs.append( (key, int(value) ) )

# Emit the records in sorted tuple order: by word first, then by count.
for key, value in sorted( pairs ):
    print( '%s\t%d' % (key, value) )


runIt.sh


#! /bin/bash

# Run the whole job locally as a pipeline: map, shuffle/sort, then reduce.
cat ulysses.txt \
    | ./mapper.py \
    | ./shuffleSort.py \
    | ./reducer.py


WordCount

mapper.py


#!/usr/bin/env python
import sys

# Word-count mapper: writes one "word<TAB>1" record to STDOUT (standard
# output) for every whitespace-separated word read from STDIN (standard
# input).  What we output here will be the input for the Reduce step,
# i.e. the input for reducer.py.
for line in sys.stdin:

    # remove leading and trailing whitespace, then split the line into words
    words = line.strip().split()

    for word in words:
        # tab-delimited; the trivial word count is 1
        #
        # print() is called with one parenthesized argument so that this
        # script runs unchanged under both Python 2 and Python 3
        print( '%s\t%s' % (word, 1) )


reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

def reduce_word_counts( lines ):
    """Sum the counts of consecutive records that share the same word.

    lines is any iterable of "word<TAB>count" strings.  The grouping only
    works because the map output is sorted by key (here: word) before it
    is passed to the reducer.  Yields one (word, total) tuple per distinct
    word, in input order, and nothing at all for empty input.

    Records whose count is not an integer are ignored.  Raises ValueError
    if a line contains no tab.
    """
    current_word = None
    current_count = 0

    for line in lines:
        # remove leading and trailing whitespace
        line = line.strip()

        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)

        # convert count (currently a string) to int
        try:
            count = int(count)
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            continue

        if current_word == word:
            current_count += count
        else:
            # the key changed: flush the finished word, if there is one
            if current_word is not None:
                yield current_word, current_count
            current_count = count
            current_word = word

    # do not forget to output the last word!  The test is on current_word
    # alone, so the last word is still written when the final input line
    # was discarded, and nothing is written when there was no input.
    if current_word is not None:
        yield current_word, current_count


if __name__ == "__main__":
    # input comes from STDIN; write results to STDOUT, tab-delimited
    for word, total in reduce_word_counts( sys.stdin ):
        print( '%s\t%s' % (word, total) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

# Passing "-n" as the first command-line argument makes the keys sort
# as integers instead of as strings.
numericKeys = sys.argv[1:2] == ["-n"]

# Read every "key<TAB>value" record from STDIN; the value is kept as the
# string it arrived as.
records = []
for raw in sys.stdin:
      key, value = raw.strip().split('\t', 1)
      records.append( (int( key ) if numericKeys else key, value) )

# Write the records back out in sorted tuple order.
for key, value in sorted( records ):
    print( '%s\t%s' % (key, value) )


wordCount.sh


#! /bin/bash

# Smoke-test the word-count job on a short inline sentence;
# "sort -k1,1" plays the role of the shuffle/sort phase (orders by word).
echo "foo foo quux labs foo bar quux" \
    | ./mapper.py \
    | sort -k1,1 \
    | ./reducer.py


findMostFrequentWords

mapper1.py


#!/usr/bin/env python
# A basic mapper function/program that
# takes whatever is passed on the input and
# outputs tuples of all the words formatted
# as (word, 1) 
from __future__ import print_function
import sys

# For every line on STDIN (standard input), emit each word it contains,
# lower-cased, as a "word<TAB>1" record on STDOUT (standard output).
# The records become the input of the Reduce step.
for text in sys.stdin:
    tokens = text.strip().split()

    for token in tokens:
        # tab-delimited; the trivial word count is 1
        print( '%s\t%d' % (token.strip().lower(), 1) )


mapper2.py


#!/usr/bin/env python
# Second-stage mapper: reads "word<TAB>count" records (the
# format written by reducer1.py) and writes them back with
# the two fields swapped, as "count<TAB>word", so that the
# records can then be sorted numerically by count.
from __future__ import print_function
import sys

# Swap the two fields of every record read from STDIN (standard input):
# "word<TAB>count" comes in, "count<TAB>word" goes out.
for record in sys.stdin:

    fields = record.strip().split('\t', 1)
    term, tally = fields
    print( '%d\t%s' % (int(tally), term) )


reducer1.py


#!/usr/bin/env python
# reducer1.py
from __future__ import print_function
import sys

def reduce_counts( lines ):
    """Sum the counts of consecutive records that share the same word.

    lines is any iterable of "word<TAB>count" strings that is already
    sorted by word.  Yields one (word, total) tuple per distinct word,
    in input order.  Yields nothing when lines is empty.

    Raises ValueError if a line has no tab or its count is not an integer.
    """
    lastWord = None
    total = 0

    for line in lines:
        word, count = line.strip().split('\t', 1)
        count = int(count)

        if word == lastWord:
            total += count
            continue

        # the key changed: flush the finished word, unless this is
        # the very first record and there is nothing to flush yet
        if lastWord is not None:
            yield lastWord, total
        lastWord = word
        total = count

    # output last word; guarded so that empty input produces no output
    if lastWord is not None:
        yield lastWord, total


if __name__ == "__main__":
    # input comes from STDIN, results go to STDOUT, tab-delimited
    for word, total in reduce_counts( sys.stdin ):
        print( "%s\t%d" % ( word, total ) )


reducer2.py


#!/usr/bin/env python
# reducer2.py
from __future__ import print_function
import sys

# Keep only the word(s) carrying the largest count seen so far.  Input
# records on STDIN have the form "count<TAB>word".
topWords = []
topCount = -1

for record in sys.stdin:
      tally, term = record.strip().split('\t', 1)
      tally = int(tally)

      # a smaller count can never be the most frequent: skip it
      if tally < topCount:
            continue

      # a strictly larger count starts a new list of winners
      if tally > topCount:
            topCount = tally
            topWords = []

      topWords.append( term )

# output the winner(s) as "word<TAB>count"
for term in topWords:
    print( '%s\t%s' % ( term, topCount ) )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

# Gather all "word<TAB>count" records from STDIN as (word, count)
# tuples, the count being converted to an integer.
records = []
for entry in sys.stdin:
      fields = entry.strip().split('\t', 1)
      term, tally = fields
      records.append( (term, int(tally) ) )

# Order the tuples (by word, then by count) and write them back out.
for (term, tally) in sorted( records ):
    print( '%s\t%d' % (term, tally) )


runIt.sh


#! /bin/bash

# Two chained jobs: the first counts every word; the second swaps each
# record to "count<TAB>word", sorts numerically, and keeps the word(s)
# with the largest count.
cat ulysses.txt \
    | ./mapper1.py | sort | ./reducer1.py \
    | ./mapper2.py | sort -n | ./reducer2.py


Pi

mapper.py


#!/usr/bin/env python
import sys

def f( x ):
    """Return 4 / (1 + x*x), the integrand used to approximate pi."""
    denominator = 1.0 + x*x
    return 4.0 / denominator


# Each line of STDIN (standard input) starts with N, the number of
# rectangles to use.  For every rectangle, emit its area under f() as a
# "1<TAB>area" record; all records share the key 1.
for text in sys.stdin:

    fields = text.strip().split()
    steps = int( fields[0] )
    width = 1.0 / steps

    for k in range( steps ):
        # the height is taken at the left edge of the k-th rectangle
        area = f( k * width ) * width
        print( "1\t%1.10f" % area )


reducer.py


#!/usr/bin/env python
from __future__ import print_function
from operator import itemgetter

import sys

# Add up the value field of every "key<TAB>value" record read from STDIN
# and print the grand total, followed by a tab and a 0.
total = 0

for record in sys.stdin:
      # parse the input we got from mapper.py; the key itself is not used
      key, value = record.strip().split('\t', 1)

      # convert the value (currently a string) to a float
      try:
          total += float(value)
      except ValueError:
          # the value was not a number, so silently
          # ignore/discard this line
          continue

# a single output record holds the sum of all the values
print( '%1.10f\t0' % total )


shuffleSort.py


#!/usr/bin/env python
from __future__ import print_function
import sys

# "-n" given as the first command-line argument selects numeric ordering
# of the keys; otherwise keys are ordered as strings.
asNumbers = sys.argv[1:2] == ["-n"]

# Load all "key<TAB>value" records from STDIN; values stay strings.
entries = []
for raw in sys.stdin:
      fields = raw.strip().split('\t', 1)
      key, value = fields
      if asNumbers:
          key = int( key )
      entries.append( (key, value) )

# Output the records in sorted tuple order.
for key, value in sorted( entries ):
    print( '%s\t%s' % (key, value) )


runIt.sh


#! /bin/bash

# Approximate pi with 1000000 rectangles: the mapper emits one rectangle
# area per record and the reducer adds them all up.
echo "1000000" \
    | ./mapper.py \
    | sort \
    | ./reducer.py


Grep


mapper.py


#!/usr/bin/env python
# mapper.py
import sys

# Grep mapper.  The first line read from STDIN (standard input) is the
# expression to search for; every following line that contains it
# (compared case-insensitively) is written to STDOUT as
# "expression<TAB>line".
grepWord = None

for line in sys.stdin:
    # the very first line is the search expression, not data
    if grepWord is None:
        grepWord = line.strip().lower()
        continue

    if grepWord in line.lower():
        # print() is called with one parenthesized argument so that this
        # script runs unchanged under both Python 2 and Python 3
        print( '%s\t%s' % ( grepWord, line.strip() ) )


reducer.py


#!/usr/bin/env python
# reducer.py
# implements identity operator
#
from __future__ import print_function


import sys

# Identity reduce: echo every record read from STDIN, with leading and
# trailing whitespace removed.
for record in sys.stdin:
      trimmed = record.strip()
      print( trimmed )


runIt.sh


#! /bin/bash

# Usage: runIt.sh expression text-file [text-file ...]
# Feeds the search expression, followed by the contents of every text
# file, to the mapper; the mapper treats the first line as the expression.
if [ $# -lt 2 ] ; then
   echo "Syntax: $0 expression text-file [text-file]" >&2
   # a usage error is a failure: report it with a non-zero exit status
   exit 1
fi

expression=$1
shift

# After the shift, "$@" holds only the file names.  Quoting it, and
# "$expression", keeps names and expressions that contain spaces or glob
# characters (e.g. "*") intact.  printf is used instead of echo so that
# an expression such as "-n" is not taken as an option.
{ printf '%s\n' "$expression" ; for file in "$@"; do cat "$file" ; done; } | ./mapper.py | sort | ./reducer.py



...