Tutorial: Creating a Hadoop Cluster with StarCluster on Amazon AWS (deprecated)

From dftwiki3
Jump to: navigation, search

--D. Thiebaut (talk) 09:40, 6 November 2013 (EST)
Updated --D. Thiebaut (talk) 13:47, 16 March 2017 (EDT)


AmazonAWS.jpgHadoopCartoon.png

This tutorial supersedes the previous two tutorials on this subject. It assumes that you have installed MIT's starcluster (see this tutorial for background material) on your local machine, and have obtained proper credentials to access the Amazon Web Services (AWS). This tutorial is also used to set up a Hadoop cluster in the CSC352 class on distributed processing taught Fall 2013 at Smith College. Some information is pertinent to the students taking the class, and may not necessarily match other users setup.







Setup


  • If you have already installed starcluster on your local machine, and have already tested it on AWS, creating a hadoop cluster takes no time!
  • If you do not have starcluster on your laptop, please do this:
    • Follow the directions in this tutorial to setup Starcluster on your local machine.
    • Follow the directions presented in MIT's | Starcluster documentation to add the hadoop plugin to the starcluster on your local machine.

This section is only visible to computers located at Smith College

  • Note that you should have created a new cluster definition in your starcluster config file for your hadoop cluster. You may want to set the default cluster definition to the new cluster. These are the lines in config that implement this:
DEFAULT_TEMPLAGE=hadoopcluster

[cluster hadoopcluster]
plugins = hadoop
KEYNAME = mykeyABC       
CLUSTER_SIZE = 1
CLUSTER_USER = sgeadmin
CLUSTER_SHELL = bash
NODE_IMAGE_ID = ami-7c5c3915
NODE_INSTANCE_TYPE = m1.medium
AVAILABILITY_ZONE = us-east-1c
VOLUMES = enwiki, dataABC       
#SPOT_BID = 0.50


Note 1: Your EBS volumes may be different from the ones shown above. All we assume here is that one of the volumes will be NFS attached to the /data directory of all the nodes of our cluster. Currently we are using only a cluster of 1 node for debuggin our MapReduce applications.
Note 2: It is recommend to start with just 1 node in the cluster. Hadoop will work fine with one node, and it is less expensive to develop and test on 1 node first and deploy on a larger number once the application is fully debugged.


Starting the 1-Node Hadoop Cluster


Simply start the cluster using starcluster in a Terminal window of your laptop:

starcluster start myhadoopcluster
StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu

>>> Using default cluster template: hadoopcluster
>>> Validating cluster template settings...
>>> Cluster template settings are valid
>>> Starting cluster...
>>> Launching a 1-node cluster...
>>> Creating security group @sc-myhadoopcluster...
Reservation:r-xxxxxxxxx
>>> Waiting for cluster to come up... (updating every 30s)
>>> Waiting for all nodes to be in a 'running' state...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
...
>>> Running plugin hadoop
>>> Configuring Hadoop...
>>> Adding user sgeadmin to hadoop group
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Installing configuration templates...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring environment...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring MapReduce Site...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring Core Site...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring HDFS Site...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring masters file...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring slaves file...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring HDFS...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Configuring dumbo...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Starting namenode...
>>> Starting secondary namenode...
>>> Starting datanode on all nodes...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Starting jobtracker...
>>> Starting tasktracker on all nodes...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%  
>>> Job tracker status: http://ec2xxxxxxxxxx.compute-1.amazonaws.com:50030
>>> Namenode status: http://ec2xxxxxxxxxx.compute-1.amazonaws.com:50070
>>> Configuring cluster took 2.588 mins
>>> Starting cluster took 3.793 mins


Note that the information displayed by starcluster shows status lines that show the hadoop environment being started in the form of

  • a MapReduce site,
  • an HDFS site,
  • masters,
  • slaves,
  • namenodes,
  • secondary namenodes,
  • job trackers, and
  • taks trackers.


Connecting to the Hadoop Cluster


  • This is done by ssh-ing from the Terminal window on your laptop to the master node of your AWS cluster. Starcluster understands the concept of a cluster in the world of MPI, where there's always a master node and several worker nodes:
starcluster sshmaster myhadoopcluster
StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu
...
root@master:~# 
  • We are now connected to the master of our 1-node cluster and ready to enter create and run our first Hadoop program.


Running our First Hadoop Program


The "hello world!" program of the MapReduce environment is a word-count program that takes one or several documents (typically books from the Gutenberg Web site) and computes the frequency of occurrence of each word.

