Difference between revisions of "CSC352 ApacheHadoopJobHistorySummary.py"

From dftwiki3
Jump to: navigation, search
(job_history_summary.py)
 
Line 58: Line 58:
  
 
pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
 
pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
groupPat = re.compile(r'{\((?P<key>[^)]+)\)\((?P<name>[^)]+)\)(?P<counters>[^}]+)}')
+
counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')
counterPat = re.compile(r'\[\((?P<key>[^)]+)\)\((?P<name>[^)]+)\)\((?P<value>[^)]+)\)\]')
 
 
 
def parseCounters(str):
 
  result = {}
 
  for k,n,c in re.findall(groupPat, str):
 
    group = {}
 
    result[n] = group
 
    for sk, sn, sv in re.findall(counterPat, c):
 
      group[sn] = int(sv)
 
  return result
 
  
 
def parse(tail):
 
def parse(tail):
Line 83: Line 73:
 
reduceEndTime = {}
 
reduceEndTime = {}
 
reduceBytes = {}
 
reduceBytes = {}
remainder = ""
 
finalAttempt = {}
 
wastedAttempts = []
 
subitTime = None
 
finishTime = None
 
scale = 1000
 
  
 
for line in sys.stdin:
 
for line in sys.stdin:
  if len(line) < 3 or line[-3:] != " .\n":
 
    remainder += line
 
    continue
 
  line = remainder + line
 
  remainder = ""
 
 
   words = line.split(" ",1)
 
   words = line.split(" ",1)
 
   event = words[0]
 
   event = words[0]
 
   attrs = parse(words[1])
 
   attrs = parse(words[1])
   if event == 'Job':
+
   if event == 'MapAttempt':
    if attrs.has_key("SUBMIT_TIME"):
 
      submitTime = int(attrs["SUBMIT_TIME"]) / scale
 
    elif attrs.has_key("FINISH_TIME"):
 
      finishTime = int(attrs["FINISH_TIME"]) / scale
 
  elif event == 'MapAttempt':
 
 
     if attrs.has_key("START_TIME"):
 
     if attrs.has_key("START_TIME"):
       time = int(attrs["START_TIME"]) / scale
+
       mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000
      if time != 0:
 
        mapStartTime[attrs["TASK_ATTEMPT_ID"]] = time
 
 
     elif attrs.has_key("FINISH_TIME"):
 
     elif attrs.has_key("FINISH_TIME"):
       mapEndTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["FINISH_TIME"])/scale
+
       mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
      if attrs.get("TASK_STATUS", "") == "SUCCESS":
 
        task = attrs["TASKID"]
 
        if finalAttempt.has_key(task):
 
          wastedAttempts.append(finalAttempt[task])
 
        finalAttempt[task] = attrs["TASK_ATTEMPT_ID"]
 
      else:
 
        wastedAttempts.append(attrs["TASK_ATTEMPT_ID"])
 
 
   elif event == 'ReduceAttempt':
 
   elif event == 'ReduceAttempt':
 
     if attrs.has_key("START_TIME"):
 
     if attrs.has_key("START_TIME"):
       time = int(attrs["START_TIME"]) / scale
+
       reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
      if time != 0:
 
        reduceStartTime[attrs["TASK_ATTEMPT_ID"]] = time
 
 
     elif attrs.has_key("FINISH_TIME"):
 
     elif attrs.has_key("FINISH_TIME"):
       task = attrs["TASKID"]
+
       reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000
      if attrs.get("TASK_STATUS", "") == "SUCCESS":
+
       reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000
        if finalAttempt.has_key(task):
+
       reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
          wastedAttempts.append(finalAttempt[task])
 
        finalAttempt[task] = attrs["TASK_ATTEMPT_ID"]
 
      else:
 
        wastedAttempts.append(attrs["TASK_ATTEMPT_ID"])
 
      reduceEndTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["FINISH_TIME"]) / scale
 
       if attrs.has_key("SHUFFLE_FINISHED"):
 
        reduceShuffleTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["SHUFFLE_FINISHED"]) / scale
 
       if attrs.has_key("SORT_FINISHED"):
 
        reduceSortTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["SORT_FINISHED"]) / scale
 
 
   elif event == 'Task':
 
   elif event == 'Task':
 
     if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
 
     if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
       counters = parseCounters(attrs["COUNTERS"])
