in hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java [1275:1416]
private void processEventForNewTimelineService(HistoryEvent event,
JobId jobId, long timestamp) {
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity tEntity =
null;
String taskId = null;
String taskAttemptId = null;
boolean setCreatedTime = false;
long taskIdPrefix = 0;
long taskAttemptIdPrefix = 0;
switch (event.getEventType()) {
// Handle job events
case JOB_SUBMITTED:
setCreatedTime = true;
break;
case JOB_STATUS_CHANGED:
case JOB_INFO_CHANGED:
case JOB_INITED:
case JOB_PRIORITY_CHANGED:
case JOB_QUEUE_CHANGED:
case JOB_FAILED:
case JOB_KILLED:
case JOB_ERROR:
case JOB_FINISHED:
case AM_STARTED:
case NORMALIZED_RESOURCE:
break;
// Handle task events
case TASK_STARTED:
setCreatedTime = true;
taskId = ((TaskStartedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskStartedEvent)event).getStartTime());
break;
case TASK_FAILED:
taskId = ((TaskFailedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskFailedEvent)event).getStartTime());
break;
case TASK_UPDATED:
taskId = ((TaskUpdatedEvent)event).getTaskId().toString();
break;
case TASK_FINISHED:
taskId = ((TaskFinishedEvent)event).getTaskId().toString();
taskIdPrefix = TimelineServiceHelper.
invertLong(((TaskFinishedEvent)event).getStartTime());
break;
case MAP_ATTEMPT_STARTED:
case REDUCE_ATTEMPT_STARTED:
setCreatedTime = true;
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((TaskAttemptStartedEvent)event).getStartTime());
break;
case CLEANUP_ATTEMPT_STARTED:
case SETUP_ATTEMPT_STARTED:
taskId = ((TaskAttemptStartedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptStartedEvent)event).
getTaskAttemptId().toString();
break;
case MAP_ATTEMPT_FAILED:
case CLEANUP_ATTEMPT_FAILED:
case REDUCE_ATTEMPT_FAILED:
case SETUP_ATTEMPT_FAILED:
case MAP_ATTEMPT_KILLED:
case CLEANUP_ATTEMPT_KILLED:
case REDUCE_ATTEMPT_KILLED:
case SETUP_ATTEMPT_KILLED:
taskId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskId().toString();
taskAttemptId = ((TaskAttemptUnsuccessfulCompletionEvent)event).
getTaskAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.invertLong(
((TaskAttemptUnsuccessfulCompletionEvent)event).getStartTime());
break;
case MAP_ATTEMPT_FINISHED:
taskId = ((MapAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((MapAttemptFinishedEvent)event).
getAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((MapAttemptFinishedEvent)event).getStartTime());
break;
case REDUCE_ATTEMPT_FINISHED:
taskId = ((ReduceAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((ReduceAttemptFinishedEvent)event).
getAttemptId().toString();
taskAttemptIdPrefix = TimelineServiceHelper.
invertLong(((ReduceAttemptFinishedEvent)event).getStartTime());
break;
case SETUP_ATTEMPT_FINISHED:
case CLEANUP_ATTEMPT_FINISHED:
taskId = ((TaskAttemptFinishedEvent)event).getTaskId().toString();
taskAttemptId = ((TaskAttemptFinishedEvent)event).
getAttemptId().toString();
break;
default:
LOG.warn("EventType: " + event.getEventType() + " cannot be recognized" +
" and handled by timeline service.");
return;
}
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
appEntityWithJobMetrics = null;
if (taskId == null) {
// JobEntity
tEntity = createJobEntity(event, timestamp, jobId,
MAPREDUCE_JOB_ENTITY_TYPE, setCreatedTime);
if (event.getEventType() == EventType.JOB_FINISHED
&& event.getTimelineMetrics() != null) {
appEntityWithJobMetrics = createAppEntityWithJobMetrics(event, jobId);
}
} else {
if (taskAttemptId == null) {
// TaskEntity
tEntity = createTaskEntity(event, timestamp, taskId,
MAPREDUCE_TASK_ENTITY_TYPE, MAPREDUCE_JOB_ENTITY_TYPE,
jobId, setCreatedTime, taskIdPrefix);
} else {
// TaskAttemptEntity
tEntity = createTaskAttemptEntity(event, timestamp, taskAttemptId,
MAPREDUCE_TASK_ATTEMPT_ENTITY_TYPE, MAPREDUCE_TASK_ENTITY_TYPE,
taskId, setCreatedTime, taskAttemptIdPrefix);
}
}
try {
if (appEntityWithJobMetrics == null) {
timelineV2Client.putEntitiesAsync(tEntity);
} else {
timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
}
} catch (IOException | YarnException e) {
LOG.error("Failed to process Event " + event.getEventType()
+ " for the job : " + jobId, e);
return;
}
if (event.getEventType() == EventType.JOB_SUBMITTED) {
// Publish configs after main job submitted event has been posted.
publishConfigsOnJobSubmittedEvent((JobSubmittedEvent)event, jobId);
}
}