CSC352 ApacheHadoopJobHistorySummary.py

From dftwiki3
Revision as of 14:03, 4 April 2010 by Thiebaut (talk | contribs)
Jump to: navigation, search

--D. Thiebaut 17:58, 4 April 2010 (UTC)


Information

From Owen O'Malley (http://www.mail-archive.com/common-user@hadoop.apache.org/msg00633.html)

Re: generate task timeline figures like "Hadoop Sorts a Petabyte..." blog
Owen O'Malley
Wed, 22 Jul 2009 09:26:13 -0700

The script is at:

http://people.apache.org/~omalley/tera-2009/job_history_summary.py

The input data is the job logs from the run:

http://people.apache.org/~omalley/tera-2009/1t/job_200904102259_0008_arunc_TeraSort.log.gz

is the 1tb run's log file. Uncompress it and feed it as standard input to the script. It will generate:

http://people.apache.org/~omalley/tera-2009/1t/summary.lst

and the bottom of that fed into your favorite spreadsheet will generate the pretty graphs.

-- Owen




job_history_summary.py

#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import re
import sys

pat = re.compile('(?P<name>[^=]+)="(?P<value>[^"]*)" *')
groupPat = re.compile(r'{\((?P<key>[^)]+)\)\((?P<name>[^)]+)\)(?P<counters>[^}]+)}')
counterPat = re.compile(r'\[\((?P<key>[^)]+)\)\((?P<name>[^)]+)\)\((?P<value>[^)]+)\)\]')

def parseCounters(str):
  result = {}
  for k,n,c in re.findall(groupPat, str):
    group = {}
    result[n] = group
    for sk, sn, sv in re.findall(counterPat, c):
      group[sn] = int(sv)
  return result

def parse(tail):
  result = {}
  for n,v in re.findall(pat, tail):
    result[n] = v
  return result

mapStartTime = {}
mapEndTime = {}
reduceStartTime = {}
reduceShuffleTime = {}
reduceSortTime = {}
reduceEndTime = {}
reduceBytes = {}
remainder = ""
finalAttempt = {}
wastedAttempts = []
subitTime = None
finishTime = None
scale = 1000

for line in sys.stdin:
  if len(line) < 3 or line[-3:] != " .\n":
    remainder += line
    continue
  line = remainder + line
  remainder = ""
  words = line.split(" ",1)
  event = words[0]
  attrs = parse(words[1])
  if event == 'Job':
    if attrs.has_key("SUBMIT_TIME"):
      submitTime = int(attrs["SUBMIT_TIME"]) / scale
    elif attrs.has_key("FINISH_TIME"):
      finishTime = int(attrs["FINISH_TIME"]) / scale
  elif event == 'MapAttempt':
    if attrs.has_key("START_TIME"):
      time = int(attrs["START_TIME"]) / scale
      if time != 0:
        mapStartTime[attrs["TASK_ATTEMPT_ID"]] = time
    elif attrs.has_key("FINISH_TIME"):
      mapEndTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["FINISH_TIME"])/scale
      if attrs.get("TASK_STATUS", "") == "SUCCESS":
        task = attrs["TASKID"]
        if finalAttempt.has_key(task):
          wastedAttempts.append(finalAttempt[task])
        finalAttempt[task] = attrs["TASK_ATTEMPT_ID"]
      else:
        wastedAttempts.append(attrs["TASK_ATTEMPT_ID"])
  elif event == 'ReduceAttempt':
    if attrs.has_key("START_TIME"):
      time = int(attrs["START_TIME"]) / scale
      if time != 0:
        reduceStartTime[attrs["TASK_ATTEMPT_ID"]] = time
    elif attrs.has_key("FINISH_TIME"):
      task = attrs["TASKID"]
      if attrs.get("TASK_STATUS", "") == "SUCCESS":
        if finalAttempt.has_key(task):
          wastedAttempts.append(finalAttempt[task])
        finalAttempt[task] = attrs["TASK_ATTEMPT_ID"]
      else:
        wastedAttempts.append(attrs["TASK_ATTEMPT_ID"])
      reduceEndTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["FINISH_TIME"]) / scale
      if attrs.has_key("SHUFFLE_FINISHED"):
        reduceShuffleTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["SHUFFLE_FINISHED"]) / scale
      if attrs.has_key("SORT_FINISHED"):
        reduceSortTime[attrs["TASK_ATTEMPT_ID"]] = int(attrs["SORT_FINISHED"]) / scale
  elif event == 'Task':
    if attrs["TASK_TYPE"] == "REDUCE" and attrs.has_key("COUNTERS"):
      counters = parseCounters(attrs["COUNTERS"])
      reduceBytes[attrs["TASKID"]] = int(counters.get('FileSystemCounters',{}).get('HDFS_BYTES_WRITTEN',0))

reduces = reduceBytes.keys()
reduces.sort()

print "Name reduce-output-bytes shuffle-finish reduce-finish"
for r in reduces:
  attempt = finalAttempt[r]
  print r, reduceBytes[r], reduceShuffleTime[attempt] - submitTime,
  print reduceEndTime[attempt] - submitTime

print

runningMaps = []
shufflingReduces = []
sortingReduces = []
runningReduces = []

waste = []
final = {}
for t in finalAttempt.values():
  final[t] = None

for t in range(submitTime, finishTime):
  runningMaps.append(0)
  shufflingReduces.append(0)
  sortingReduces.append(0)
  runningReduces.append(0)
  waste.append(0)

for map in mapEndTime.keys():
  isFinal = final.has_key(map)
  if mapStartTime.has_key(map):
    for t in range(mapStartTime[map]-submitTime, mapEndTime[map]-submitTime):
      if final:
        runningMaps[t] += 1
      else:
        waste[t] += 1

for reduce in reduceEndTime.keys():
  if reduceStartTime.has_key(reduce):
    if final.has_key(reduce):
      for t in range(reduceStartTime[reduce]-submitTime, reduceShuffleTime[reduce]-submitTime):
        shufflingReduces[t] += 1
      for t in range(reduceShuffleTime[reduce]-submitTime, reduceSortTime[reduce]-submitTime):
        sortingReduces[t] += 1
      for t in range(reduceSortTime[reduce]-submitTime, reduceEndTime[reduce]-submitTime):
        runningReduces[t] += 1
    else:
      for t in range(reduceStartTime[reduce]-submitTime, reduceEndTime[reduce]-submitTime):
        waste[t] += 1

print "time maps shuffle merge reduce waste"
for t in range(len(runningMaps)):
  print t, runningMaps[t], shufflingReduces[t], sortingReduces[t], runningReduces[t], waste[t]