+
       for n,v in re.findall(counterPat, attrs["COUNTERS"]):
      reduceBytes[attrs["TASKID"]] = int(counters.get('FileSystemCounters',{}).get('HDFS_BYTES_WRITTEN',0))
+
        if n == "File Systems.HDFS bytes written":
 +
          reduceBytes[attrs["TASKID"]] = int(v)
 +
 
 +
runningMaps = {}
 +
shufflingReduces = {}
 +
sortingReduces = {}
 +
runningReduces = {}
 +
startTime = min(reduce(min, mapStartTime.values()),
 +
                reduce(min, reduceStartTime.values()))
 +
endTime = max(reduce(max, mapEndTime.values()),
 +
              reduce(max, reduceEndTime.values()))
  
 
reduces = reduceBytes.keys()
 
reduces = reduceBytes.keys()
Line 146: Line 110:
 
print "Name reduce-output-bytes shuffle-finish reduce-finish"
 
print "Name reduce-output-bytes shuffle-finish reduce-finish"
 
for r in reduces:
 
for r in reduces:
  attempt = finalAttempt[r]
+
   print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
   print r, reduceBytes[r], reduceShuffleTime[attempt] - submitTime,
+
   print reduceEndTime[r] - startTime
   print reduceEndTime[attempt] - submitTime
 
  
 
print
 
print
  
runningMaps = []
+
for t in range(startTime, endTime):
shufflingReduces = []
+
   runningMaps[t] = 0
sortingReduces = []
+
   shufflingReduces[t] = 0
runningReduces = []
+
  sortingReduces[t] = 0
 
+
  runningReduces[t] = 0
waste = []
 
final = {}
 
for t in finalAttempt.values():
 
  final[t] = None
 
 
 
for t in range(submitTime, finishTime):
 
  runningMaps.append(0)
 
  shufflingReduces.append(0)
 
  sortingReduces.append(0)
 
  runningReduces.append(0)
 
  waste.append(0)
 
 
 
for map in mapEndTime.keys():
 
   isFinal = final.has_key(map)
 
  if mapStartTime.has_key(map):
 
    for t in range(mapStartTime[map]-submitTime, mapEndTime[map]-submitTime):
 
      if final:
 
        runningMaps[t] += 1
 
      else:
 
        waste[t] += 1
 
 
 
for reduce in reduceEndTime.keys():
 
   if reduceStartTime.has_key(reduce):
 
    if final.has_key(reduce):
 
      for t in range(reduceStartTime[reduce]-submitTime, reduceShuffleTime[reduce]-submitTime):
 
        shufflingReduces[t] += 1
 
      for t in range(reduceShuffleTime[reduce]-submitTime, reduceSortTime[reduce]-submitTime):
 
        sortingReduces[t] += 1
 
      for t in range(reduceSortTime[reduce]-submitTime, reduceEndTime[reduce]-submitTime):
 
        runningReduces[t] += 1
 
    else:
 
      for t in range(reduceStartTime[reduce]-submitTime, reduceEndTime[reduce]-submitTime):
 
        waste[t] += 1
 
  
print "time maps shuffle merge reduce waste"
+
for map in mapStartTime.keys():
for t in range(len(runningMaps)):
+
  for t in range(mapStartTime[map], mapEndTime[map]):
   print t, runningMaps[t], shufflingReduces[t], sortingReduces[t], runningReduces[t], waste[t]
+
    runningMaps[t] += 1
 +
for reduce in reduceStartTime.keys():
 +
   for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
 +
    shufflingReduces[t] += 1
 +
  for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
 +
    sortingReduces[t] += 1
 +
  for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
 +
    runningReduces[t] += 1
  
 +
print "time maps shuffle merge reduce"
 +
for t in range(startTime, endTime):
 +
  print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t],
 +
  print runningReduces[t]
  
 
