CSC352 MapReduce/Hadoop Class Notes

Outline

  • History
  • Based on Functional Programming
  • Infrastructure, Streaming
  • Submitting a Job
  • Smith Cluster
  • Example 1: Java
  • Example 2: Python (streaming)
  • Useful Commands


History

  • Introduced in 2004 [1]
  • MapReduce is a patented software framework introduced by Google to support distributed computing on large data sets on clusters of computers [1]
  • 2010 saw the first conference devoted to it: The First International Workshop on MapReduce and its Applications (MAPREDUCE'10) [2]. Interesting tidbit: nobody from Google was on the planning committee; most members came from INRIA.

MapReduce Siblings and Related Projects

  • Hadoop, open source. Written in Java, with a few routines in C. Won the terabyte sort competition in 2008 [3]:
Apache Hadoop Wins Terabyte Sort Benchmark
One of Yahoo's Hadoop clusters sorted 1 terabyte of data in 209 seconds, which beat the previous record of 297 seconds in the annual general purpose (daytona) terabyte sort benchmark. The sort benchmark, which was created in 1998 by Jim Gray, specifies the input data (10 billion 100 byte records), which must be completely sorted and written to disk. This is the first time that either a Java or an open source program has won. Yahoo is both the largest user of Hadoop with 13,000+ nodes running hundreds of thousands of jobs a month and the largest contributor, although non-Yahoo usage and contributions are increasing rapidly.
Their implementation also proved to be highly scalable, and last year took first place in the Terabyte Sort Benchmark. In order to do so, they used 910 nodes, each with 2 cores (a total of 1820 cores), and were able to keep the data entirely in memory across the nodes. With this many machines and their open-source MapReduce implementation, they were able to sort one terabyte of data in 209 seconds, in contrast to the previous record of 297 seconds. Not one to be left behind, Google showed that they were still quite ahead, claiming they could do the same with their closed-source implementation in only 68 seconds. The only details they note are that they used 1000 nodes, but since it's not verifiable or mentioned on the official Sort Benchmark Home Page, it may not be worthwhile to discuss it further.
  • Phoenix [4]: another implementation of MapReduce, created to facilitate optimal use of multi-core and multiprocessor systems. Written in C++, Phoenix was used to perform dense integer matrix multiplication as well as linear regression and principal component analysis on a matrix (among other applications). The following graph shows the speedup it achieves over a varying number of cores:
MapReduceMultiCorePerformance.png
  • GPU/FPGA and MapReduce [5]
MapReduceGPUFPGA.png

MapReduce Based on Functional Programming Concepts

From [6]

  • Operations do not modify data structures; new data is created. The data are immutable (see the sketch after this list).
  • The original data remain unmodified.
  • Data flows are implicit in the program design.
  • The order of operations does not matter.
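
To make these ideas concrete, here is a minimal sketch in Python (not from the original notes) of counting words as a pure map step followed by a pure reduce step: no input is ever mutated, and every stage builds new data.

  from functools import reduce

  lines = ["the cat sat", "the cat ran"]

  # Map: each line yields new (word, 1) pairs; 'lines' itself is never modified.
  mapped = [(word, 1) for line in lines for word in line.split()]

  # Reduce: fold the pairs into a dictionary of counts, building a fresh
  # dictionary at each step instead of mutating the accumulator.
  def add_pair(counts, pair):
      word, n = pair
      return {**counts, word: counts.get(word, 0) + n}

  counts = reduce(add_pair, mapped, {})
  print(counts)   # {'the': 2, 'cat': 2, 'sat': 1, 'ran': 1}

Because add_pair never modifies its inputs, the pairs could be folded in any grouping, or on different machines, which is exactly the property MapReduce exploits.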

Motivation

  • Grand scale: data > 1 TB, # servers > 1000
  • Automatic parallelization
  • Fault tolerance
  • Simple and clean to program

Infrastructure

The Block Diagram

MapReduceInfrastructure.png

The Data is stored in the HDFS

  • The data is held in a distributed file system, the Hadoop Distributed File System (HDFS)
  • The goal of a DFS [7]
A distributed file system is designed to hold a large amount of data and provide access to this data to many clients distributed across a network. There are a number of distributed file systems that solve this problem in different ways.
  • HDFS is designed to store a very large amount of information (terabytes or petabytes). This requires spreading the data across a large number of machines. It also supports much larger file sizes than NFS.
  • HDFS should store data reliably. If individual machines in the cluster malfunction, data should still be available.
  • HDFS should provide fast, scalable access to this information.
  • HDFS should integrate well with Hadoop MapReduce, allowing data to be read and computed upon locally when possible (basic shell commands for moving data in and out of HDFS follow this list).
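
As a quick illustration (file and directory names here are made up for the example), data is staged into HDFS with the hadoop fs shell before a job runs:

  # Create an input directory in HDFS and copy a local file into it
  hadoop fs -mkdir input
  hadoop fs -put file01.txt input

  # List the files now stored in HDFS
  hadoop fs -ls input

  # After a job completes, inspect and then clean up its output
  hadoop fs -cat output/part-00000
  hadoop fs -rmr output

Each file copied in is split into blocks and replicated across DataNodes, as the figure below shows.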



HadoopFileSystem.png
Figure 2.1: DataNodes holding blocks of multiple files with a replication factor of 2.
The NameNode maps the filenames onto the block ids.[7]



The Computation

This material is mostly taken from the excellent Yahoo tutorial (Module 4) on MapReduce [8]

MapFunction.png



ReduceFunction.png



MapReduceReduceMore.png



MapReduceDataFlow.png



MapReduceACloserLook.png
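
In the data flow shown above, the framework inserts a shuffle/sort phase between the map and reduce stages, grouping every intermediate value under its key before any reducer runs. A minimal simulation of that grouping (illustrative Python, not the Hadoop API):

  from collections import defaultdict

  def map_fn(line):
      # (k1, v1) -> list of (k2, v2): here, one line -> (word, 1) pairs
      return [(word, 1) for word in line.split()]

  def reduce_fn(word, counts):
      # (k2, list of v2) -> one output record per key
      return (word, sum(counts))

  lines = ["the cat sat", "the cat ran"]

  # Shuffle/sort: collect every intermediate value under its key
  groups = defaultdict(list)
  for line in lines:
      for word, one in map_fn(line):
          groups[word].append(one)

  # Reduce runs once per key, seeing all the values for that key
  for word in sorted(groups):
      print(reduce_fn(word, groups[word]))   # ('cat', 2), ('ran', 1), ...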



Programming Model
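
In the spirit of the outline's "Example 2: Python (streaming)" entry, here is a hedged sketch (file names and the streaming jar path are illustrative, not from the original notes) of WordCount as two Hadoop Streaming scripts that read stdin and write tab-separated key/value pairs:

  #!/usr/bin/env python
  # mapper.py: emit (word, 1) for every word read from stdin
  import sys

  for line in sys.stdin:
      for word in line.strip().split():
          print("%s\t1" % word)

and the matching reducer:

  #!/usr/bin/env python
  # reducer.py: sum the counts for each word; Hadoop delivers the mapper
  # output sorted by key, so all lines for one word arrive consecutively
  import sys

  current_word, current_count = None, 0
  for line in sys.stdin:
      line = line.strip()
      if not line:
          continue
      word, count = line.split("\t", 1)
      if word == current_word:
          current_count += int(count)
      else:
          if current_word is not None:
              print("%s\t%d" % (current_word, current_count))
          current_word, current_count = word, int(count)
  if current_word is not None:
      print("%s\t%d" % (current_word, current_count))

The job would then be submitted with the streaming jar shipped in the Hadoop distribution (the jar's exact path and name vary by version):

  hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
      -input input -output output \
      -mapper mapper.py -reducer reducer.py \
      -file mapper.py -file reducer.py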

Test: Running WordCount on 1, 2, 4, 8, and 16 identical files

The table below collects the counters that the Hadoop JobClient printed at the end of each run.

  Counter                   1 file   2 files   4 files   8 files   16 files
  ------------------------  -------  --------  --------  --------  --------
  Counters (total)          17       17        17        17        17
  File Systems
    HDFS bytes read         401      430       476       624       960
    HDFS bytes written      75       75        75        76        85
    Local bytes read        121      236       466       926       1846
    Local bytes written     644      874       1334      2390      4230
  Job Counters
    Launched reduce tasks   1        1         1         1         1
    Rack-local map tasks    6        6         5         2         1
    Launched map tasks      12       12        12        16        16
    Data-local map tasks    6        6         7         14        15
  Map-Reduce Framework
    Reduce input groups     10       10        10        10        10
    Combine output records  10       20        40        80        160
    Map input records       1        2         4         8         16
    Reduce output records   10       10        10        10        10
    Map output bytes        103      206       412       824       1648
    Map input bytes         60       120       240       480       960
    Combine input records   11       22        44        88        176
    Map output records      11       22        44        88        176
    Reduce input records    10       20        40        80        160

Note how the Map-Reduce Framework counters (map input/output records, combine records) scale linearly with the number of identical input files, while Reduce output records stays at 10, presumably the number of distinct words in the common input file.
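
Runs like these are typically launched with the WordCount example jar that ships with the Hadoop distribution (the jar's name varies by release; paths here are illustrative):

  hadoop jar hadoop-*-examples.jar wordcount input output
  hadoop fs -cat output/part-00000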

References

  1. Wikipedia: http://en.wikipedia.org/wiki/MapReduce
  2. MapReduce 2010: http://graal.ens-lyon.fr/mapreduce/
  3. Apache Hadoop wins Terabyte Sort Benchmark, http://developer.yahoo.net/blogs/hadoop/2008/07/apache_hadoop_wins_terabyte_sort_benchmark.html
  4. Evaluating MapReduce for Multi-core and Multiprocessor Systems, Colby Ranger, Ramanan Raghuraman, Arun Penmetsa, Gary Bradski, Christos Kozyrakis, Computer Systems Laboratory, Stanford University, http://csl.stanford.edu/~christos/publications/2007.cmp_mapreduce.hpca.pdf
  5. Map-reduce as a Programming Model for Custom Computing Machines, Jackson H.C. Yeung, C.C. Tsang, K.H. Tsoi, Bill S.H. Kwan, Chris C.C. Cheung, Anthony P.C. Chan, Philip H.W. Leong, Dept. of Computer Science and Engineering, The Chinese University of Hong Kong, Shatin NT, Hong Kong, and Cluster Technology Limited, Hong Kong Science and Technology Park, NT, Hong Kong. http://mesl.ucsd.edu/gupta/cse291-fpga/Readings/mr_fccm08.pdf
  6. Google tutorials on MapReduce, http://code.google.com/edu/submissions/mapreduce-minilecture/listing.html
  7. Yahoo Hadoop Tutorial, Module 2, http://developer.yahoo.com/hadoop/tutorial/module2.html
  8. Yahoo Tutorial, Module 4, MapReduce Basics http://developer.yahoo.com/hadoop/tutorial/module4.html