CSC352 ApacheHadoopJobHistorySummary.py
--D. Thiebaut 17:58, 4 April 2010 (UTC)
Information
From Owen O'Malley (http://www.mail-archive.com/common-user@hadoop.apache.org/msg00633.html)
Re: generate task timeline figures like "Hadoop Sorts a Petabyte..." blog
Owen O'Malley
Wed, 22 Jul 2009 09:26:13 -0700
The script is at:
http://people.apache.org/~omalley/tera-2009/job_history_summary.py
The input data is the job logs from the run:
http://people.apache.org/~omalley/tera-2009/1t/job_200904102259_0008_arunc_TeraSort.log.gz
is the 1tb run's log file. Uncompress it and feed it as standard input to the script. It will generate:
http://people.apache.org/~omalley/tera-2009/1t/summary.lst
and the bottom of that fed into your favorite spreadsheet will generate the pretty graphs.
-- Owen
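Following Owen's instructions, a typical invocation (using the file names from the links above, and assuming a Unix shell; the exact paths are illustrative) would be:

gunzip job_200904102259_0008_arunc_TeraSort.log.gz
python job_history_summary.py < job_200904102259_0008_arunc_TeraSort.log > summary.lst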
job_history_summary.py
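The script below reads a Hadoop job history log on standard input and prints two whitespace-separated tables: first, one row per reduce task (its output bytes and the times, relative to the start of the job, at which its shuffle and reduce phases finished), and second, a per-second timeline of how many tasks are in the map, shuffle, merge/sort, and reduce phases. Note that this is a Python 2 script: it uses print statements, dict.has_key(), and the reduce() built-in.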
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import sys

# Attributes in a job history log line look like: NAME="value" NAME="value" ...
pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
# Counters are stored as a comma-separated list of name:value pairs.
counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')

def parse(tail):
    """Parse the NAME="value" attributes of one log line into a dict."""
    result = {}
    for n, v in re.findall(pat, tail):
        result[n] = v
    return result

mapStartTime = {}
mapEndTime = {}
reduceStartTime = {}
reduceShuffleTime = {}
reduceSortTime = {}
reduceEndTime = {}
reduceBytes = {}

# Scan the job history log from standard input and record, per task id,
# the start and finish times (converted from milliseconds to seconds)
# of each phase.
for line in sys.stdin:
    words = line.split(" ", 1)
    event = words[0]
    attrs = parse(words[1])
    if event == 'MapAttempt':
        if attrs.has_key("START_TIME"):
            mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
        elif attrs.has_key("FINISH_TIME"):
            mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"]) / 1000
    elif event == 'ReduceAttempt':
        if attrs.has_key("START_TIME"):
            reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
        elif attrs.has_key("FINISH_TIME"):
            reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"]) / 1000
            reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"]) / 1000
            reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"]) / 1000
    elif event == 'Task':
        if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
            for n, v in re.findall(counterPat, attrs["COUNTERS"]):
                if n == "File Systems.HDFS bytes written":
                    reduceBytes[attrs["TASKID"]] = int(v)

runningMaps = {}
shufflingReduces = {}
sortingReduces = {}
runningReduces = {}
startTime = min(reduce(min, mapStartTime.values()),
                reduce(min, reduceStartTime.values()))
endTime = max(reduce(max, mapEndTime.values()),
              reduce(max, reduceEndTime.values()))

reduces = reduceBytes.keys()
reduces.sort()

# First table: per-reduce output size and phase completion times,
# relative to the start of the job.
print "Name reduce-output-bytes shuffle-finish reduce-finish"
for r in reduces:
    print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
    print reduceEndTime[r] - startTime

print

# Count, for every second of the job, how many tasks are in each phase.
for t in range(startTime, endTime):
    runningMaps[t] = 0
    shufflingReduces[t] = 0
    sortingReduces[t] = 0
    runningReduces[t] = 0
for map in mapStartTime.keys():
    for t in range(mapStartTime[map], mapEndTime[map]):
        runningMaps[t] += 1
for reduce in reduceStartTime.keys():
    for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
        shufflingReduces[t] += 1
    for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
        sortingReduces[t] += 1
    for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
        runningReduces[t] += 1

# Second table: the task timeline, one row per second.
print "time maps shuffle merge reduce"
for t in range(startTime, endTime):
    print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t],
    print runningReduces[t]
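To see what the attribute regex extracts, here is a minimal, self-contained sketch (Python 2) run against a made-up MapAttempt line; the task id and timestamp are hypothetical, not taken from the real TeraSort log:

import re

# Same attribute regex as in job_history_summary.py.
pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')

# Hypothetical history line in the NAME="value" format the script expects.
line = 'MapAttempt TASK_TYPE="MAP" TASKID="task_200904102259_0008_m_000001" START_TIME="1239400000000"'
event, tail = line.split(" ", 1)
attrs = dict(re.findall(pat, tail))
print event                                             # MapAttempt
print attrs["TASKID"], int(attrs["START_TIME"]) / 1000  # task id, start time in seconds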