Hadoop Tutorial 1 -- Running WordCount

Revision as of 14:44, 16 March 2010 by Thiebaut (talk | contribs) (Moment of Truth: Compare 5-PC Hadoop cluster to 1 Linux PC)

This tutorial will introduce you to the Hadoop Cluster in the Computer Science Dept. at Smith College, and how to submit jobs on it.

The setup of the cloud cluster is fully documented here.

Setup

Cluster IPs

  • You can get the up-to-date IPs of the different nodes here. These IPs will most likely remain in effect for long periods of time:
131.229.101.218 hadoop1
131.229.99.216 hadoop2
131.229.103.202 hadoop3
131.229.100.208 hadoop4
131.229.103.204 hadoop5

Verifying Cluster Operation

  • Ssh to hadoop1
  • Probe for hadoop processes/daemons running on hadoop1 with the Java Virtual Machine Process Status Tool (jps):
hadoop@hadoop1:~$ jps

16404 NameNode
16775 Jps
16576 SecondaryNameNode
16648 JobTracker
  • If any of the processes above is missing, the cluster could be down. In this case, bring it up with start-all.sh:
 hadoop@hadoop1:~$  start-all.sh 
 starting namenode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-namenode-hadoop1.out
 131.229.100.208: starting datanode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-datanode-hadoop4.out
 131.229.99.216: starting datanode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-datanode-hadoop2.out
 131.229.103.202: starting datanode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-datanode-hadoop3.out
 131.229.103.204: starting datanode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-datanode-hadoop5.out
 131.229.101.218: starting secondarynamenode, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-secondarynamenode-hadoop1.out
 starting jobtracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-jobtracker-hadoop1.out
 131.229.100.208: starting tasktracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-tasktracker-hadoop4.out
 131.229.103.202: starting tasktracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-tasktracker-hadoop3.out
 131.229.99.216: starting tasktracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-tasktracker-hadoop2.out
 131.229.103.204: starting tasktracker, logging to /home/hadoop/hadoop/bin/../logs/hadoop-hadoop-tasktracker-hadoop5.out
 hadoop@hadoop1:~$
  • For completeness, you should know that the command for taking the cluster down is stop-all.sh, but you will very likely never have to use it.
  • Just to make sure, connect to hadoop2 to verify that it, too, is running some hadoop processes:
hadoop@hadoop1:~$ ssh hadoop2

hadoop@hadoop2:~$ jps
18571 TaskTracker
18749 Jps
18447 DataNode

Running the Map-Reduce WordCount Program


Creating a working directory for your data

  • If you haven't done so, ssh to hadoop1 and create a directory for yourself. We'll use dft as an example in this tutorial.
hadoop@hadoop1:~$ cd 
hadoop@hadoop1:~$ mkdir dft          (replace dft with your own Id)
hadoop@hadoop1:~$ cd dft
hadoop@hadoop1:~/dft$

Creating/Downloading Data Locally

In order to process a text file with hadoop, you first need to download the file to a personal directory on hadoop1, then copy it to the Hadoop File System (HDFS) so that the hadoop namenode and datanodes can share it.

Creating a local copy on Hadoop1

  • Download a copy of James Joyce's Ulysses:
hadoop@hadoop1:~/dft$ wget http://www.gutenberg.org/files/4300/4300.zip
hadoop@hadoop1:~/dft$ unzip 4300.zip
hadoop@hadoop1:~/dft$ rm 4300.zip
hadoop@hadoop1:~/dft$ cat 4300.txt | head -50
Verify that you have the text of the novel, which begins:


"Stately, plump Buck Mulligan came from the stairhead, bearing a bowl of lather on which a mirror and a razor lay crossed."




Lab Experiment #1
Create your own personal directory on hadoop1, and store in it your own version of Ulysses.



Copy Data File to HDFS

  • Copy the data file 4300.txt to the Hadoop File System (HDFS):
hadoop@hadoop1:~/dft$ hadoop dfs -copyFromLocal /home/hadoop/dft dft 
hadoop@hadoop1:~/dft$ hadoop dfs -ls

Found x items
...
drwxr-xr-x   - hadoop supergroup          0 2010-03-16 11:36 /user/hadoop/dft
...
Verify that your directory is now in the Hadoop File System, as indicated above.
  • Check the contents of your directory:
hadoop@hadoop1:~/dft$ hadoop dfs -ls dft

Found 1 items
-rw-r--r--   2 hadoop supergroup    1573044 2010-03-16 11:36 /user/hadoop/dft/4300.txt
Verify that the file 4300.txt is there.

WordCount.java Map-Reduce Program

  • Hadoop comes with a set of demonstration programs. They are located in ~/hadoop/src/examples/org/apache/hadoop/examples/. One of them is WordCount.java, which computes the word frequency of all text files found in the HDFS directory you ask it to process.
  • The program has several sections:

The map section

  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, 
                    OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }

The Map class takes the lines of text that are fed to it (Hadoop automatically breaks the text files down into lines, so there is no need for us to do it) and breaks them into words. For each word it outputs a datagram, a (String, int) tuple of the form ("some-word", 1). Each tuple records a single occurrence of a word, so the count it carries is always 1; a word that appears several times in the text produces several identical tuples.
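To make the shape of the map output concrete, here is a minimal sketch in plain Java that needs no Hadoop installation. The class name MapSketch and its map method are hypothetical names chosen for this illustration; the sketch only imitates the tokenizing loop of MapClass shown above and returns the pairs in a list instead of sending them to an OutputCollector.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Hypothetical stand-alone class for illustration; it is not part of Hadoop.
class MapSketch {

    // Mimics MapClass.map(): emit one (word, 1) pair per token of the line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line); // splits on whitespace
        while (itr.hasMoreTokens()) {
            pairs.add(new SimpleEntry<>(itr.nextToken(), 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Prints one pair per line, in the order the words appear.
        for (Map.Entry<String, Integer> pair : map("the cat saw the dog")) {
            System.out.println("(" + pair.getKey() + ", " + pair.getValue() + ")");
        }
    }
}
```

For the sample line, the word "the" is emitted twice, each time with a count of 1. Nothing is summed at this stage.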

The reduce section

 public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, 
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }

The reduce section receives, for each word, the collection of datagrams [(word, n1), (word, n2), ...] that share that word but carry different counts. These collections are the result of a sorting process that is integral to Hadoop and that gathers all the datagrams with the same word together. Reduction happens first within each datanode and then across the datanodes, producing a final collection of datagrams in which every word is unique and is paired with its total frequency (number of occurrences).
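The grouping and summing described above can be imitated on a single machine with a sorted map. The following is only a sketch under that assumption: the class ReduceSketch and its methods are hypothetical names, and a TreeMap stands in for Hadoop's distributed sort, which in reality moves data between nodes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone class for illustration; it is not part of Hadoop.
class ReduceSketch {

    // Stand-in for Hadoop's sort step: gather the counts by word, in key order.
    static Map<String, List<Integer>> groupByWord(List<String> words) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String word : words) {
            // Each word arrives as a (word, 1) datagram from the map phase.
            groups.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
        }
        return groups;
    }

    // Mimics Reduce.reduce(): sum every count received for one word.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Runs the reduce step once per group of identical words.
    static Map<String, Integer> wordCount(List<String> words) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groupByWord(words).entrySet()) {
            totals.put(group.getKey(), reduce(group.getValue()));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cat", "saw", "the", "dog");
        System.out.println(wordCount(words)); // {cat=1, dog=1, saw=1, the=2}
    }
}
```

As in the real job, reduce is called once per distinct word and never sees datagrams belonging to another word.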

The map-reduce organization section

    conf.setMapperClass(MapClass.class);        
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);

Here we see that the combining stage and the reduce stage are implemented by the same reduce class, which makes sense, since the number of occurrences of a word as generated on several datanodes is just the sum of the numbers of occurrences.
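The reason one class can serve both stages is that addition gives the same total whether the counts are summed all at once or summed per node first and then summed again. Below is a small sketch of that idea in plain Java; CombinerSketch, combine and reduce are hypothetical names, and the two word lists stand in for the map output of two datanodes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone class for illustration; it is not part of Hadoop.
class CombinerSketch {

    // Map + combine on one node: a local, partial count per word.
    static Map<String, Integer> combine(List<String> words) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String word : words) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // Reduce: add up the partial counts coming from every node.
    static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<String, Integer> node1 = combine(Arrays.asList("the", "cat", "the"));
        Map<String, Integer> node2 = combine(Arrays.asList("the", "dog"));
        System.out.println(node1); // {cat=1, the=2}
        System.out.println(node2); // {dog=1, the=1}
        System.out.println(reduce(Arrays.asList(node1, node2))); // {cat=1, dog=1, the=3}
    }
}
```

Combining locally means fewer datagrams have to travel to the reducer. The job log further down reports the same effect for Ulysses: 267975 combine input records shrink to 88551 combine output records.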

The datagram definitions

    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);

As the comments indicate, the datagrams are of the form (String, int), carried in Hadoop's Text and IntWritable types.

Running WordCount

Run the wordcount java program from the example directory in hadoop:

hadoop@hadoop1:~/dft$ hadoop jar /home/hadoop/hadoop/hadoop-0.19.2-examples.jar wordcount dft dft-output

The program takes about 21 seconds to execute on a 5-PC cluster. The output generated is something like this:

10/03/16 11:40:51 INFO mapred.FileInputFormat: Total input paths to process : 1
10/03/16 11:40:51 INFO mapred.JobClient: Running job: job_201003161102_0002
10/03/16 11:40:52 INFO mapred.JobClient:  map 0% reduce 0%
10/03/16 11:40:55 INFO mapred.JobClient:  map 9% reduce 0%
10/03/16 11:40:56 INFO mapred.JobClient:  map 27% reduce 0%
10/03/16 11:40:58 INFO mapred.JobClient:  map 45% reduce 0%
10/03/16 11:40:59 INFO mapred.JobClient:  map 81% reduce 0%
10/03/16 11:41:01 INFO mapred.JobClient:  map 100% reduce 0%
10/03/16 11:41:09 INFO mapred.JobClient: Job complete: job_201003161102_0002
10/03/16 11:41:09 INFO mapred.JobClient: Counters: 17
10/03/16 11:41:09 INFO mapred.JobClient:   File Systems
10/03/16 11:41:09 INFO mapred.JobClient:     HDFS bytes read=1576605
10/03/16 11:41:09 INFO mapred.JobClient:     HDFS bytes written=527522
10/03/16 11:41:09 INFO mapred.JobClient:     Local bytes read=1219522
10/03/16 11:41:09 INFO mapred.JobClient:     Local bytes written=2439412
10/03/16 11:41:09 INFO mapred.JobClient:   Job Counters 
10/03/16 11:41:09 INFO mapred.JobClient:     Launched reduce tasks=1
10/03/16 11:41:09 INFO mapred.JobClient:     Rack-local map tasks=6
10/03/16 11:41:09 INFO mapred.JobClient:     Launched map tasks=11
10/03/16 11:41:09 INFO mapred.JobClient:     Data-local map tasks=5
10/03/16 11:41:09 INFO mapred.JobClient:   Map-Reduce Framework
10/03/16 11:41:09 INFO mapred.JobClient:     Reduce input groups=50091
10/03/16 11:41:09 INFO mapred.JobClient:     Combine output records=88551
10/03/16 11:41:09 INFO mapred.JobClient:     Map input records=33055
10/03/16 11:41:09 INFO mapred.JobClient:     Reduce output records=50091
10/03/16 11:41:09 INFO mapred.JobClient:     Map output bytes=2601773
10/03/16 11:41:09 INFO mapred.JobClient:     Map input bytes=1573044
10/03/16 11:41:09 INFO mapred.JobClient:     Combine input records=267975
10/03/16 11:41:09 INFO mapred.JobClient:     Map output records=267975
10/03/16 11:41:09 INFO mapred.JobClient:     Reduce input records=88551

Getting the Output

  • Let's take a look at the output of the program:
hadoop@hadoop1:~/dft$ hadoop dfs -ls

Found x items
drwxr-xr-x   - hadoop supergroup          0 2010-03-16 11:36 /user/hadoop/dft
drwxr-xr-x   - hadoop supergroup          0 2010-03-16 11:41 /user/hadoop/dft-output
Verify that a new directory with -output at the end of your identifier has been created.
  • Look at the contents of this output directory:
hadoop@hadoop1:~/dft$ hadoop dfs -ls dft-output

Found 2 items
drwxr-xr-x   - hadoop supergroup          0 2010-03-16 11:40 /user/hadoop/dft-output/_logs
-rw-r--r--   2 hadoop supergroup     527522 2010-03-16 11:41 /user/hadoop/dft-output/part-00000
  • Finally, let's take a look at the output:
hadoop@hadoop1:~/dft$ hadoop dfs -cat dft-output/part-00000 | less 

And we get
"Come   1
"Defects,"      1
"I      1
"Information    1
"J"     1
"Plain  2
"Project        5
.
.
.
zest.   1
zigzag  2
zigzagging      1
zigzags,        1
zivio,  1
zmellz  1
zodiac  1
zodiac. 1
zodiacal        2
zoe)_   1
zones:  1
zoo.    1
zoological      1
zouave's        1
zrads,  2
zrads.  1
  • To copy the output file to our local storage, use -copyToLocal (remember, the output is automatically created in the HDFS world, and we have to copy the data from there to our file system to work on it):
 hadoop@hadoop1:~$ cd ~/dft
 hadoop@hadoop1:~/dft$ hadoop dfs -copyToLocal dft-output/part-00000  .
 hadoop@hadoop1:~/dft$ ls

 4300.txt  part-00000
  • To remove the output directory (recursively going through directories if necessary):
hadoop@hadoop1:~$ hadoop dfs -rmr dft-output
Note that the hadoop program WordCount will not run another time if the output directory exists. It always wants to create a new one, so we'll have to remove the output directory regularly after having saved the output of each job.




Lab Experiment #2
Run WordCount.java on Ulysses from your own HDFS directory



Moment of Truth: Compare 5-PC Hadoop cluster to 1 Linux PC

  • The moment of truth has arrived. How is Hadoop faring against a regular PC running Linux and computing the word frequencies of the contents of Ulysses?
  • Step 1: time the execution of WordCount.java on hadoop.
 hadoop dfs -rmr dft-output
 time hadoop jar /home/hadoop/hadoop/hadoop-0.19.2-examples.jar wordcount dft dft-output
  • Observe and record the total execution time (real)
  • To compute the word frequency of a text with Linux, we can use Linux commands and pipes, as follows:
tr ' ' ' [ret]
' < 4300.txt | sort | uniq -c [ret]
where [ret] indicates that you should press the return/enter key. The explanation of what is going on is nicely presented at http://dsl.org, in the Text Concordance recipe.
  • Try the command and verify that you get the word frequency of 4300.txt:
  2457 
     1 _
     1 _........................
     7 -
     3 --
     2 --_...
     5 --...
     1 --............
     3 --
     1 ?...
     6 ...
     1 ...?

   .  .  .

     1 Zouave
     1 zouave's
     2 zrads,
     1 zrads.
     1 Zrads
     1 Zulu
     1 Zulus
     1 _Zut!
  • Observe the real execution time (prefix the pipeline with time, as in Step 1, to measure it).
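The Linux pipeline and WordCount do not split the text in quite the same way, which is worth keeping in mind when comparing their outputs. The tr command turns only space characters into line breaks, so blank lines and repeated spaces survive as empty entries, which is presumably what the first line of the listing above (a count followed by nothing) reports. WordCount's StringTokenizer splits on any whitespace and never yields an empty token. The sketch below imitates the pipeline's behaviour in plain Java; PipelineSketch and frequencies are hypothetical names, and the map is ordered by character code rather than by the locale rules that sort may apply, so the ordering can differ from the real command.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone class for illustration; it is not part of Hadoop.
class PipelineSketch {

    // Imitates: tr (space to newline) | sort | uniq -c
    static Map<String, Integer> frequencies(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Limit -1 keeps empty tokens, as tr does for blank lines
            // and for runs of several spaces.
            for (String token : line.split(" ", -1)) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be  or", "", "not to be");
        // Print the count first and the token second, like uniq -c.
        for (Map.Entry<String, Integer> e : frequencies(lines).entrySet()) {
            System.out.println(String.format("%7d %s", e.getValue(), e.getKey()));
        }
    }
}
```

In the sample input, the double space and the blank line each contribute one empty token, so the empty string is counted twice.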



Lab Experiment #3
Measure the execution time of Hadoop versus Linux (use hadoop1) for computing the word frequency of Ulysses. Comment on your results.



Lab Experiment #4

Instead of map-reducing just one text file, make the cluster work on several books at once. Michael Noll points to several downloadable books in his tutorial on setting up Hadoop on an Ubuntu cluster:

  • The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson (download)
  • The Notebooks of Leonardo Da Vinci (download)
  • The Art of War by 6th cent. B.C. Sunzi (download)
  • The Adventures of Sherlock Holmes by Sir Arthur Conan Doyle (download)
  • The Devil's Dictionary by Ambrose Bierce (download)
  • Encyclopaedia Britannica, 11th Edition, Volume 4, Part 3 (download)

Download them all and compare the execution time of hadoop on all these books against that of a single Linux PC.