We will run this program two different ways: the first is to use the wordcount program included in the examples folder of the hadoop distribution package. The second is to create the Wordcount.java program ourselves and go through the complete set of compilation and execution steps ourselves.

Running the Canned Wordcount Program


  • The steps required to run a MapReduce program on a set of files are as follows:
    1. create a folder in the Hadoop distributed file system (HDFS) for our data files
    2. upload the data files to the new folder
    3. run the MapReduce java program
    4. download the result file(s) from the HDFS and inspect them.


We now proceed to follow these steps.

Create data files

  • First we download a couple books from the Gutenberg.org project which contains many books on line, copyright free. Unfortunately the Gutenberg site prevents downloads coming from an AWS instance, as it assumes it is coming from a robot, so instead we'll download a couple of books from a local repository:
 root@master:/data/hadoop# wget http://cs.smith.edu/~thiebaut/gutenberg/4300-8.zip
 unzip 4300-8.zip
 root@master:/data/hadoop# wget http://cs.smith.edu/~thiebaut/gutenberg/12241.txt

The first book is Ulysses.', by James Joyce, and the second are Poems by Emily Dickinson, 3rd series.

Put the data files in HDFS

  • run the following commands on the cluster:
root@master:/data/hadoop# hadoop dfs -mkdir books
root@master:/data/hadoop# hadoop dfs -copyFromLocal 4300-8.txt books
root@master:/data/hadoop# hadoop dfs -copyFromLocal 12241.txt books
root@master:/data/hadoop# hadoop dfs -ls books
Found 2 items
-rw-r--r--   3 root supergroup      78956 2013-11-06 16:16 /user/root/books/12241.txt
-rw-r--r--   3 root supergroup    1573082 2013-11-06 16:15 /user/root/books/4300-8.txt
  • Good! Our data files are now in HDFS

Run the Program

  • Next we need to locate the hadoop distribution folder and in particular the example jar file:
root@master:/data/hadoop# find / -name "*hadoop-examples.jar" -print
/usr/lib/hadoop-0.20/hadoop-examples.jar
It's in /usr/lib/hadoop-0.20. We also just learnt that we are using hadoop Version 0.20.
  • Run the Wordcount program from the examples jar:
root@master:/data/hadoop# hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar wordcount books output1
13/11/06 16:32:20 INFO input.FileInputFormat: Total input paths to process : 2
13/11/06 16:32:20 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/06 16:32:20 INFO mapred.JobClient: Running job: job_201311061538_0001
13/11/06 16:32:21 INFO mapred.JobClient:  map 0% reduce 0%
13/11/06 16:32:28 INFO mapred.JobClient:  map 50% reduce 0%
13/11/06 16:32:30 INFO mapred.JobClient:  map 100% reduce 0%
13/11/06 16:32:37 INFO mapred.JobClient:  map 100% reduce 33%
13/11/06 16:32:38 INFO mapred.JobClient:  map 100% reduce 100%
13/11/06 16:32:39 INFO mapred.JobClient: Job complete: job_201311061538_0001
... 
13/11/06 16:32:39 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1126993920
13/11/06 16:32:39 INFO mapred.JobClient:     Map output records=280727 
  • That's it! The two files have been processed. Let's take a look at the output file:


Check the Output


The output of the MapReduce application is always in a folder. The typical use for the folder is to download it or the result file inside it to your laptop, take a look at it or shape it for visualization, and delete the folder from the HDFS.

  • Below are steps necessary to look at the output
root@master:/data/hadoop# hadoop dfs -ls
Found 2 items
drwxr-xr-x   - root supergroup          0 2013-11-06 16:16 /user/root/books
drwxr-xr-x   - root supergroup          0 2013-11-06 16:32 /user/root/output1

root@master:/data/hadoop# hadoop dfs -ls output1
Found 3 items
-rw-r--r--   3 root supergroup          0 2013-11-06 16:32 /user/root/output1/_SUCCESS
drwxr-xr-x   - root supergroup          0 2013-11-06 16:32 /user/root/output1/_logs
-rw-r--r--   3 root supergroup     543078 2013-11-06 16:32 /user/root/output1/part-r-00000

  • We copy the output file from HDFS to our /data/hadoop directory:
root@master:/data/hadoop# hadoop dfs -copyToLocal output1/part-r-00000 .
  • We clean up the HDFS of the output directory which we don't need any longer.
root@master:/data/hadoop# hadoop dfs -rmr output1
Deleted hdfs://master:54310/user/root/output1
  • We look at some areas of the output file to verify that it lists words in alphabetical order, followed by a count of the number of times each word is found:
root@master:/data/hadoop# head part-r-00000 
"A	1
"Come	1
"Defects,"	2
"I	2
"Information	2
"J"	1
"Plain	4
"Project	10
"Right	2
"Viator"	1

