<onlysmith>
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
− | |||
<!-- =References=
* Introduced in 2004 <ref name="wikipedia">Wikipedia: http://en.wikipedia.org/wiki/MapReduce</ref>
* MapReduce is a patented software framework introduced by Google to support distributed computing on large data sets on clusters of computers <ref name="wikipedia" />
* 2010, first conference: The First International Workshop on MapReduce and its Applications (MAPREDUCE'10). <ref name="mapreduce2010">MapReduce 2010: http://graal.ens-lyon.fr/mapreduce/</ref> Interesting tidbit: nobody from Google was on the planning committee, which was mostly INRIA researchers.

==MapReduce Siblings and Related Projects==
Much of the information in this section is gathered from <ref name="byzantine">Byzantine Reality: http://www.byzantinereality.com/?p=323</ref>.
* '''Hadoop''', open source. Written in Java, with a few routines in C. Won the Terabyte Sort competition in 2008 <ref name="terabyteSort">Apache Hadoop wins Terabyte Sort Benchmark, http://developer.yahoo.net/blogs/hadoop/2008/07/apache_hadoop_wins_terabyte_sort_benchmark.html</ref>
::Apache Hadoop Wins Terabyte Sort Benchmark<br /> One of Yahoo's Hadoop clusters sorted 1 terabyte of data in 209 seconds, which beat the previous record of 297 seconds in the annual general-purpose (daytona) terabyte sort benchmark. The sort benchmark, which was created in 1998 by Jim Gray, specifies the input data (10 billion 100-byte records), which must be completely sorted and written to disk. This is the first time that either a Java or an open source program has won. Yahoo is both the largest user of Hadoop, with '''13,000+''' nodes running hundreds of thousands of jobs a month, and the largest contributor, although non-Yahoo usage and contributions are increasing rapidly.
::Their implementation also proved to be highly scalable, and last year placed first in the Terabyte Sort Benchmark. In order to do so, they used 910 nodes, each with 2 cores (a total of 1820 cores), and were able to keep the data entirely in memory across the nodes. With this many machines and their open-source MapReduce implementation, they were able to sort one terabyte of data in 209 seconds, in contrast to the previous record of 297 seconds. Not one to be left behind, Google showed that they were still quite ahead, claiming they could do the same with their closed-source implementation in only 68 seconds. The only details they note are that they use 1,000 nodes, but since it's not verifiable or mentioned on the official Sort Benchmark Home Page, it may not be worthwhile to discuss it further.
* '''Phoenix''' <ref name="phoenix">Evaluating MapReduce for Multi-core and Multiprocessor Systems, Colby Ranger, Ramanan Raghuraman, Arun Penmetsa, Gary Bradski, Christos Kozyrakis, Computer Systems Laboratory, Stanford University, http://csl.stanford.edu/~christos/publications/2007.cmp_mapreduce.hpca.pdf</ref>
::Another implementation of MapReduce, named Phoenix, has been created to facilitate optimal use of multi-core and multiprocessor systems. Written in C++, its authors used the MapReduce programming paradigm to perform dense integer matrix multiplication, as well as linear regression and principal component analysis on a matrix (amongst other applications). The following graph shows the speedup they achieve over a varying number of cores:
<center>[[Image:MapReduceMultiCorePerformance.png]]</center>
* '''GPU/FPGA''' and MapReduce <ref name="GPUMapReduce">Map-reduce as a Programming Model for Custom Computing Machines, Jackson H.C. Yeung, C.C. Tsang, K.H. Tsoi, Bill S.H. Kwan, Chris C.C. Cheung, Anthony P.C. Chan, Philip H.W. Leong, Dept. of Computer Science and Engineering, The Chinese University of Hong Kong, Shatin NT, Hong Kong, and Cluster Technology Limited, Hong Kong Science and Technology Park, NT, Hong Kong. http://mesl.ucsd.edu/gupta/cse291-fpga/Readings/mr_fccm08.pdf</ref>
<center>[[Image:MapReduceGPUFPGA.png]]</center>
From <ref name="functionalProgramming">Google tutorials on MapReduce, http://code.google.com/edu/submissions/mapreduce-minilecture/listing.html</ref>
* Operations do not modify the data structures; new data are created instead. The data are ''immutable''.
* The original data remain unmodified.
* Data flows are implicit in the program design.
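A minimal illustration of this style (a hypothetical Java snippet, not taken from the Google tutorial): the functional-style ''map'' below builds a brand-new list of results and leaves its input untouched.

<code><pre>
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ImmutableMapDemo {
    // Functional-style "map": returns a NEW list holding the squares;
    // the input list is read, never modified.
    static List<Integer> square(List<Integer> input) {
        List<Integer> result = new ArrayList<Integer>();
        for (Integer x : input) {
            result.add(x * x);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4);
        List<Integer> squares = square(data);
        System.out.println(data);     // [1, 2, 3, 4]  -- original data unmodified
        System.out.println(squares);  // [1, 4, 9, 16] -- new data created
    }
}
</pre></code>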
<br />
==The Computation: micro view==
The images in this section are mostly taken from the excellent Yahoo tutorial (Module 4) on MapReduce. <ref name="YahooMapReduceTutorial4">Yahoo Tutorial, Module 4, MapReduce Basics, http://developer.yahoo.com/hadoop/tutorial/module4.html</ref>
<br />
<br />
<br />
==Important Statements To Remember==

Taken from <ref name="hadoopGuide">[http://www.amazon.com/Hadoop-Definitive-Guide-Tom-White/dp/0596521979 Hadoop: The Definitive Guide], Tom White, O'Reilly Media, June 2009, ISBN 0596521979. The Web site for the book is http://www.hadoopbook.com/</ref>

* A '''MapReduce job''' is a unit of work submitted by a client.


* A '''job''' contains (see the driver sketch after this list)<br /><br />
** the data<br /><br />
** a MapReduce program<br /><br />
** a configuration<br /><br />


* Hadoop divides a '''job''' into '''tasks''', of which there are two kinds: '''map tasks''' and '''reduce tasks'''.


* There are two types of '''nodes''': a '''jobTracker''' node, which oversees the execution of a '''job''', and '''taskTracker''' nodes, which execute '''tasks'''.


* Hadoop divides the '''input''' into '''splits'''.


* '''Hadoop creates one map task for each split.'''


* A split is '''64 MB''' by default (so, for example, a 1-GByte input yields 16 splits, and therefore 16 map tasks).


* '''Each map task runs the user-defined map function for each record of a split.'''


* Hadoop does its best to run the map task on the node where the split resides, '''but this is not always possible'''.


* The '''sorted''' map outputs are transferred across the network to where the reduce task is running. These '''sorted''' outputs are '''merged''' and fed to the user-defined '''reduce function'''.


* The '''output''' of the '''reduce task''' is stored in '''HDFS'''.


* When there are many reducers, the map tasks '''partition''' their output into '''partitions'''. There is '''one''' partition per '''reduce task'''.
==Examples of Data Flows==
Taken from <ref name="hadoopGuide" />

<center>
[[Image:MapReduceDataFlowOneReduce.png]]
</center>
The shaded boxes are nodes. The dotted arrows show transfers within a node. The heavy arrows show transfers across nodes.

<center>
[[Image:MapReduceDataFlowTwoReduces.png]]
</center>
The general case of having several reduce tasks. In this case the outputs of the map tasks are shuffled: each reduce task receives outputs from many map tasks.
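Which reduce task a given map output record is sent to is decided by the partitioner. As a sketch (a simplified view, not the actual Hadoop source), the default hash partitioner computes something like this:

<code><pre>
// Simplified view of Hadoop's default hash partitioning:
// records with equal keys always land in the same partition,
// and there is exactly one partition per reduce task.
int getPartition(Text key, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
</pre></code>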
+ | |||
+ | |||
+ | <center> | ||
+ | [[Image:MapReduceDataFlowNoReduce.png]] | ||
+ | </center> | ||
+ | It is possible to have 0 reduce tasks... | ||
+ | |||
+ | |||
+ | ==Computation: macro view== | ||
<center>[[Image:MapReduceDataFlow.png]]</center> | <center>[[Image:MapReduceDataFlow.png]]</center> | ||
<br /> | <br /> | ||
=Programming Model=

Material also taken from <ref name="YahooMapReduceTutorial4" />.

==The WordCount Program (example)==

<center>[[Image:wordcountMapReduceBlockDiagram.png]]</center>
+ | |||
+ | |||
+ | ===The Map Function, simplified=== | ||
+ | |||
+ | mapper (filename, file-contents): | ||
+ | for each word in file-contents: | ||
+ | emit (word, 1) | ||
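In actual Hadoop Java code (old <code>org.apache.hadoop.mapred</code> API, as used in the era of these notes), the mapper looks roughly as follows. This mirrors the classic WordCount mapper; the enclosing class and driver are omitted.

<code><pre>
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public static class Map extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Called once per input record (here, one line of the input file).
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output,
                    Reporter reporter) throws IOException {
        StringTokenizer tokenizer = new StringTokenizer(value.toString());
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);   // emit (word, 1)
        }
    }
}
</pre></code>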
+ | |||
+ | |||
+ | ===The Reduce Function, simplified=== | ||
+ | |||
+ | reducer (word, values): | ||
+ | sum = 0 | ||
+ | for each value in values: | ||
+ | sum = sum + value | ||
+ | emit (word, sum) | ||
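And, under the same assumptions (old API, same imports as the mapper above), a rough Java counterpart of the reducer:

<code><pre>
public static class Reduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    // Called once per key, with an iterator over all the values for that key.
    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output,
                       Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));   // emit (word, sum)
    }
}
</pre></code>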
+ | |||
+ | ===The Whole Program=== | ||
+ | |||
+ | [[Hadoop WordCount.java | WordCount.java]] | ||
+ | |||
+ | |||
+ | <br /> | ||
+ | <br /> | ||
+ | |||
+ | <greenbox> | ||
+ | [[Image:ComputerLogo.png|100px|right]] | ||
+ | ;Lab Experiment 1 | ||
+ | :Jump to the first Hadoop/MapReduce [[Hadoop_Tutorial_1_--_Running_WordCount | Lab #1]]! Run all the sections up to, but not including Section 4. | ||
+ | </greenbox> | ||
+ | |||
=Test: Running WordCount on 1, 2, 4, 8, and 16 identical files=

* Setup: create 5 directories: one with 1 file, one with 2 copies of the same file (under different names), one with 4 copies, one with 8 copies, and one with 16 copies.

* The file contains "the quick red fox jumped over the lazy brown sleeping dog.", which is '''10''' words long and '''60''' bytes in size.

* The command to run hadoop on one of these directories is (shown here for the 16-file directory):

 hadoop jar /home/hadoop/352/dft/wordcount.jar '''org.myorg.WordCount dft_16fox''' '''dft_16fox-output'''
{|
! Output Log
! 1 file
! 2 files
! 4 files
! 8 files
! 16 files
! comments
|-
|
<code><pre>
Counters: 17
File Systems
HDFS bytes read
HDFS bytes written
Local bytes read
Local bytes written
Job Counters
Launched reduce tasks
Rack-local map tasks
Launched map tasks
Data-local map tasks
Map-Reduce Framework
Reduce input groups
Combine output records
Map input records
Reduce output records
Map output bytes
Map input bytes
Combine input records
Map output records
Reduce input records
</pre></code>
|
<code><pre>
17

401
75
121
644

1
6
12
6

10
10
1
10
103
60
11
11
10
</pre></code>
|
<code><pre>
17

430
75
236
874

1
6
12
6

10
20
2
10
206
120
22
22
20
</pre></code>
|
<code><pre>
17

476
75
466
1334

1
5
12
7

10
40
4
10
412
240
44
44
40
</pre></code>
|
<code><pre>
17

624
76
926
2390

1
2
16
14

10
80
8
10
824
480
88
88
80
</pre></code>
|
<code><pre>
17

960
85
1846
4230

1
1
16
15

10
160
16
10
1648
960
176
176
160
</pre></code>
|
<code><pre>
?

# of bytes read by map and reduce tasks
# bytes written by map and reduce tasks
?
?

# reduce tasks launched, including speculative tasks
# tasks that ran on node in same rack as input data
# map tasks launched, including speculative tasks
# map tasks that ran on same node as input data

# reduce key groups consumed by all reducers
# output records produced by combiners
# input records consumed by map tasks
# output records produced by the reduce tasks
# bytes produced by all the maps
# input bytes consumed by maps
# records consumed by combiners
# output records produced by map tasks
# records consumed by reduce tasks
</pre></code>
|}
+ | |||
+ | =Compiling Your Own Version of the WordCount Program= | ||
+ | |||
+ | * This is illustrated and explained in Section 4 of [[Hadoop_Tutorial_1_--_Running_WordCount#Running_Your_Own_Version_of_WordCount.java Tutorial #1 | Tutorial #1: Compiling your own version of WordCoung.java]] | ||
+ | |||
+ | =How does hadoop on 6 compare to Linux on 1?= | ||
+ | |||
+ | * This is very interesting! | ||
+ | |||
+ | <greenbox> | ||
+ | [[Image:ComputerLogo.png|100px|right]] | ||
+ | ;Lab Experiment 2 | ||
+ | :Jump to the Section 5 of the [[Hadoop_Tutorial_1_--_Running_WordCount | Hadoop Lab #1]] and see how Hadoop compares with basic Linux for Ulysses, and for Ulysses plus 5 other books | ||
+ | |||
+ | |||
+ | </greenbox> | ||
<br />
<br />

;Question 1
: Comment on the timing you observe, for 1 book, and for 6 books.

;Question 2
: There are 4 large files in the HDFS, in '''wikipages/block/'''. Each is approximately 180 MBytes in size. Run another experiment: time hadoop on these 4 files (~3/4 GByte in total), then time one of the Linux boxes processing the same 4 files with Linux commands, and compare the execution times again.
=Generating Task Timelines=

<br />
<br />
<greenbox>
[[Image:ComputerLogo.png|right |100px]]
;Lab Experiment #3:
: [[Hadoop Tutorial 1.1 -- Generating Task Timelines | Tutorial 1.1]] on generating '''Timelines'''.

</greenbox>
<br />
<br />
+ | |||
+ | =Debugging/Testing using Counters= | ||
+ | |||
+ | Section 6 of [[Hadoop_Tutorial_1_--_Running_WordCount#Counters | Tutorial #1]] shows how to create counters. Hadoop Counters are special variables that are gathered after each task runs and the values are accumulated and reported at the end and during the computation. They are useful for counting quantities such as amount of data processed, number of tasks executed, etc. | ||
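As a hedged sketch of the mechanism (the enum and its constants below are ours, purely illustrative): with the old <code>org.apache.hadoop.mapred</code> API, a counter is identified by an enum constant, and a task bumps it through the <code>Reporter</code> object passed to <code>map()</code> or <code>reduce()</code>.

<code><pre>
// Hypothetical counter group, declared inside the Mapper class;
// the enum name and constants are illustrative, not Hadoop's.
static enum StatCounters { MAP_CALLS, REDUCE_CALLS }

public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
    // Incremented locally by each task; Hadoop accumulates the values
    // across all tasks and reports the totals with the job output.
    reporter.incrCounter(StatCounters.MAP_CALLS, 1);
    // ... normal map work goes here ...
}
</pre></code>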
+ | |||
+ | <br /> | ||
+ | <br /> | ||
+ | <greenbox> | ||
+ | [[Image:ComputerLogo.png|right |100px]] | ||
+ | ;Lab Experiment #4: | ||
+ | : [[Hadoop_Tutorial_1_--_Running_WordCount#Counters | Tutorial 1 on Counters]]. Create counters in your Java version of WordCount and count the number of Map tasks and the number of Reduce tasks. | ||
+ | |||
+ | </greenbox> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | |||
+ | |||
+ | =Running WordCount in Python= | ||
+ | |||
+ | <br /> | ||
+ | <br /> | ||
+ | <greenbox> | ||
+ | [[Image:ComputerLogo.png|right |100px]] | ||
+ | ;Lab Experiment #5: | ||
+ | : [[Hadoop Tutorial 2 -- Running WordCount in Python | Tutorial 2]] on running Python programs with MapReduce/Hadoop. | ||
+ | |||
+ | </greenbox> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | |||
=References=
<references />
+ | |||
+ | </onlysmith> | ||
+ | |||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | <br /> | ||
+ | [[Category:CSC352]][[Category:Class Notes]][[Category:MapReduce]][[Category:Hadoop]] |