</source>
 
</source>

Latest revision as of 14:56, 4 April 2010

--D. Thiebaut 17:58, 4 April 2010 (UTC)


Information

From Owen O'Malley (http://www.mail-archive.com/common-user@hadoop.apache.org/msg00633.html)

Re: generate task timeline figures like "Hadoop Sorts a Petabyte..." blog
Owen O'Malley
Wed, 22 Jul 2009 09:26:13 -0700

The script is at:

http://people.apache.org/~omalley/tera-2009/job_history_summary.py

The input data is the job logs from the run:

http://people.apache.org/~omalley/tera-2009/1t/job_200904102259_0008_arunc_TeraSort.log.gz

is the 1tb run's log file. Uncompress it and feed it as standard input to the script. It will generate:

http://people.apache.org/~omalley/tera-2009/1t/summary.lst

and the bottom of that fed into your favorite spreadsheet will generate the pretty graphs.

-- Owen




job_history_summary.py

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import sys

pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
counterPat = re.compile('(?P<name>[^:]+):(?P<value>[^,]*),?')

def parse(tail):
  result = {}
  for n,v in re.findall(pat, tail):
    result[n] = v
  return result

mapStartTime = {}
mapEndTime = {}
reduceStartTime = {}
reduceShuffleTime = {}
reduceSortTime = {}
reduceEndTime = {}
reduceBytes = {}

for line in sys.stdin:
  words = line.split(" ",1)
  event = words[0]
  attrs = parse(words[1])
  if event == 'MapAttempt':
    if attrs.has_key("START_TIME"):
      mapStartTime[attrs["TASKID"]] = int(attrs["START_TIME"])/1000
    elif attrs.has_key("FINISH_TIME"):
      mapEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
  elif event == 'ReduceAttempt':
    if attrs.has_key("START_TIME"):
      reduceStartTime[attrs["TASKID"]] = int(attrs["START_TIME"]) / 1000
    elif attrs.has_key("FINISH_TIME"):
      reduceShuffleTime[attrs["TASKID"]] = int(attrs["SHUFFLE_FINISHED"])/1000
      reduceSortTime[attrs["TASKID"]] = int(attrs["SORT_FINISHED"])/1000
      reduceEndTime[attrs["TASKID"]] = int(attrs["FINISH_TIME"])/1000
  elif event == 'Task':
    if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
      for n,v in re.findall(counterPat, attrs["COUNTERS"]):
        if n == "File Systems.HDFS bytes written":
          reduceBytes[attrs["TASKID"]] = int(v)

runningMaps = {}
shufflingReduces = {}
sortingReduces = {}
runningReduces = {}
startTime = min(reduce(min, mapStartTime.values()),
                reduce(min, reduceStartTime.values()))
endTime = max(reduce(max, mapEndTime.values()),
              reduce(max, reduceEndTime.values()))

reduces = reduceBytes.keys()
reduces.sort()

print "Name reduce-output-bytes shuffle-finish reduce-finish"
for r in reduces:
  print r, reduceBytes[r], reduceShuffleTime[r] - startTime,
  print reduceEndTime[r] - startTime

print

for t in range(startTime, endTime):
  runningMaps[t] = 0
  shufflingReduces[t] = 0
  sortingReduces[t] = 0
  runningReduces[t] = 0

for map in mapStartTime.keys():
  for t in range(mapStartTime[map], mapEndTime[map]):
    runningMaps[t] += 1
for reduce in reduceStartTime.keys():
  for t in range(reduceStartTime[reduce], reduceShuffleTime[reduce]):
    shufflingReduces[t] += 1
  for t in range(reduceShuffleTime[reduce], reduceSortTime[reduce]):
    sortingReduces[t] += 1
  for t in range(reduceSortTime[reduce], reduceEndTime[reduce]):
    runningReduces[t] += 1

print "time maps shuffle merge reduce"
for t in range(startTime, endTime):
  print t - startTime, runningMaps[t], shufflingReduces[t], sortingReduces[t], 
  print runningReduces[t]