Difference between revisions of "CSC352 Homework 4 Solution"
(→Problem #2) |
|||
(12 intermediate revisions by the same user not shown) | |||
Line 13: | Line 13: | ||
Because we see that 16 (12+4) map tasks run altogether, we deduce that we had 16 splits total, requiring 16 tasks. 12 ran first, then were taken down while 4 more were started. | Because we see that 16 (12+4) map tasks run altogether, we deduce that we had 16 splits total, requiring 16 tasks. 12 ran first, then were taken down while 4 more were started. | ||
− | ==Problem #2== | + | ==Problem #2: Unique Categories== |
Below is a Map/Reduce pair of Python streaming programs for a job similar to what was required. Instead of reporting the unique categories, the program return the categories and a list of Ids of wiki pages in which they appear. | Below is a Map/Reduce pair of Python streaming programs for a job similar to what was required. Instead of reporting the unique categories, the program return the categories and a list of Ids of wiki pages in which they appear. | ||
Line 26: | Line 26: | ||
category:(lists of hospitals in the united states) [1010001] | category:(lists of hospitals in the united states) [1010001] | ||
+ | '''Note that attempts to use ''regular expressions'' to grab strings in html tags were not successful. Even though the program ran fine in a serial form, from the command line, it wouldn't run to completion under Hadoop. Instead a simpler method of locating the position of the tags and splicing the string between the indexes was used. ''' (Debugging parallel program has always been and still is a challenging task). | ||
+ | |||
+ | The potential problem with running our MapReduce program on the large xml files is that when they are spliced into several splits, the splitter part of MapReduce will not know that there are logical breaks between pages, delimited by <xml> and </xml>. Very likely, the first split will contain the first half of a wiki page, and the second split will contain the other half. If the split cuts the file between the <categories> and </categories> tags, then we lose some categories in the process... | ||
===Mapper=== | ===Mapper=== | ||
<br /> | <br /> | ||
Line 83: | Line 86: | ||
# list 1 10000001 | # list 1 10000001 | ||
# | # | ||
+ | # TYPICAL RUN AND OUTPUT OF HADOOP JOB: | ||
+ | # | ||
+ | # hadoop jar /home/hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar \ | ||
+ | # > -inputreader "StreamXmlRecordReader,begin=<xml>,end=</xml>" \ | ||
+ | # > -file ./mapper_cat.py -mapper ./mapper_cat.py \ | ||
+ | # > -file ./reducer_cat.py -reducer ./reducer_cat.py \ | ||
+ | # > -input wikipages/few -output dft-xml-output | ||
+ | # | ||
+ | # category:(10th century in ireland) [10130001] | ||
+ | # category:(1976 films) [10000001] | ||
+ | # category:(argentine films) [10000001] | ||
+ | # category:(economy of côte d'ivoire) [10160001] | ||
+ | # category:(estuaries in england) [100001,200002] | ||
+ | # category:(hospitals in north carolina) [1010001] | ||
+ | # category:(lists of hospitals in the united states) [1010001] | ||
+ | # category:(medieval ireland) [10130001] | ||
+ | # category:(norfolk broads) [200002,100001] | ||
+ | # category:(north carolina-related lists) [1010001] | ||
+ | # category:(ramsar sites in england) [100001,200002] | ||
+ | # category:(royal society for the protection of birds reserves in england) [200002,100001] | ||
+ | # category:(schizophrenia) [10240001] | ||
+ | # category:(sites of special scientific interest in norfolk) [100001,200002] | ||
+ | # category:(spanish-language films) [10000001] | ||
+ | # category:(symptoms) [10240001] | ||
+ | # category:(tourism by country) [10160001] | ||
+ | # | ||
import sys | import sys | ||
Line 180: | Line 209: | ||
<br /> | <br /> | ||
− | ==Problem #3== | + | ===Java Solution=== |
+ | |||
+ | Here's a java solution provided by Yang and Xiaoting. | ||
+ | |||
+ | <br /> | ||
+ | <source lang="java"> | ||
+ | /** | ||
+ | * Licensed to the Apache Software Foundation (ASF) under one | ||
+ | * or more contributor license agreements. See the NOTICE file | ||
+ | * distributed with this work for additional information | ||
+ | * regarding copyright ownership. The ASF licenses this file | ||
+ | * to you under the Apache License, Version 2.0 (the | ||
+ | * "License"); you may not use this file except in compliance | ||
+ | * with the License. You may obtain a copy of the License at | ||
+ | * | ||
+ | * http://www.apache.org/licenses/LICENSE-2.0 | ||
+ | * | ||
+ | * Unless required by applicable law or agreed to in writing, software | ||
+ | * distributed under the License is distributed on an "AS IS" BASIS, | ||
+ | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
+ | * See the License for the specific language governing permissions and | ||
+ | * limitations under the License. | ||
+ | */ | ||
+ | |||
+ | package org.hadoop.hadoop; | ||
+ | |||
+ | import java.io.IOException; | ||
+ | import java.util.ArrayList; | ||
+ | import java.util.Iterator; | ||
+ | import java.util.List; | ||
+ | import java.util.StringTokenizer; | ||
+ | import java.util.regex.*; | ||
+ | |||
+ | import org.apache.hadoop.conf.Configuration; | ||
+ | import org.apache.hadoop.conf.Configured; | ||
+ | import org.apache.hadoop.fs.Path; | ||
+ | import org.apache.hadoop.io.IntWritable; | ||
+ | import org.apache.hadoop.io.LongWritable; | ||
+ | import org.apache.hadoop.io.Text; | ||
+ | import org.apache.hadoop.mapred.FileInputFormat; | ||
+ | import org.apache.hadoop.mapred.FileOutputFormat; | ||
+ | import org.apache.hadoop.mapred.JobClient; | ||
+ | import org.apache.hadoop.mapred.JobConf; | ||
+ | import org.apache.hadoop.mapred.MapReduceBase; | ||
+ | import org.apache.hadoop.mapred.Mapper; | ||
+ | import org.apache.hadoop.mapred.OutputCollector; | ||
+ | import org.apache.hadoop.mapred.Reducer; | ||
+ | import org.apache.hadoop.mapred.Reporter; | ||
+ | import org.apache.hadoop.util.Tool; | ||
+ | import org.apache.hadoop.util.ToolRunner; | ||
+ | |||
+ | /** | ||
+ | * This is an example Hadoop Map/Reduce application. | ||
+ | * It reads the text input files, breaks each line into words | ||
+ | * and reports the number of unique categories and the number of times. | ||
+ | * | ||
+ | * To run: bin/hadoop jar build/hadoop-examples.jar catcount | ||
+ | * [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> | ||
+ | */ | ||
+ | public class CatCount extends Configured implements Tool { | ||
+ | /** | ||
+ | * define my own counters | ||
+ | */ | ||
+ | enum MyCounters { | ||
+ | MAPFUNCTIONCALLS, | ||
+ | REDUCEFUNCTIONCALLS, | ||
+ | BUCK | ||
+ | } | ||
+ | |||
+ | /** | ||
+ | * Counts the words in each line. | ||
+ | * For each line of input, break the line into words and emit them as | ||
+ | * (<b>word</b>, <b>1</b>). | ||
+ | * | ||
+ | * | ||
+ | */ | ||
+ | public static class MapClass extends MapReduceBase | ||
+ | implements Mapper<LongWritable, Text, Text, IntWritable> { | ||
+ | |||
+ | private final static IntWritable one = new IntWritable(1); | ||
+ | private Text word = new Text(); | ||
+ | |||
+ | public void map(LongWritable key, Text value, | ||
+ | OutputCollector<Text, IntWritable> output, | ||
+ | Reporter reporter) throws IOException { | ||
+ | // increment task counter | ||
+ | reporter.incrCounter( MyCounters.MAPFUNCTIONCALLS, 1 ); | ||
+ | // use regex to match category name | ||
+ | Pattern p = Pattern.compile("<cat>([\\w\\s]*)</cat>"); | ||
+ | Matcher m = p.matcher(value.toString()); | ||
+ | while (m.find()){ | ||
+ | word.set(m.group(1)); | ||
+ | output.collect(word,one); | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | /** | ||
+ | * A reducer class that just emits the sum of the input values. | ||
+ | */ | ||
+ | public static class Reduce extends MapReduceBase | ||
+ | implements Reducer<Text, IntWritable, Text, IntWritable> { | ||
+ | |||
+ | public void reduce(Text key, Iterator<IntWritable> values, | ||
+ | OutputCollector<Text, IntWritable> output, | ||
+ | Reporter reporter) throws IOException { | ||
+ | // increment reduce counter | ||
+ | reporter.incrCounter( MyCounters.REDUCEFUNCTIONCALLS, 1 ); | ||
+ | |||
+ | |||
+ | int sum = 0; | ||
+ | while (values.hasNext()) { | ||
+ | sum += values.next().get(); | ||
+ | } | ||
+ | output.collect(key, new IntWritable(sum)); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | static int printUsage() { | ||
+ | System.out.println("catcount [-m <maps>] [-r <reduces>] <input> <output>"); | ||
+ | ToolRunner.printGenericCommandUsage(System.out); | ||
+ | return -1; | ||
+ | } | ||
+ | |||
+ | /** | ||
+ | * The main driver for word count map/reduce program. | ||
+ | * Invoke this method to submit the map/reduce job. | ||
+ | * @throws IOException When there is communication problems with the | ||
+ | * job tracker. | ||
+ | */ | ||
+ | public int run(String[] args) throws Exception { | ||
+ | JobConf conf = new JobConf(getConf(), CatCount.class); | ||
+ | conf.setJobName("catcount"); | ||
+ | |||
+ | // the keys are words (strings) | ||
+ | conf.setOutputKeyClass(Text.class); | ||
+ | // the values are counts (ints) | ||
+ | conf.setOutputValueClass(IntWritable.class); | ||
+ | |||
+ | conf.setMapperClass(MapClass.class); | ||
+ | conf.setCombinerClass(Reduce.class); | ||
+ | conf.setReducerClass(Reduce.class); | ||
+ | |||
+ | List<String> other_args = new ArrayList<String>(); | ||
+ | for(int i=0; i < args.length; ++i) { | ||
+ | try { | ||
+ | if ("-m".equals(args[i])) { | ||
+ | conf.setNumMapTasks(Integer.parseInt(args[++i])); | ||
+ | } else if ("-r".equals(args[i])) { | ||
+ | conf.setNumReduceTasks(Integer.parseInt(args[++i])); | ||
+ | } else { | ||
+ | other_args.add(args[i]); | ||
+ | } | ||
+ | } catch (NumberFormatException except) { | ||
+ | System.out.println("ERROR: Integer expected instead of " + args[i]); | ||
+ | return printUsage(); | ||
+ | } catch (ArrayIndexOutOfBoundsException except) { | ||
+ | System.out.println("ERROR: Required parameter missing from " + | ||
+ | args[i-1]); | ||
+ | return printUsage(); | ||
+ | } | ||
+ | } | ||
+ | // Make sure there are exactly 2 parameters left. | ||
+ | if (other_args.size() != 2) { | ||
+ | System.out.println("ERROR: Wrong number of parameters: " + | ||
+ | other_args.size() + " instead of 2."); | ||
+ | return printUsage(); | ||
+ | } | ||
+ | FileInputFormat.setInputPaths(conf, other_args.get(0)); | ||
+ | FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1))); | ||
+ | |||
+ | JobClient.runJob(conf); | ||
+ | return 0; | ||
+ | } | ||
+ | |||
+ | |||
+ | public static void main(String[] args) throws Exception { | ||
+ | int res = ToolRunner.run(new Configuration(), new CatCount(), args); | ||
+ | System.exit(res); | ||
+ | } | ||
+ | |||
+ | } | ||
+ | |||
+ | </source> | ||
+ | <br /> | ||
+ | |||
+ | ==Problem #3: Unique Words== | ||
+ | |||
+ | Below is a pair of Map/Reduce programs for counting unique words in xml blocks. | ||
+ | |||
+ | Note that attempts to use regular expressions to grab strings in html tags were not successful. Instead a simpler method of locating the position of the tags and splicing the string between the indexes was used. | ||
+ | |||
===Mapper=== | ===Mapper=== | ||
<br /> | <br /> | ||
<br /> | <br /> | ||
<source lang="python"> | <source lang="python"> | ||
+ | #!/usr/bin/env python | ||
+ | # mapper_word.py | ||
+ | # D. Thiebaut | ||
+ | # | ||
+ | # parses a wiki page that is in xml format | ||
+ | # and included in <xml> ... </xml> tags. | ||
+ | # Example: | ||
+ | # | ||
+ | #<xml> | ||
+ | #<title>Breydon Water</title> | ||
+ | #<id>100001</id> | ||
+ | #<contributors> | ||
+ | #<contrib> | ||
+ | #<ip>89.240.43.102</ip> | ||
+ | # | ||
+ | #<length>3119</length></contrib> | ||
+ | #</contributors> | ||
+ | #<categories> | ||
+ | #<cat>Estuaries in England</cat> | ||
+ | #<cat>Ramsar sites in England</cat> | ||
+ | #<cat>Royal Society for the Protection of Birds reserves in England</cat> | ||
+ | #<cat>Norfolk Broads</cat> | ||
+ | #<cat>Sites of Special Scientific Interest in Norfolk</cat> | ||
+ | #</categories> | ||
+ | #<pagelinks> | ||
+ | #<page>Arthur Henry Patterson</page> | ||
+ | #<page>Arthur Ransome</page> | ||
+ | #<page>Avocets</page> | ||
+ | #<page>Bewick's Swan</page> | ||
+ | #</pagelinks> | ||
+ | #<text> | ||
+ | #Image. Breydon-north.jpg thumb 250px ... Broads . Category. Sites of Special Scientific Interest in Norfolk . . . . | ||
+ | #</text> | ||
+ | #</xml> | ||
+ | # | ||
+ | # the program outputs the five most frequent words. | ||
+ | # | ||
+ | # ivoire 2 | ||
+ | #aquatic 5 | ||
+ | # estuary 10 | ||
+ | # photo 1 | ||
+ | # rob 4 | ||
+ | # years 1 | ||
+ | |||
+ | import sys | ||
+ | import string | ||
+ | |||
+ | |||
+ | stopwords = {'all': 1, "she'll": 1, "don't": 1, 'being': 1, 'over': 1, 'through': 1, 'yourselves': 1, 'its': 1, | ||
+ | 'before': 1, "he's": 1, "when's": 1, "we've": 1, 'had': 1, 'should': 1, "he'd": 1, 'to': 1, | ||
+ | 'only': 1, "there's": 1, 'those': 1, 'under': 1, 'has': 1, "haven't": 1, 'do': 1, 'them': 1, | ||
+ | 'his': 1, "they'll": 1, 'very': 1, "who's": 1, "they'd": 1, 'cannot': 1, "you've": 1, 'they': 1, | ||
+ | 'not': 1, 'during': 1, 'yourself': 1, 'him': 1, 'nor': 1, "we'll": 1, 'did': 1, "they've": 1, | ||
+ | 'this': 1, 'she': 1, 'each': 1, "won't": 1, 'where': 1, "mustn't": 1, "isn't": 1, "i'll": 1, | ||
+ | "why's": 1, 'because': 1, "you'd": 1, 'doing': 1, 'some': 1, 'up': 1, 'are': 1, 'further': 1, | ||
+ | 'out': 1, 'what': 1, 'for': 1, 'while': 1, "wasn't": 1, 'does': 1, "shouldn't": 1, 'above': 1, | ||
+ | 'between': 1, 'be': 1, 'we': 1, 'who': 1, "you're": 1, 'were': 1, 'here': 1, 'hers': 1, "aren't": 1, | ||
+ | 'by': 1, 'both': 1, 'about': 1, 'would': 1, 'of': 1, 'could': 1, 'against': 1, "i'd": 1, | ||
+ | "weren't": 1, "i'm": 1, 'or': 1, "can't": 1, 'own': 1, 'into': 1, 'whom': 1, 'down': 1, "hadn't": 1, | ||
+ | "couldn't": 1, 'your': 1, "doesn't": 1, 'from': 1, "how's": 1, 'her': 1, 'their': 1, "it's": 1, | ||
+ | 'there': 1, 'been': 1, 'why': 1, 'few': 1, 'too': 1, 'themselves': 1, 'was': 1, 'until': 1, | ||
+ | 'more': 1, 'himself': 1, "where's": 1, "i've": 1, 'with': 1, "didn't": 1, "what's": 1, 'but': 1, | ||
+ | 'herself': 1, 'than': 1, "here's": 1, 'he': 1, 'me': 1, "they're": 1, 'myself': 1, 'these': 1, | ||
+ | "hasn't": 1, 'below': 1, 'ought': 1, 'theirs': 1, 'my': 1, "wouldn't": 1, "we'd": 1, 'and': 1, | ||
+ | 'then': 1, 'is': 1, 'am': 1, 'it': 1, 'an': 1, 'as': 1, 'itself': 1, 'at': 1, 'have': 1, 'in': 1, | ||
+ | 'any': 1, 'if': 1, 'again': 1, 'no': 1, 'that': 1, 'when': 1, 'same': 1, 'how': 1, 'other': 1, | ||
+ | 'which': 1, 'you': 1, "shan't": 1, ' ourselves': 1, 'our': 1, 'after': 1, "let's": 1, 'most': 1, | ||
+ | 'ours ': 1, 'such': 1, 'on': 1, "he'll": 1, 'a': 1, 'off': 1, 'i': 1, "she'd": 1, 'yours': 1, | ||
+ | "you'll": 1, 'so': 1, "we're": 1, "she's": 1, 'the': 1, "that's": 1, 'having': 1, 'once': 1, 's':1} | ||
+ | |||
+ | |||
+ | def grab( tag, xml ): | ||
+ | """grabs string between <tag> and </tag> in xml string""" | ||
+ | try: | ||
+ | index1 = xml.find( '<%s>' % tag ) | ||
+ | index2 = xml.find( '</%s>' % tag ) | ||
+ | return xml[ index1+len( '<%s>' % tag ): index2 ] | ||
+ | except: | ||
+ | return "" | ||
+ | |||
+ | |||
+ | def processInput( debug=False ): | ||
+ | #--- accumulate input lines from stdin --- | ||
+ | xmlText = "" | ||
+ | for line in sys.stdin: | ||
+ | xmlText += line.lower() | ||
+ | |||
+ | #--- grab text and Id from xml input --- | ||
+ | text = grab( "text", xmlText ) | ||
+ | id = grab( "id", xmlText ) | ||
+ | |||
+ | return ( id, text ) | ||
+ | |||
+ | def getMostFreq( text, debug=False ): | ||
+ | """computes 5 most frequent words""" | ||
+ | global stopwords | ||
+ | |||
+ | #--- remove punctuation (except quote) --- | ||
+ | table = string.maketrans( "", "" ) | ||
+ | punctuation = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~' | ||
+ | text = text.translate( table, punctuation ) | ||
+ | words = text.lower().split() | ||
+ | wordDico = {} | ||
+ | for word in words: | ||
+ | #--- skip stop-words --- | ||
+ | if stopwords.has_key( word ): | ||
+ | continue | ||
+ | |||
+ | #--- accumulate count --- | ||
+ | if wordDico.has_key( word ): | ||
+ | wordDico[ word ] += 1 | ||
+ | else: | ||
+ | wordDico[ word ] = 1 | ||
+ | |||
+ | L = [ (freq,word) for (word,freq) in wordDico.items() ] | ||
+ | L.sort() | ||
+ | L = L[-10:] | ||
+ | L.reverse() | ||
+ | return L | ||
+ | |||
+ | def main( debug=False ): | ||
+ | #--- get Id and text from stdin --- | ||
+ | id, text = processInput( debug ) | ||
+ | |||
+ | #--- output most frquent words, if requested for --- | ||
+ | mostFreqWords = getMostFreq( text, debug ) | ||
+ | for freq, word in mostFreqWords: | ||
+ | print "%s\t%s" % ( word, freq ) | ||
+ | |||
+ | #--- debugging information --- | ||
+ | if debug: | ||
+ | print "-"*60 | ||
+ | print "DEBUG" | ||
+ | print "-"*60 | ||
+ | print "id = ", id | ||
+ | print "text = ", text[0:20],"...", text[-20:] | ||
+ | print "mostFreqWords = ", mostFreqWords | ||
+ | print "-"*60 | ||
+ | |||
+ | |||
+ | main( False ) | ||
+ | |||
+ | |||
</source> | </source> | ||
Line 194: | Line 557: | ||
<br /> | <br /> | ||
<source lang="python"> | <source lang="python"> | ||
+ | #!/usr/bin/env python | ||
+ | # reducer_word.py | ||
+ | # | ||
+ | import sys | ||
+ | |||
+ | # maps words to their counts | ||
+ | word2count = {} | ||
+ | |||
+ | # input comes from STDIN | ||
+ | for line in sys.stdin: | ||
+ | # remove leading and trailing whitespace | ||
+ | line = line.strip() | ||
+ | |||
+ | # parse the input we got from mapper.py | ||
+ | word, count = line.split('\t', 1) | ||
+ | # convert count (currently a string) to int | ||
+ | try: | ||
+ | count = int(count) | ||
+ | except ValueError: | ||
+ | continue | ||
+ | |||
+ | try: | ||
+ | word2count[word] = word2count[word]+count | ||
+ | except: | ||
+ | word2count[word] = count | ||
+ | |||
+ | # write the results to STDOUT (standard output) | ||
+ | for word in word2count.keys(): | ||
+ | print '%s\t%s'% ( word, word2count[word] ) | ||
+ | |||
+ | |||
+ | </source> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | ===Slightly Different solution=== | ||
+ | |||
+ | This solution outputs tuples of the form: | ||
+ | |||
+ | ... | ||
+ | refer 1:1020001 | ||
+ | breydon 11:200002,11:100001 | ||
+ | water 8:200002,8:100001 | ||
+ | que 1:10000001 | ||
+ | article 2:10080001 | ||
+ | ... | ||
+ | |||
+ | where the first part is a ''word'', and the second part is a coma-separated list of ''count'':''Id'', where ''count'' is the number of times the ''word'' appears in the <text></text> part of the Wiki-page with the given ''Id''. | ||
+ | |||
+ | ====Mapper.py==== | ||
+ | |||
+ | <br /> | ||
+ | <source lang="python"> | ||
+ | hadoop@hadoop1:~/352/dft/hw4$ cat mapper_word2.py | ||
+ | #!/usr/bin/env python | ||
+ | # mapper_word2.py | ||
+ | # D. Thiebaut | ||
+ | # | ||
+ | |||
+ | import sys | ||
+ | import string | ||
+ | |||
+ | |||
+ | stopwords = {'all': 1, "she'll": 1, "don't": 1, 'being': 1, 'over': 1, 'through': 1, 'yourselves': 1, 'its': 1, | ||
+ | 'before': 1, "he's": 1, "when's": 1, "we've": 1, 'had': 1, 'should': 1, "he'd": 1, 'to': 1, | ||
+ | 'only': 1, "there's": 1, 'those': 1, 'under': 1, 'has': 1, "haven't": 1, 'do': 1, 'them': 1, | ||
+ | 'his': 1, "they'll": 1, 'very': 1, "who's": 1, "they'd": 1, 'cannot': 1, "you've": 1, 'they': 1, | ||
+ | 'not': 1, 'during': 1, 'yourself': 1, 'him': 1, 'nor': 1, "we'll": 1, 'did': 1, "they've": 1, | ||
+ | 'this': 1, 'she': 1, 'each': 1, "won't": 1, 'where': 1, "mustn't": 1, "isn't": 1, "i'll": 1, | ||
+ | "why's": 1, 'because': 1, "you'd": 1, 'doing': 1, 'some': 1, 'up': 1, 'are': 1, 'further': 1, | ||
+ | 'out': 1, 'what': 1, 'for': 1, 'while': 1, "wasn't": 1, 'does': 1, "shouldn't": 1, 'above': 1, | ||
+ | 'between': 1, 'be': 1, 'we': 1, 'who': 1, "you're": 1, 'were': 1, 'here': 1, 'hers': 1, "aren't": 1, | ||
+ | 'by': 1, 'both': 1, 'about': 1, 'would': 1, 'of': 1, 'could': 1, 'against': 1, "i'd": 1, | ||
+ | "weren't": 1, "i'm": 1, 'or': 1, "can't": 1, 'own': 1, 'into': 1, 'whom': 1, 'down': 1, "hadn't": 1, | ||
+ | "couldn't": 1, 'your': 1, "doesn't": 1, 'from': 1, "how's": 1, 'her': 1, 'their': 1, "it's": 1, | ||
+ | 'there': 1, 'been': 1, 'why': 1, 'few': 1, 'too': 1, 'themselves': 1, 'was': 1, 'until': 1, | ||
+ | 'more': 1, 'himself': 1, "where's": 1, "i've": 1, 'with': 1, "didn't": 1, "what's": 1, 'but': 1, | ||
+ | 'herself': 1, 'than': 1, "here's": 1, 'he': 1, 'me': 1, "they're": 1, 'myself': 1, 'these': 1, | ||
+ | "hasn't": 1, 'below': 1, 'ought': 1, 'theirs': 1, 'my': 1, "wouldn't": 1, "we'd": 1, 'and': 1, | ||
+ | 'then': 1, 'is': 1, 'am': 1, 'it': 1, 'an': 1, 'as': 1, 'itself': 1, 'at': 1, 'have': 1, 'in': 1, | ||
+ | 'any': 1, 'if': 1, 'again': 1, 'no': 1, 'that': 1, 'when': 1, 'same': 1, 'how': 1, 'other': 1, | ||
+ | 'which': 1, 'you': 1, "shan't": 1, ' ourselves': 1, 'our': 1, 'after': 1, "let's": 1, 'most': 1, | ||
+ | 'ours ': 1, 'such': 1, 'on': 1, "he'll": 1, 'a': 1, 'off': 1, 'i': 1, "she'd": 1, 'yours': 1, | ||
+ | "you'll": 1, 'so': 1, "we're": 1, "she's": 1, 'the': 1, "that's": 1, 'having': 1, 'once': 1, 's':1} | ||
+ | |||
+ | |||
+ | def grab( tag, xml ): | ||
+ | try: | ||
+ | index1 = xml.find( '<%s>' % tag ) | ||
+ | index2 = xml.find( '</%s>' % tag ) | ||
+ | return xml[ index1+len( '<%s>' % tag ): index2 ] | ||
+ | except: | ||
+ | return "" | ||
+ | |||
+ | |||
+ | def processInput( debug=False ): | ||
+ | #--- accumulate input lines from stdin --- | ||
+ | |||
+ | xmlText = "" | ||
+ | for line in sys.stdin: | ||
+ | xmlText += line.lower() | ||
+ | |||
+ | text = grab( "text", xmlText ) | ||
+ | id = grab( "id", xmlText ) | ||
+ | |||
+ | return ( id, text ) | ||
+ | |||
+ | def getMostFreq( text, debug=False ): | ||
+ | global stopwords | ||
+ | table = string.maketrans( "", "" ) | ||
+ | punctuation = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~' | ||
+ | text = text.translate( table, punctuation ) | ||
+ | words = text.lower().split() | ||
+ | wordDico = {} | ||
+ | for word in words: | ||
+ | if stopwords.has_key( word ): | ||
+ | continue | ||
+ | if wordDico.has_key( word ): | ||
+ | wordDico[ word ] += 1 | ||
+ | else: | ||
+ | wordDico[ word ] = 1 | ||
+ | |||
+ | L = [ (freq,word) for (word,freq) in wordDico.items() ] | ||
+ | L.sort() | ||
+ | L = L[-10:] | ||
+ | L.reverse() | ||
+ | return L | ||
+ | |||
+ | def main( debug=False ): | ||
+ | #--- get arguments from command line --- | ||
+ | Id, text = processInput( debug ) | ||
+ | |||
+ | #--- output most frquent words, if requested for --- | ||
+ | mostFreqWords = getMostFreq( text, debug ) | ||
+ | for freq, word in mostFreqWords: | ||
+ | print "%s\t%s %s" % ( word, freq, Id ) | ||
+ | |||
+ | #--- debugging information --- | ||
+ | if debug: | ||
+ | print "-"*60 | ||
+ | print "DEBUG" | ||
+ | print "-"*60 | ||
+ | print "id = ", id | ||
+ | print "text = ", text[0:20],"...", text[-20:] | ||
+ | print "mostFreqWords = ", mostFreqWords | ||
+ | print "-"*60 | ||
+ | |||
+ | |||
+ | main( False ) | ||
</source> | </source> | ||
<br /> | <br /> | ||
+ | |||
+ | ====Reducer.py==== | ||
<br /> | <br /> | ||
+ | <source lang="python"> | ||
+ | #!/usr/bin/env python | ||
+ | # reduce_word2.py | ||
+ | # | ||
+ | import sys | ||
+ | |||
+ | # maps words to their counts | ||
+ | word2count = {} | ||
+ | |||
+ | # input comes from STDIN | ||
+ | for line in sys.stdin: | ||
+ | # remove leading and trailing whitespace | ||
+ | line = line.strip() | ||
+ | |||
+ | # parse the input we got from mapper.py | ||
+ | word, pair = line.split('\t', 1) | ||
+ | count, id = pair.split() | ||
+ | |||
+ | try: | ||
+ | word2count[word].append( "%s:%s" % (count,id) ) | ||
+ | except: | ||
+ | word2count[word] = [ "%s:%s" % (count,id) ] | ||
+ | |||
+ | # write the results to STDOUT (standard output) | ||
+ | for word in word2count.keys(): | ||
+ | print '%s\t%s'% ( word, ",".join( word2count[word] ) ) | ||
+ | |||
+ | |||
+ | </source> | ||
+ | <br /> | ||
+ | |||
<br /> | <br /> | ||
<br /> | <br /> |
Latest revision as of 09:06, 27 April 2010
--D. Thiebaut 20:24, 25 April 2010 (UTC)
Contents
Problem #1: Timeline
Several observations were in order:
- The cluster only has 5 task-tracker nodes
- The default for Hadoop is two map tasks per task-tracker per job.
- This explains the maximum of 10 map tasks running first, then stopping, and then closing down, then 6 map tasks taking over, with an overlap of 4 map tasks.
- 4 files of approximately 180 MB comprise the input. Hadoop splits input files into splits of 64 MB.
- 180 MB will correspond to 3 or 4 splits, depending on the exact size of the file. This means between 12 to 16 splits for the 4 files.
Because we see that 16 (12+4) map tasks run altogether, we deduce that we had 16 splits total, requiring 16 tasks. 12 ran first, then were taken down while 4 more were started.
Problem #2: Unique Categories
Below is a Map/Reduce pair of Python streaming programs for a job similar to what was required. Instead of reporting the unique categories, the program return the categories and a list of Ids of wiki pages in which they appear.
Examples of output
category:(10th century in ireland) [10130001]category:(1976 films) [10000001] category:(argentine films) [10000001] category:(economy of côte d'ivoire) [10160001] category:(estuaries in england) [100001,200002] category:(hospitals in north carolina) [1010001] category:(lists of hospitals in the united states) [1010001]
Note that attempts to use regular expressions to grab strings in html tags were not successful. Even though the program ran fine in a serial form, from the command line, it wouldn't run to completion under Hadoop. Instead a simpler method of locating the position of the tags and splicing the string between the indexes was used. (Debugging parallel program has always been and still is a challenging task).
The potential problem with running our MapReduce program on the large xml files is that when they are spliced into several splits, the splitter part of MapReduce will not know that there are logical breaks between pages, delimited by <xml> and </xml>. Very likely, the first split will contain the first half of a wiki page, and the second split will contain the other half. If the split cuts the file between the <categories> and </categories> tags, then we lose some categories in the process...
Mapper
hadoop@hadoop1:~/352/dft/hw4$ cat mapper_cat.py
#!/usr/bin/env python
# mapper_cat.py
# D. Thiebaut
#
# parses a wiki page that is in xml format
# and included in <xml> ... </xml> tags.
# Example:
#
#<xml>
#<title>Breydon Water</title>
#<id>100001</id>
#<contributors>
#<contrib>
#<ip>89.240.43.102</ip>
#
#<length>3119</length></contrib>
#</contributors>
#<categories>
#<cat>Estuaries in England</cat>
#<cat>Ramsar sites in England</cat>
#<cat>Royal Society for the Protection of Birds reserves in England</cat>
#<cat>Norfolk Broads</cat>
#<cat>Sites of Special Scientific Interest in Norfolk</cat>
#</categories>
#<pagelinks>
#<page>Arthur Henry Patterson</page>
#<page>Arthur Ransome</page>
#<page>Avocets</page>
#<page>Bewick's Swan</page>
#</pagelinks>
#<text>
#Image. Breydon-north.jpg thumb 250px ... Broads . Category. Sites of Special Scientific Interest in Norfolk . . . .
#</text>
#</xml>
#
# the program outputs the categories
#
# cat 10000001.xml | ./mapper.py
#
#
# 1976 films 10000001
# Argentine films 10000001
# films 4 10000001
# category 3 10000001
# argentine 3 10000001
# 1976 3 10000001
# spanish 2 10000001
# language 2 10000001
# spanishlanguage 1 10000001
# que 1 10000001
# list 1 10000001
#
# TYPICAL RUN AND OUTPUT OF HADOOP JOB:
#
# hadoop jar /home/hadoop/hadoop/contrib/streaming/hadoop-0.19.2-streaming.jar \
# > -inputreader "StreamXmlRecordReader,begin=<xml>,end=</xml>" \
# > -file ./mapper_cat.py -mapper ./mapper_cat.py \
# > -file ./reducer_cat.py -reducer ./reducer_cat.py \
# > -input wikipages/few -output dft-xml-output
#
# category:(10th century in ireland) [10130001]
# category:(1976 films) [10000001]
# category:(argentine films) [10000001]
# category:(economy of côte d'ivoire) [10160001]
# category:(estuaries in england) [100001,200002]
# category:(hospitals in north carolina) [1010001]
# category:(lists of hospitals in the united states) [1010001]
# category:(medieval ireland) [10130001]
# category:(norfolk broads) [200002,100001]
# category:(north carolina-related lists) [1010001]
# category:(ramsar sites in england) [100001,200002]
# category:(royal society for the protection of birds reserves in england) [200002,100001]
# category:(schizophrenia) [10240001]
# category:(sites of special scientific interest in norfolk) [100001,200002]
# category:(spanish-language films) [10000001]
# category:(symptoms) [10240001]
# category:(tourism by country) [10160001]
#
import sys
def grab( tag, xml ):
"""grabs the text between <tag> and </tag> in the xml text"""
try:
index1 = xml.find( '<%s>' % tag )
index2 = xml.find( '</%s>' % tag, index1 )
return xml[ index1+len( '<%s>' % tag ): index2 ]
except:
return ""
def grabAll( tag, xml ):
"""grabs all the strings between <tag> and </tag> in xml, and
returns them in a list"""
index2 = 0
list = []
while True:
index1 = xml.find( '<%s>' % tag, index2 )
if index1==-1: break
index2 = xml.find( '</%s>' % tag, index1 )
if index2==-1: break
list.append( xml[ index1+len( '<%s>' % tag): index2 ] )
return list
def processInput( debug=False ):
#--- accumulate input lines from stdin ---
xmlText = ""
for line in sys.stdin:
xmlText += line.lower()
#--- grag the Id, and the categories ---
id = grab( "id", xmlText )
categoryList = grabAll( "cat", xmlText )
return ( id, categoryList )
def main( debug=False ):
#--- get page Id, categories (if user wants them), and text ---
id, categoryList = processInput( debug )
for cat in categoryList:
print "%s\t%s" % ( cat, id )
#--- debugging information ---
if debug:
print "-"*60
print "DEBUG"
print "-"*60
print "id = ", id
for cat in categoryList:
print "cat:", cat
print "-"*60
main( False )
Reducer
#!/usr/bin/env python
import sys
#--- get ready to read category, Id pairs ---
lastCategory = None
listOfIds = []
#--- input comes from STDIN ---
for line in sys.stdin:
#--- remove leading and trailing whitespace ---
line = line.strip()
category, Id = line.split('\t', 1)
#--- accumulate Ids for the same category ---
if category==lastCategory:
listOfIds.append( Id )
else:
if lastCategory is not None:
print "category:(%s)\t[%s]" % ( lastCategory, ','.join( listOfIds ) )
lastCategory = category
listOfIds = [ Id ]
#--- write last category processed to stdout ---
if len( listOfIds )!= 0:
print 'category:(%s)\t[%s]' % ( lastCategory, ','.join( listOfIds ) )
Java Solution
Here's a java solution provided by Yang and Xiaoting.
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hadoop.hadoop;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import java.util.regex.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* This is an example Hadoop Map/Reduce application.
* It reads the text input files, breaks each line into words
* and reports the number of unique categories and the number of times.
*
* To run: bin/hadoop jar build/hadoop-examples.jar catcount
* [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
*/
public class CatCount extends Configured implements Tool {
/**
* define my own counters
*/
enum MyCounters {
MAPFUNCTIONCALLS,
REDUCEFUNCTIONCALLS,
BUCK
}
/**
* Counts the words in each line.
* For each line of input, break the line into words and emit them as
* (<b>word</b>, <b>1</b>).
*
*
*/
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
// increment task counter
reporter.incrCounter( MyCounters.MAPFUNCTIONCALLS, 1 );
// use regex to match category name
Pattern p = Pattern.compile("<cat>([\\w\\s]*)</cat>");
Matcher m = p.matcher(value.toString());
while (m.find()){
word.set(m.group(1));
output.collect(word,one);
}
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
// increment reduce counter
reporter.incrCounter( MyCounters.REDUCEFUNCTIONCALLS, 1 );
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
static int printUsage() {
System.out.println("catcount [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
/**
* The main driver for word count map/reduce program.
* Invoke this method to submit the map/reduce job.
* @throws IOException When there is communication problems with the
* job tracker.
*/
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), CatCount.class);
conf.setJobName("catcount");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CatCount(), args);
System.exit(res);
}
}
Problem #3: Unique Words
Below is a pair of Map/Reduce programs for counting unique words in xml blocks.
Note that attempts to use regular expressions to grab strings in html tags were not successful. Instead a simpler method of locating the position of the tags and splicing the string between the indexes was used.
Mapper
#!/usr/bin/env python
# mapper_word.py
# D. Thiebaut
#
# parses a wiki page that is in xml format
# and included in <xml> ... </xml> tags.
# Example:
#
#<xml>
#<title>Breydon Water</title>
#<id>100001</id>
#<contributors>
#<contrib>
#<ip>89.240.43.102</ip>
#
#<length>3119</length></contrib>
#</contributors>
#<categories>
#<cat>Estuaries in England</cat>
#<cat>Ramsar sites in England</cat>
#<cat>Royal Society for the Protection of Birds reserves in England</cat>
#<cat>Norfolk Broads</cat>
#<cat>Sites of Special Scientific Interest in Norfolk</cat>
#</categories>
#<pagelinks>
#<page>Arthur Henry Patterson</page>
#<page>Arthur Ransome</page>
#<page>Avocets</page>
#<page>Bewick's Swan</page>
#</pagelinks>
#<text>
#Image. Breydon-north.jpg thumb 250px ... Broads . Category. Sites of Special Scientific Interest in Norfolk . . . .
#</text>
#</xml>
#
# the program outputs the five most frequent words.
#
# ivoire 2
#aquatic 5
# estuary 10
# photo 1
# rob 4
# years 1
import sys
import string
stopwords = {'all': 1, "she'll": 1, "don't": 1, 'being': 1, 'over': 1, 'through': 1, 'yourselves': 1, 'its': 1,
'before': 1, "he's": 1, "when's": 1, "we've": 1, 'had': 1, 'should': 1, "he'd": 1, 'to': 1,
'only': 1, "there's": 1, 'those': 1, 'under': 1, 'has': 1, "haven't": 1, 'do': 1, 'them': 1,
'his': 1, "they'll": 1, 'very': 1, "who's": 1, "they'd": 1, 'cannot': 1, "you've": 1, 'they': 1,
'not': 1, 'during': 1, 'yourself': 1, 'him': 1, 'nor': 1, "we'll": 1, 'did': 1, "they've": 1,
'this': 1, 'she': 1, 'each': 1, "won't": 1, 'where': 1, "mustn't": 1, "isn't": 1, "i'll": 1,
"why's": 1, 'because': 1, "you'd": 1, 'doing': 1, 'some': 1, 'up': 1, 'are': 1, 'further': 1,
'out': 1, 'what': 1, 'for': 1, 'while': 1, "wasn't": 1, 'does': 1, "shouldn't": 1, 'above': 1,
'between': 1, 'be': 1, 'we': 1, 'who': 1, "you're": 1, 'were': 1, 'here': 1, 'hers': 1, "aren't": 1,
'by': 1, 'both': 1, 'about': 1, 'would': 1, 'of': 1, 'could': 1, 'against': 1, "i'd": 1,
"weren't": 1, "i'm": 1, 'or': 1, "can't": 1, 'own': 1, 'into': 1, 'whom': 1, 'down': 1, "hadn't": 1,
"couldn't": 1, 'your': 1, "doesn't": 1, 'from': 1, "how's": 1, 'her': 1, 'their': 1, "it's": 1,
'there': 1, 'been': 1, 'why': 1, 'few': 1, 'too': 1, 'themselves': 1, 'was': 1, 'until': 1,
'more': 1, 'himself': 1, "where's": 1, "i've": 1, 'with': 1, "didn't": 1, "what's": 1, 'but': 1,
'herself': 1, 'than': 1, "here's": 1, 'he': 1, 'me': 1, "they're": 1, 'myself': 1, 'these': 1,
"hasn't": 1, 'below': 1, 'ought': 1, 'theirs': 1, 'my': 1, "wouldn't": 1, "we'd": 1, 'and': 1,
'then': 1, 'is': 1, 'am': 1, 'it': 1, 'an': 1, 'as': 1, 'itself': 1, 'at': 1, 'have': 1, 'in': 1,
'any': 1, 'if': 1, 'again': 1, 'no': 1, 'that': 1, 'when': 1, 'same': 1, 'how': 1, 'other': 1,
'which': 1, 'you': 1, "shan't": 1, ' ourselves': 1, 'our': 1, 'after': 1, "let's": 1, 'most': 1,
'ours ': 1, 'such': 1, 'on': 1, "he'll": 1, 'a': 1, 'off': 1, 'i': 1, "she'd": 1, 'yours': 1,
"you'll": 1, 'so': 1, "we're": 1, "she's": 1, 'the': 1, "that's": 1, 'having': 1, 'once': 1, 's':1}
def grab( tag, xml ):
"""grabs string between <tag> and </tag> in xml string"""
try:
index1 = xml.find( '<%s>' % tag )
index2 = xml.find( '</%s>' % tag )
return xml[ index1+len( '<%s>' % tag ): index2 ]
except:
return ""
def processInput( debug=False ):
#--- accumulate input lines from stdin ---
xmlText = ""
for line in sys.stdin:
xmlText += line.lower()
#--- grab text and Id from xml input ---
text = grab( "text", xmlText )
id = grab( "id", xmlText )
return ( id, text )
def getMostFreq( text, debug=False ):
"""computes 5 most frequent words"""
global stopwords
#--- remove punctuation (except quote) ---
table = string.maketrans( "", "" )
punctuation = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'
text = text.translate( table, punctuation )
words = text.lower().split()
wordDico = {}
for word in words:
#--- skip stop-words ---
if stopwords.has_key( word ):
continue
#--- accumulate count ---
if wordDico.has_key( word ):
wordDico[ word ] += 1
else:
wordDico[ word ] = 1
L = [ (freq,word) for (word,freq) in wordDico.items() ]
L.sort()
L = L[-10:]
L.reverse()
return L
def main( debug=False ):
#--- get Id and text from stdin ---
id, text = processInput( debug )
#--- output most frquent words, if requested for ---
mostFreqWords = getMostFreq( text, debug )
for freq, word in mostFreqWords:
print "%s\t%s" % ( word, freq )
#--- debugging information ---
if debug:
print "-"*60
print "DEBUG"
print "-"*60
print "id = ", id
print "text = ", text[0:20],"...", text[-20:]
print "mostFreqWords = ", mostFreqWords
print "-"*60
main( False )
Reducer
#!/usr/bin/env python
# reducer_word.py
#
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split('\t', 1)
# convert count (currently a string) to int
try:
count = int(count)
except ValueError:
continue
try:
word2count[word] = word2count[word]+count
except:
word2count[word] = count
# write the results to STDOUT (standard output)
for word in word2count.keys():
print '%s\t%s'% ( word, word2count[word] )
Slightly Different solution
This solution outputs tuples of the form:
... refer 1:1020001 breydon 11:200002,11:100001 water 8:200002,8:100001 que 1:10000001 article 2:10080001 ...
where the first part is a word, and the second part is a coma-separated list of count:Id, where count is the number of times the word appears in the <text></text> part of the Wiki-page with the given Id.
Mapper.py
hadoop@hadoop1:~/352/dft/hw4$ cat mapper_word2.py
#!/usr/bin/env python
# mapper_word2.py
# D. Thiebaut
#
import sys
import string
stopwords = {'all': 1, "she'll": 1, "don't": 1, 'being': 1, 'over': 1, 'through': 1, 'yourselves': 1, 'its': 1,
'before': 1, "he's": 1, "when's": 1, "we've": 1, 'had': 1, 'should': 1, "he'd": 1, 'to': 1,
'only': 1, "there's": 1, 'those': 1, 'under': 1, 'has': 1, "haven't": 1, 'do': 1, 'them': 1,
'his': 1, "they'll": 1, 'very': 1, "who's": 1, "they'd": 1, 'cannot': 1, "you've": 1, 'they': 1,
'not': 1, 'during': 1, 'yourself': 1, 'him': 1, 'nor': 1, "we'll": 1, 'did': 1, "they've": 1,
'this': 1, 'she': 1, 'each': 1, "won't": 1, 'where': 1, "mustn't": 1, "isn't": 1, "i'll": 1,
"why's": 1, 'because': 1, "you'd": 1, 'doing': 1, 'some': 1, 'up': 1, 'are': 1, 'further': 1,
'out': 1, 'what': 1, 'for': 1, 'while': 1, "wasn't": 1, 'does': 1, "shouldn't": 1, 'above': 1,
'between': 1, 'be': 1, 'we': 1, 'who': 1, "you're": 1, 'were': 1, 'here': 1, 'hers': 1, "aren't": 1,
'by': 1, 'both': 1, 'about': 1, 'would': 1, 'of': 1, 'could': 1, 'against': 1, "i'd": 1,
"weren't": 1, "i'm": 1, 'or': 1, "can't": 1, 'own': 1, 'into': 1, 'whom': 1, 'down': 1, "hadn't": 1,
"couldn't": 1, 'your': 1, "doesn't": 1, 'from': 1, "how's": 1, 'her': 1, 'their': 1, "it's": 1,
'there': 1, 'been': 1, 'why': 1, 'few': 1, 'too': 1, 'themselves': 1, 'was': 1, 'until': 1,
'more': 1, 'himself': 1, "where's": 1, "i've": 1, 'with': 1, "didn't": 1, "what's": 1, 'but': 1,
'herself': 1, 'than': 1, "here's": 1, 'he': 1, 'me': 1, "they're": 1, 'myself': 1, 'these': 1,
"hasn't": 1, 'below': 1, 'ought': 1, 'theirs': 1, 'my': 1, "wouldn't": 1, "we'd": 1, 'and': 1,
'then': 1, 'is': 1, 'am': 1, 'it': 1, 'an': 1, 'as': 1, 'itself': 1, 'at': 1, 'have': 1, 'in': 1,
'any': 1, 'if': 1, 'again': 1, 'no': 1, 'that': 1, 'when': 1, 'same': 1, 'how': 1, 'other': 1,
'which': 1, 'you': 1, "shan't": 1, ' ourselves': 1, 'our': 1, 'after': 1, "let's": 1, 'most': 1,
'ours ': 1, 'such': 1, 'on': 1, "he'll": 1, 'a': 1, 'off': 1, 'i': 1, "she'd": 1, 'yours': 1,
"you'll": 1, 'so': 1, "we're": 1, "she's": 1, 'the': 1, "that's": 1, 'having': 1, 'once': 1, 's':1}
def grab( tag, xml ):
try:
index1 = xml.find( '<%s>' % tag )
index2 = xml.find( '</%s>' % tag )
return xml[ index1+len( '<%s>' % tag ): index2 ]
except:
return ""
def processInput( debug=False ):
#--- accumulate input lines from stdin ---
xmlText = ""
for line in sys.stdin:
xmlText += line.lower()
text = grab( "text", xmlText )
id = grab( "id", xmlText )
return ( id, text )
def getMostFreq( text, debug=False ):
global stopwords
table = string.maketrans( "", "" )
punctuation = '!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'
text = text.translate( table, punctuation )
words = text.lower().split()
wordDico = {}
for word in words:
if stopwords.has_key( word ):
continue
if wordDico.has_key( word ):
wordDico[ word ] += 1
else:
wordDico[ word ] = 1
L = [ (freq,word) for (word,freq) in wordDico.items() ]
L.sort()
L = L[-10:]
L.reverse()
return L
def main( debug=False ):
#--- get arguments from command line ---
Id, text = processInput( debug )
#--- output most frquent words, if requested for ---
mostFreqWords = getMostFreq( text, debug )
for freq, word in mostFreqWords:
print "%s\t%s %s" % ( word, freq, Id )
#--- debugging information ---
if debug:
print "-"*60
print "DEBUG"
print "-"*60
print "id = ", id
print "text = ", text[0:20],"...", text[-20:]
print "mostFreqWords = ", mostFreqWords
print "-"*60
main( False )
Reducer.py
#!/usr/bin/env python
# reduce_word2.py
#
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, pair = line.split('\t', 1)
count, id = pair.split()
try:
word2count[word].append( "%s:%s" % (count,id) )
except:
word2count[word] = [ "%s:%s" % (count,id) ]
# write the results to STDOUT (standard output)
for word in word2count.keys():
print '%s\t%s'% ( word, ",".join( word2count[word] ) )