root@master:/data/hadoop# tail part-r-00000 
zone	1
zones:	1
zoo.	1
zoological	1
zouave's	1
zrads,	2
zrads.	1 
�	7
�lus,_	1
�tat_.	1
  • Looking at the book of poems by Emily Dickinson, we pick the editor's email address as a unique pattern to look for. Indeed, we see that it was picked up a couple times by the program:
root@master:/data/hadoop#  grep -i "jtinsley@pobox.com" part-r-00000  
<jtinsley@pobox.com>	2



Challenge # 1

QuestionMark3.jpg


Go through the same steps, download a couple books (or other documents) from Gutenberg.org, create a 1-node Starcluster MPI cluster with the hadoop plugin, upload your document to hdfs on your cluster, run the Wordcount application, and verify that you get a histogram of the occurrences of words in the documents.



Running our Own Java WordCount Program


  • Running our own Java program as a MapReduce application requires a few more steps than the previous method. We have to
    1. upload the data to HDFS (we'll skip this step since we've done it in the previous section)
    2. create the source file (in our case we'll copy it from the hadoop examples folder),
    3. figure out where the hadoop java library resides
    4. compile the java program and create a directory of java class files
    5. create the jar file
    6. run the program,
    7. download the result file(s)
    8. cleanup up the HDFS


Creating the Java Source


  • We will simply copy paste the code below (taken from Apache) into emacs and create our own copy of WordCount.java. A few lines have been edited (highlighted in yellow) from the original file.
root@master:/data/hadoop# emacs MyWordCount.java -nw


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//package org.apache.hadoop.examples;
package org.myorg;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * This is an example Hadoop Map/Reduce application.
 * It reads the text input files, breaks each line into words
 * and counts them. The output is a locally sorted list of words and the 
 * count of how often they occurred.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar wordcount
 *            [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i> 
 */
public class MyWordCount extends Configured implements Tool {
  
  /**
   * Counts the words in each line.
   * For each line of input, break the line into words and emit them as
   * (<b>word</b>, <b>1</b>).
   */
  public static class MapClass extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    
    public void map(LongWritable key, Text value, 
                    OutputCollector<Text, IntWritable> output, 
                    Reporter reporter) throws IOException {
      String line = value.toString();
      StringTokenizer itr = new StringTokenizer(line);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        output.collect(word, one);
      }
    }
  }
  
  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {
    
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, 
                       Reporter reporter) throws IOException {
      int sum = 0;
      while (values.hasNext()) {
        sum += values.next().get();
      }
      output.collect(key, new IntWritable(sum));
    }
  }
  
  static int printUsage() {
    System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
    ToolRunner.printGenericCommandUsage(System.out);
    return -1;
  }
  
  /**
   * The main driver for word count map/reduce program.
   * Invoke this method to submit the map/reduce job.
   * @throws IOException When there is communication problems with the 
   *                     job tracker.
   */
  public int run(String[] args) throws Exception {
    JobConf conf = new JobConf(getConf(), MyWordCount.class);
    conf.setJobName("wordcount");
 
    // the keys are words (strings)
    conf.setOutputKeyClass(Text.class);
    // the values are counts (ints)
    conf.setOutputValueClass(IntWritable.class);
    
    conf.setMapperClass(MapClass.class);        
    conf.setCombinerClass(Reduce.class);
    conf.setReducerClass(Reduce.class);
    
    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException except) {
        System.out.println("ERROR: Integer expected instead of " + args[i]);
        return printUsage();
      } catch (ArrayIndexOutOfBoundsException except) {
        System.out.println("ERROR: Required parameter missing from " +
                           args[i-1]);
        return printUsage();
      }
    }
    // Make sure there are exactly 2 parameters left.
    if (other_args.size() != 2) {
      System.out.println("ERROR: Wrong number of parameters: " +
                         other_args.size() + " instead of 2.");
      return printUsage();
    }
    FileInputFormat.setInputPaths(conf, other_args.get(0));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        
    JobClient.runJob(conf);
    return 0;
  }
  
  
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MyWordCount(), args);
    System.exit(res);
  }

}


Locating the Hadoop Core library

root@master:/data/hadoop# find / -name "hadoop-core.jar" -print
/usr/lib/hadoop-0.20/hadoop-core.jar


Compiling the MapReduce Application, Creating the JAR File


Now that we know where the hadoop library is, we can compile our program against this library:

root@master:/data/hadoop#  mkdir MyWordCount_classes

