Tutorial: Creating a Hadoop Cluster with StarCluster on Amazon AWS (deprecated)
--D. Thiebaut (talk) 09:40, 6 November 2013 (EST)
Updated --D. Thiebaut (talk) 13:47, 16 March 2017 (EDT)
This tutorial supersedes the previous two tutorials on this subject. It assumes that you have installed MIT's StarCluster (see this tutorial for background material) on your local machine, and have obtained proper credentials to access Amazon Web Services (AWS). This tutorial is also used to set up a Hadoop cluster in the CSC352 class on distributed processing taught Fall 2013 at Smith College. Some information is pertinent to the students taking the class and may not necessarily match other users' setups.
Setup
- If you have already installed starcluster on your local machine, and have already tested it on AWS, creating a hadoop cluster takes no time!
- If you do not have starcluster on your laptop, please do this:
- Follow the directions in this tutorial to set up StarCluster on your local machine.
- Follow the directions presented in MIT's StarCluster documentation to add the hadoop plugin to the starcluster installation on your local machine.
- Note that you should have created a new cluster definition in your starcluster config file for your hadoop cluster. You may want to make this new cluster the default cluster template. These are the lines in the config file that implement this:
DEFAULT_TEMPLATE=hadoopcluster

[cluster hadoopcluster]
plugins = hadoop
KEYNAME = mykeyABC
CLUSTER_SIZE = 1
CLUSTER_USER = sgeadmin
CLUSTER_SHELL = bash
NODE_IMAGE_ID = ami-7c5c3915
NODE_INSTANCE_TYPE = m1.medium
AVAILABILITY_ZONE = us-east-1c
VOLUMES = enwiki, dataABC
#SPOT_BID = 0.50
Note 1: Your EBS volumes may be different from the ones shown above. All we assume here is that one of the volumes will be NFS-mounted on the /data directory of all the nodes of our cluster. Currently we are using only a cluster of 1 node for debugging our MapReduce applications.
Note 2: It is recommended to start with just 1 node in the cluster. Hadoop will work fine with one node, and it is less expensive to develop and test on 1 node first and deploy on a larger number of nodes once the application is fully debugged.
Starting the 1-Node Hadoop Cluster
Simply start the cluster using starcluster in a Terminal window of your laptop:
starcluster start myhadoopcluster
StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu

>>> Using default cluster template: hadoopcluster
>>> Validating cluster template settings...
>>> Cluster template settings are valid
>>> Starting cluster...
>>> Launching a 1-node cluster...
>>> Creating security group @sc-myhadoopcluster...
Reservation:r-xxxxxxxxx
>>> Waiting for cluster to come up... (updating every 30s)
>>> Waiting for all nodes to be in a 'running' state...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
...
>>> Running plugin hadoop
>>> Configuring Hadoop...
>>> Adding user sgeadmin to hadoop group
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Installing configuration templates...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring environment...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring MapReduce Site...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring Core Site...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring HDFS Site...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring masters file...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring slaves file...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring HDFS...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Configuring dumbo...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Starting namenode...
>>> Starting secondary namenode...
>>> Starting datanode on all nodes...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Starting jobtracker...
>>> Starting tasktracker on all nodes...
1/1 |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 100%
>>> Job tracker status: http://ec2xxxxxxxxxx.compute-1.amazonaws.com:50030
>>> Namenode status: http://ec2xxxxxxxxxx.compute-1.amazonaws.com:50070
>>> Configuring cluster took 2.588 mins
>>> Starting cluster took 3.793 mins
Note that the output displayed by starcluster includes status lines showing the various parts of the Hadoop environment being configured and started:
- a MapReduce site,
- an HDFS site,
- masters,
- slaves,
- namenodes,
- secondary namenodes,
- job trackers, and
- task trackers.
Connecting to the Hadoop Cluster
- This is done by ssh-ing from the Terminal window on your laptop to the master node of your AWS cluster. Starcluster understands the concept of a cluster in the world of MPI, where there's always a master node and several worker nodes:
starcluster sshmaster myhadoopcluster
StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu
...
root@master:~#
- We are now connected to the master of our 1-node cluster and ready to create and run our first Hadoop program.
Running our First Hadoop Program
The "hello world!" program of the MapReduce environment is a word-count program that takes one or several documents (typically books from the Gutenberg Web site) and computes the frequency of occurrence of each word.
We will run this program two different ways: the first is to use the wordcount program included in the examples folder of the hadoop distribution package; the second is to create the WordCount.java program ourselves and go through the complete set of compilation and execution steps.
Running the Canned Wordcount Program
- The steps required to run a MapReduce program on a set of files are as follows:
- create a folder in the Hadoop distributed file system (HDFS) for our data files
- upload the data files to the new folder
- run the MapReduce java program
- download the result file(s) from the HDFS and inspect them.
We now proceed to follow these steps.
Create data files
- First we download a couple of books from the Gutenberg.org project, which hosts many copyright-free books online. Unfortunately the Gutenberg site blocks downloads coming from an AWS instance, as it assumes the request is coming from a robot, so instead we'll download a couple of books from a local repository:
root@master:/data/hadoop# wget http://cs.smith.edu/~thiebaut/gutenberg/4300-8.zip
unzip 4300-8.zip
root@master:/data/hadoop# wget http://cs.smith.edu/~thiebaut/gutenberg/12241.txt
The first book is Ulysses, by James Joyce, and the second is Poems by Emily Dickinson, Third Series.
Put the data files in HDFS
- Run the following commands on the cluster:
root@master:/data/hadoop# hadoop dfs -mkdir books
root@master:/data/hadoop# hadoop dfs -copyFromLocal 4300-8.txt books
root@master:/data/hadoop# hadoop dfs -copyFromLocal 12241.txt books
root@master:/data/hadoop# hadoop dfs -ls books
Found 2 items
-rw-r--r--   3 root supergroup      78956 2013-11-06 16:16 /user/root/books/12241.txt
-rw-r--r--   3 root supergroup    1573082 2013-11-06 16:15 /user/root/books/4300-8.txt
- Good! Our data files are now in HDFS.
Run the Program
- Next we need to locate the hadoop distribution folder and in particular the example jar file:
root@master:/data/hadoop# find / -name "*hadoop-examples.jar" -print
/usr/lib/hadoop-0.20/hadoop-examples.jar
- It's in /usr/lib/hadoop-0.20. We also just learned that we are running Hadoop Version 0.20.
- Run the Wordcount program from the examples jar:
root@master:/data/hadoop# hadoop jar /usr/lib/hadoop-0.20/hadoop-examples.jar wordcount books output1
13/11/06 16:32:20 INFO input.FileInputFormat: Total input paths to process : 2
13/11/06 16:32:20 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/06 16:32:20 INFO mapred.JobClient: Running job: job_201311061538_0001
13/11/06 16:32:21 INFO mapred.JobClient:  map 0% reduce 0%
13/11/06 16:32:28 INFO mapred.JobClient:  map 50% reduce 0%
13/11/06 16:32:30 INFO mapred.JobClient:  map 100% reduce 0%
13/11/06 16:32:37 INFO mapred.JobClient:  map 100% reduce 33%
13/11/06 16:32:38 INFO mapred.JobClient:  map 100% reduce 100%
13/11/06 16:32:39 INFO mapred.JobClient: Job complete: job_201311061538_0001
...
13/11/06 16:32:39 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1126993920
13/11/06 16:32:39 INFO mapred.JobClient:     Map output records=280727
- That's it! The two files have been processed. Let's take a look at the output file:
Check the Output
The output of a MapReduce application is always in a folder. The typical workflow is to download the folder, or the result file(s) inside it, to your laptop, inspect the data or shape it for visualization, and then delete the folder from HDFS.
- Below are the steps necessary to look at the output:
root@master:/data/hadoop# hadoop dfs -ls
Found 2 items
drwxr-xr-x   - root supergroup          0 2013-11-06 16:16 /user/root/books
drwxr-xr-x   - root supergroup          0 2013-11-06 16:32 /user/root/output1
root@master:/data/hadoop# hadoop dfs -ls output1
Found 3 items
-rw-r--r--   3 root supergroup          0 2013-11-06 16:32 /user/root/output1/_SUCCESS
drwxr-xr-x   - root supergroup          0 2013-11-06 16:32 /user/root/output1/_logs
-rw-r--r--   3 root supergroup     543078 2013-11-06 16:32 /user/root/output1/part-r-00000
- We copy the output file from HDFS to our /data/hadoop directory:
root@master:/data/hadoop# hadoop dfs -copyToLocal output1/part-r-00000 .
- We remove the output directory from HDFS, since we don't need it any longer:
root@master:/data/hadoop# hadoop dfs -rmr output1
Deleted hdfs://master:54310/user/root/output1
- We look at some areas of the output file to verify that it lists words in alphabetical order, followed by a count of the number of times each word is found:
root@master:/data/hadoop# head part-r-00000
"A              1
"Come           1
"Defects,"      2
"I              2
"Information    2
"J"             1
"Plain          4
"Project        10
"Right          2
"Viator"        1
root@master:/data/hadoop# tail part-r-00000
zone            1
zones:          1
zoo.            1
zoological      1
zouave's        1
zrads,          2
zrads.          1
�               7
�lus,_          1
�tat_.          1
- Looking at the book of poems by Emily Dickinson, we pick the editor's email address as a unique pattern to look for. Indeed, we see that it was picked up a couple of times by the program:
root@master:/data/hadoop# grep -i "jtinsley@pobox.com" part-r-00000
<jtinsley@pobox.com>    2
Challenge #1
Go through the same steps: download a couple of books (or other documents) from Gutenberg.org, create a 1-node StarCluster cluster with the hadoop plugin, upload your documents to HDFS on your cluster, run the Wordcount application, and verify that you get a histogram of the word occurrences in the documents.
Running our Own Java WordCount Program
- Running our own Java program as a MapReduce application requires a few more steps than the previous method. We have to
- upload the data to HDFS (we'll skip this step since we've done it in the previous section)
- create the source file (in our case we'll copy it from the hadoop examples folder),
- figure out where the hadoop java library resides
- compile the java program and create a directory of java class files
- create the jar file
- run the program,
- download the result file(s)
- clean up the HDFS
Creating the Java Source
- We will simply copy and paste the code below (taken from Apache) into emacs and create our own copy of the program, MyWordCount.java. A few lines have been edited from the original file.
root@master:/data/hadoop# emacs MyWordCount.java -nw
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//package org.apache.hadoop.examples;
package org.myorg;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* This is an example Hadoop Map/Reduce application.
* It reads the text input files, breaks each line into words
* and counts them. The output is a locally sorted list of words and the
* count of how often they occurred.
*
* To run: bin/hadoop jar build/hadoop-examples.jar wordcount
* [-m <i>maps</i>] [-r <i>reduces</i>] <i>in-dir</i> <i>out-dir</i>
*/
public class MyWordCount extends Configured implements Tool {
/**
* Counts the words in each line.
* For each line of input, break the line into words and emit them as
* (<b>word</b>, <b>1</b>).
*/
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
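// Note: these two objects are reused for every call to map(), so the mapper
// does not allocate a new key/value object for each output record.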
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
output.collect(word, one);
}
}
}
/**
* A reducer class that just emits the sum of the input values.
*/
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
static int printUsage() {
System.out.println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
/**
* The main driver for word count map/reduce program.
* Invoke this method to submit the map/reduce job.
* @throws IOException When there is communication problems with the
* job tracker.
*/
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), MyWordCount.class);
conf.setJobName("wordcount");
// the keys are words (strings)
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
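// The combiner runs the same Reduce logic on each mapper's local output,
// pre-summing counts before they are shuffled across the network to the reducers.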
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
if ("-m".equals(args[i])) {
conf.setNumMapTasks(Integer.parseInt(args[++i]));
} else if ("-r".equals(args[i])) {
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
} else {
other_args.add(args[i]);
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
other_args.size() + " instead of 2.");
return printUsage();
}
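// The two remaining arguments are the HDFS input path (a file or directory)
// and the output directory, which must not already exist.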
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyWordCount(), args);
System.exit(res);
}
}
Locating the Hadoop Core library
root@master:/data/hadoop# find / -name "hadoop-core.jar" -print
/usr/lib/hadoop-0.20/hadoop-core.jar
Compiling the MapReduce Application, Creating the JAR File
Now that we know where the hadoop library is, we can compile our program against this library:
root@master:/data/hadoop# mkdir MyWordCount_classes
root@master:/data/hadoop# javac -classpath /usr/lib/hadoop-0.20/hadoop-core.jar -d MyWordCount_classes \
                          MyWordCount.java
root@master:/data/hadoop# jar -cvf MyWordCount.jar -C MyWordCount_classes/ .
added manifest
adding: org/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/(in = 0) (out= 0)(stored 0%)
adding: org/myorg/MyWordCount$MapClass.class(in = 1954) (out= 805)(deflated 58%)
adding: org/myorg/MyWordCount.class(in = 3324) (out= 1679)(deflated 49%)
adding: org/myorg/MyWordCount$Reduce.class(in = 1617) (out= 651)(deflated 59%)
We now have the jar file in our directory:
root@master:/data/hadoop# ls -ltr | tail -1
-rw-r--r-- 1 root root 4130 Nov  6 19:17 MyWordCount.jar
Running the MapReduce Application
root@master:/data/hadoop# hadoop jar /data/hadoop/MyWordCount.jar org.myorg.MyWordCount books output2
13/11/06 19:31:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13/11/06 19:31:13 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/06 19:31:13 INFO mapred.FileInputFormat: Total input paths to process : 2
13/11/06 19:31:13 INFO mapred.JobClient: Running job: job_201311061822_0003
13/11/06 19:31:14 INFO mapred.JobClient:  map 0% reduce 0%
...
13/11/06 19:31:33 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1501794304
13/11/06 19:31:33 INFO mapred.JobClient:     Map output records=280727
Check the Output
We proceed as we did in the previous section:
root@master:/data/hadoop# hadoop dfs -ls
Found 2 items
drwxr-xr-x   - root supergroup          0 2013-11-06 18:26 /user/root/books
drwxr-xr-x   - root supergroup          0 2013-11-06 19:31 /user/root/output2
root@master:/data/hadoop# hadoop dfs -ls output2
Found 3 items
-rw-r--r--   3 root supergroup          0 2013-11-06 19:31 /user/root/output2/_SUCCESS
drwxr-xr-x   - root supergroup          0 2013-11-06 19:31 /user/root/output2/_logs
-rw-r--r--   3 root supergroup     543078 2013-11-06 19:31 /user/root/output2/part-00000
root@master:/data/hadoop# hadoop dfs -copyToLocal output2/part-00000 .
root@master:/data/hadoop# hadoop dfs -rmr output2
Deleted hdfs://master:54310/user/root/output2
root@master:/data/hadoop# head part-00000
"A              1
"Come           1
"Defects,"      2
"I              2
"Information    2
"J"             1
"Plain          4
"Project        10
"Right          2
"Viator"        1
root@master:/data/hadoop# tail part-00000
zone            1
zones:          1
zoo.            1
zoological      1
zouave's        1
zrads,          2
zrads.          1
�               7
�lus,_          1
�tat_.          1
Terminate The Hadoop Cluster
Exit from the SSH session to the Master node, and terminate the cluster.
root@master:/data/hadoop# exit
logout
Connection to ec2-54-205-166-74.compute-1.amazonaws.com closed.
[localMac] ~/.starcluster$: starcluster terminate myhadoopcluster
StarCluster - (http://star.mit.edu/cluster) (v. 0.9999)
Software Tools for Academics and Researchers (STAR)
Please submit bug reports to starcluster@mit.edu
Terminate EBS cluster myhadoopcluster (y/n)? y
>>> Running plugin starcluster.plugins.hadoop.Hadoop
>>> Running plugin starcluster.plugins.sge.SGEPlugin
>>> Running plugin starcluster.clustersetup.DefaultClusterSetup
>>> Detaching volume vol-ffffffff from master
...
>>> Waiting for cluster to terminate...
>>> Removing @sc-myhadoopcluster security group
Challenge #2
A MapReduce application is essentially the combination of two functions: a map() function and a reduce() function. Take a look at how these two functions, together with the MapReduce infrastructure of Hadoop, perform the computation of a histogram (which is really what the wordcount program generates). map() gets single words from the document(s) and generates tuples of the form (word, 1), which it emits into a shared tuple space. These tuples are automatically sorted by their first field (a word from the document) and passed on, in blocks of tuples with identical first fields, to reduce(), which counts how many tuples share the same first field and writes the result to the output file as a line containing the word first and the count second.
Modify either map() or reduce() so that your word count program outputs only words that start with an uppercase letter. Verify that your program works.
Hints: Hadoop introduces a lot of new types. One is Text, which is Hadoop's version of the Java String. You can go from a Text variable to a String variable using the toString() method:
Text word;
String s;
s = word.toString();
To go from String to Text, you can do this:
Text word;
String s;
word = new Text( s );
Stack Overflow has a good recipe for testing whether the first character of a String is uppercase:
String s;
if ( Character.isUpperCase( s.charAt( 0 ) ) ) {
    ...
}
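Putting these hints together, here is a minimal sketch of one possible solution. It modifies only the map() method of the MapClass inner class in MyWordCount.java and leaves everything else unchanged; it is just one way to approach the challenge, not the official answer, and the empty-token check is an extra precaution not present in the original code.

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            String token = itr.nextToken();
            // only emit (word, 1) when the first character is an uppercase letter
            if (token.length() > 0 && Character.isUpperCase(token.charAt(0))) {
                word.set(token);
                output.collect(word, one);
            }
        }
    }

Note that tokens beginning with punctuation, such as "Project in the output shown earlier, would be dropped by this test; stripping leading punctuation before testing the first character is a possible refinement.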