in hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java [746:956]
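/**
 * Processes a single "Job" record from a pre-0.21 job-history log. On a new
 * JOBID it finalizes the previous job and starts a new LoggedJob, then copies
 * whatever fields this line carries (user, priority, map/reduce counts,
 * submit/launch/finish times, status) into the job being traced and into the
 * per-job statistics tables. Such a line is roughly of the form (illustrative):
 *   Job JOBID="job_200904211745_0002" USER="hadoop" SUBMIT_TIME="1240336853850" ...
 */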
private void processJobLine(ParsedLine line) throws JsonProcessingException,
IOException {
try {
if (version == 0 || version == 1) {
// Extract the fields of interest from this Job line; the job type is
// determined from JOBNAME below.
String jobID = line.get("JOBID");
String user = line.get("USER");
String jobPriority = line.get("JOB_PRIORITY");
String submitTime = line.get("SUBMIT_TIME");
String jobName = line.get("JOBNAME");
String launchTime = line.get("LAUNCH_TIME");
String finishTime = line.get("FINISH_TIME");
String status = line.get("JOB_STATUS");
String totalMaps = line.get("TOTAL_MAPS");
String totalReduces = line.get("TOTAL_REDUCES");
/*
 * If the job appears new (its ID differs from the most recent one, if
 * any), we make a new LoggedJob.
 */
if (jobID != null
&& jobTraceGen != null
&& (jobBeingTraced == null
|| !jobID.equals(jobBeingTraced.getJobID().toString()))) {
// Push out the old job if there is one, even though it didn't get
// mated with a conf.
finalizeJob();
jobBeingTraced = new LoggedJob(jobID);
tasksInCurrentJob = new HashMap<String, LoggedTask>();
attemptsInCurrentJob = new HashMap<String, LoggedTaskAttempt>();
// initialize all the per-job statistics gathering places
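// The attempt-time histogram arrays are indexed by data-locality distance,
// with one extra slot (hence the +1), presumably for attempts whose
// distance cannot be determined.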
successfulMapAttemptTimes =
new Histogram[ParsedHost.numberOfDistances() + 1];
for (int i = 0; i < successfulMapAttemptTimes.length; ++i) {
successfulMapAttemptTimes[i] = new Histogram();
}
successfulReduceAttemptTimes = new Histogram();
failedMapAttemptTimes =
new Histogram[ParsedHost.numberOfDistances() + 1];
for (int i = 0; i < failedMapAttemptTimes.length; ++i) {
failedMapAttemptTimes[i] = new Histogram();
}
failedReduceAttemptTimes = new Histogram();
successfulNthMapperAttempts = new Histogram();
successfulNthReducerAttempts = new Histogram();
mapperLocality = new Histogram();
}
// Fill in whatever fields of the traced job this line provides.
if (jobBeingTraced != null) {
if (user != null) {
jobBeingTraced.setUser(user);
}
if (jobPriority != null) {
jobBeingTraced.setPriority(LoggedJob.JobPriority
.valueOf(jobPriority));
}
if (totalMaps != null) {
jobBeingTraced.setTotalMaps(Integer.parseInt(totalMaps));
}
if (totalReduces != null) {
jobBeingTraced.setTotalReduces(Integer.parseInt(totalReduces));
}
if (submitTime != null) {
jobBeingTraced.setSubmitTime(Long.parseLong(submitTime));
}
if (launchTime != null) {
jobBeingTraced.setLaunchTime(Long.parseLong(launchTime));
}
if (finishTime != null) {
jobBeingTraced.setFinishTime(Long.parseLong(finishTime));
if (status != null) {
jobBeingTraced.setOutcome(Pre21JobHistoryConstants.Values
.valueOf(status));
}
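// A finish time completes the job record; try to mate the job with its
// conf and push it out to the trace.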
maybeMateJobAndConf();
}
}
if (jobName != null) {
// Default to JAVA; switch to STREAMING if the job name matches the
// streaming-job pattern.
Matcher m = streamingJobnamePattern.matcher(jobName);
thisJobType = LoggedJob.JobType.JAVA;
if (m.matches()) {
thisJobType = LoggedJob.JobType.STREAMING;
}
}
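// A SUBMIT_TIME field marks the start of a new job for the statistics
// pass: remember the submit time and reset the per-attempt timing tables.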
if (submitTime != null) {
submitTimeCurrentJob = Long.parseLong(submitTime);
currentJobID = jobID;
taskAttemptStartTimes = new HashMap<String, Long>();
taskReduceAttemptShuffleEndTimes = new HashMap<String, Long>();
taskReduceAttemptSortEndTimes = new HashMap<String, Long>();
taskMapAttemptFinishTimes = new HashMap<String, Long>();
taskReduceAttemptFinishTimes = new HashMap<String, Long>();
launchTimeCurrentJob = 0L;
} else if (launchTime != null && jobID != null
&& currentJobID.equals(jobID)) {
launchTimeCurrentJob = Long.parseLong(launchTime);
} else if (finishTime != null && jobID != null
&& currentJobID.equals(jobID)) {
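// The job has finished: record its delay (submit to launch) and run
// (launch to finish) times, then the per-attempt phase times.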
long endTime = Long.parseLong(finishTime);
if (launchTimeCurrentJob != 0) {
String jobResultText = line.get("JOB_STATUS");
JobOutcome thisOutcome =
((jobResultText != null && "SUCCESS".equals(jobResultText))
? JobOutcome.SUCCESS : JobOutcome.FAILURE);
if (submitTimeCurrentJob != 0L) {
canonicalDistributionsEnter(delayTimeDists, thisOutcome,
thisJobType, launchTimeCurrentJob - submitTimeCurrentJob);
}
if (launchTimeCurrentJob != 0L) {
canonicalDistributionsEnter(runTimeDists, thisOutcome,
thisJobType, endTime - launchTimeCurrentJob);
}
// Process the hash tables of successful task attempts.
Histogram currentJobMapTimes = new Histogram();
Histogram currentJobShuffleTimes = new Histogram();
Histogram currentJobSortTimes = new Histogram();
Histogram currentJobReduceTimes = new Histogram();
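// Per-job histograms of attempt phase durations; their spreads are folded
// into the *TimeSpreadDists below via incorporateSpread.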
Iterator<Map.Entry<String, Long>> taskIter =
taskAttemptStartTimes.entrySet().iterator();
while (taskIter.hasNext()) {
Map.Entry<String, Long> entry = taskIter.next();
long startTime = entry.getValue();
// Map processing
Long mapEndTime = taskMapAttemptFinishTimes.get(entry.getKey());
if (mapEndTime != null) {
currentJobMapTimes.enter(mapEndTime - startTime);
canonicalDistributionsEnter(mapTimeDists, thisOutcome,
thisJobType, mapEndTime - startTime);
}
// Reduce processing
Long shuffleEnd =
taskReduceAttemptShuffleEndTimes.get(entry.getKey());
Long sortEnd = taskReduceAttemptSortEndTimes.get(entry.getKey());
Long reduceEnd = taskReduceAttemptFinishTimes.get(entry.getKey());
if (shuffleEnd != null && sortEnd != null && reduceEnd != null) {
currentJobShuffleTimes.enter(shuffleEnd - startTime);
currentJobSortTimes.enter(sortEnd - shuffleEnd);
currentJobReduceTimes.enter(reduceEnd - sortEnd);
canonicalDistributionsEnter(shuffleTimeDists, thisOutcome,
thisJobType, shuffleEnd - startTime);
canonicalDistributionsEnter(sortTimeDists, thisOutcome,
thisJobType, sortEnd - shuffleEnd);
canonicalDistributionsEnter(reduceTimeDists, thisOutcome,
thisJobType, reduceEnd - sortEnd);
}
}
// Fold this job's per-attempt time histograms into the spread distributions.
incorporateSpread(currentJobMapTimes, mapTimeSpreadDists,
thisOutcome, thisJobType);
incorporateSpread(currentJobShuffleTimes, shuffleTimeSpreadDists,
thisOutcome, thisJobType);
incorporateSpread(currentJobSortTimes, sortTimeSpreadDists,
thisOutcome, thisJobType);
incorporateSpread(currentJobReduceTimes, reduceTimeSpreadDists,
thisOutcome, thisJobType);
}
}
}
} catch (NumberFormatException e) {
LOG.warn(
"HadoopLogsAnalyzer.processJobLine: bad numerical format, at line "
+ lineNumber + ".", e);
}
}