root@master:/data/hadoop#  javac -classpath /usr/lib/hadoop-0.20/hadoop-core.jar -d MyWordCount_classes \
 MyWordCount.java

root@master:/data/hadoop# jar -cvf MyWordCount.jar -C MyWordCount_classes/ .
added manifest
adding: org/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/MyWordCount$MapClass.class(in = 1954) (out= 805)(deflated 58%)
adding: org/myorg/MyWordCount.class(in = 3324) (out= 1679)(deflated 49%)
adding: org/myorg/MyWordCount$Reduce.class(in = 1617) (out= 651)(deflated 59%)

We now have the jar file in our directory:

root@master:/data/hadoop# ls -ltr | tail -1
-rw-r--r-- 1 root root    4130 Nov  6 19:17 MyWordCount.jar


Running the MapReduce Application


root@master:/data/hadoop# hadoop jar /data/hadoop/MyWordCount.jar org.myorg.MyWordCount books output2
13/11/06 19:31:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/06 19:31:13 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/06 19:31:13 INFO mapred.FileInputFormat: Total input paths to process : 2
13/11/06 19:31:13 INFO mapred.JobClient: Running job: job_201311061822_0003
13/11/06 19:31:14 INFO mapred.JobClient:  map 0% reduce 0%
...
13/11/06 19:31:33 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1501794304
13/11/06 19:31:33 INFO mapred.JobClient:     Map output records=280727


Check the Output


We proceed as we did in the previous section:

root@master:/data/hadoop# hadoop dfs -ls
Found 2 items
drwxr-xr-x   - root supergroup          0 2013-11-06 18:26 /user/root/books
drwxr-xr-x   - root supergroup          0 2013-11-06 19:31 /user/root/output2

root@master:/data/hadoop# hadoop dfs -ls output2
Found 3 items
-rw-r--r--   3 root supergroup          0 2013-11-06 19:31 /user/root/output2/_SUCCESS 
drwxr-xr-x   - root supergroup          0 2013-11-06 19:31 /user/root/output2/_logs
-rw-r--r--   3 root supergroup     543078 2013-11-06 19:31 /user/root/output2/part-00000

root@master:/data/hadoop# hadoop dfs -copyToLocal output2/part-00000 .

root@master:/data/hadoop# hadoop dfs -rmr output2
Deleted hdfs://master:54310/user/root/output2

root@master:/data/hadoop# head part-00000 
"A	1
"Come	1
"Defects,"	2
"I	2
"Information	2
"J"	1
"Plain	4
"Project	10
"Right	2
"Viator"	1

root@master:/data/hadoop# tail part-00000 
zone	1
zones:	1
zoo.	1
zoological	1
zouave's	1
zrads,	2
zrads.	1 
�	7
�lus,_	1
�tat_.	1

Terminate The Hadoop Cluster

Exit from the SSH session to the Master node, and terminate the cluster.

root@master:/data/hadoop# exit
logout
Connection to ec2-54-205-166-74.compute-1.amazonaws.com closed.

[localMac] ~/.starcluster$: starcluster terminate myhadoopcluster
StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu

Terminate EBS cluster myhadoopcluster (y/n)? y
>>> Running plugin starcluster.plugins.hadoop.Hadoop
>>> Running plugin starcluster.plugins.sge.SGEPlugin
>>> Running plugin starcluster.clustersetup.DefaultClusterSetup
>>> Detaching volume vol-ffffffff from master
...
>>> Waiting for cluster to terminate... 
>>> Removing @sc-myhadoopcluster security group



Challenge # 2

QuestionMark1.jpg


A MapReduce applications is essentially the combination of two functions, a map() function, and a reduce() function. Take a look at how these two functions together, with the MapReduce infrastructure of Hadoop, performe the computation of a histogram (which is really what the wordcount program generates). Map() gets single words from the document(s) and generates tuples of the form (word, 1) that it stores in a tuple world. These tuples are automatically sorted by their first field, which is a word from the document, and passed on as blocks of tuples with identical first field to reduce() which counts how many tuples have identical first field and outputs the result in the output file as a line containing the word first, and the count second.

Modify either map() or reduce() so that your word count program outputs only words that start with an uppercase letter. Verify that your program works.

Hints: Hadoop introduces a lot of new types. One is Text, that is Hadoop's version of the Java String. You can go from a Text variable to a String variable using the toString() method.

Text word;
String s;
s = word.toString();

To go from String to Text, you can do this:

Text word;
String s;
word = new Text( s );

Stack-Overflow has a good recipe for testing if the first character of a String is uppercase:

String s;

if ( Character.isUpperCase(s.charAt( 0 ) ) ) {
    ...
}




...