in tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java [218:364]
/**
 * Processes a single heartbeat sent for a container and, optionally, for a task attempt running in it.
 *
 * <p>Flow, as implemented below:
 * <ol>
 * <li>Resolve the container id from the request; if it is not a key of {@code registeredContainers},
 * return {@code RESPONSE_SHOULD_DIE}.</li>
 * <li>Ping the container heartbeat handler for that container.</li>
 * <li>If the request carries no task attempt id, skip all event handling and answer from the
 * placeholder {@code TaskAttemptEventInfo(0, null, 0)}.</li>
 * <li>If {@code registeredAttempts} does not map the attempt to this same container,
 * return {@code RESPONSE_SHOULD_DIE}.</li>
 * <li>Stamp every incoming event with the AM clock time, classify it, and send the resulting events in
 * this order: status update, events generated by the attempt, the (at most one) finished event,
 * and finally the events routed to the attempt's vertex.</li>
 * <li>Ping {@code taskHeartbeatHandler} for the attempt and fetch the attempt's pending events from its
 * vertex in the current DAG.</li>
 * </ol>
 *
 * @param request the heartbeat request; supplies the container identifier, the optional task attempt id,
 *        the optional event list, and the start indices / max event count used to fetch events
 * @return {@code RESPONSE_SHOULD_DIE} for an unknown container or an attempt not registered to this
 *         container; otherwise a new response whose first constructor argument is {@code false}
 *         (presumably the shouldDie flag - confirm against TaskHeartbeatResponse) carrying the fetched
 *         events and the next event indices
 * @throws IOException declared by the signature; no statement in this body throws it directly
 * @throws TezException declared by the signature; no statement in this body throws it directly
 * @throws TezUncheckedException if a finished event has an event generator or an event type that the
 *         switch statements below do not handle
 */
public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
throws IOException, TezException {
ContainerId containerId = ConverterUtils.toContainerId(request
.getContainerIdentifier());
LOG.debug("Received heartbeat from container, request={}", request);
if (!registeredContainers.containsKey(containerId)) {
LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
", asking it to die");
return RESPONSE_SHOULD_DIE;
}
// A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container
// meanwhile. If the decision is processed through the pipeline before the heartbeat is processed,
// the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system
// knows how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events).
// So - avoiding synchronization.
pingContainerHeartbeatHandler(containerId);
// Placeholder result; it is what the response is built from when the request has no task attempt id.
TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
if (taskAttemptID != null) {
// The attempt must be registered, and registered to the very container that is heartbeating.
ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
// This can happen when a task heartbeats. Meanwhile the container is unregistered.
// The information will eventually make it through to the plugin via a corresponding unregister.
// There's a race in that case between the unregister making it through, and this method returning.
// TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
// so that the plugin can handle the scenario. Alternately augment the response with error codes.
// Error codes would be better than exceptions.
LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
return RESPONSE_SHOULD_DIE;
}
List<TezEvent> inEvents = request.getEvents();
if (LOG.isDebugEnabled()) {
// An event count of -1 in this message means the request carried a null event list.
LOG.debug("Ping from " + taskAttemptID.toString() +
" events: " + (inEvents != null ? inEvents.size() : -1));
}
long currTime = context.getClock().getTime();
// taFinishedEvents - means the TaskAttemptFinishedEvent
// taGeneratedEvents - for recovery, means the events generated by this task attempt and is needed by its downstream vertices
// eventsForVertex - including all the taGeneratedEvents and other events such as INPUT_READ_ERROR_EVENT/INPUT_FAILED_EVENT
// taGeneratedEvents is routed both to TaskAttempt & Vertex. Route to Vertex is for performance consideration
// Send order implemented below: status update, then taGeneratedEvents, then taFinishedEvents, then eventsForVertex.
// NOTE(review): this comment previously read "taFinishedEvents must be routed before taGeneratedEvents",
// which is the opposite of what the code does - confirm the intended ordering.
List<TezEvent> taFinishedEvents = new ArrayList<TezEvent>();
List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
List<TezEvent> eventsForVertex = new ArrayList<TezEvent>();
TaskAttemptEventStatusUpdate taskAttemptEvent = null;
boolean readErrorReported = false;
// Classification pass: nothing is sent inside this loop, events are only stamped and bucketed.
// Presumably ListUtils.emptyIfNull substitutes an empty list for a null inEvents - confirm.
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
// for now, set the event time on the AM when it is received.
// this avoids any time disparity between machines.
tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
// send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed
// If several status updates arrive in one heartbeat, only the last one is kept and sent.
taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
(TaskStatusUpdateEvent) tezEvent.getEvent());
} else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT
|| eventType == EventType.TASK_ATTEMPT_FAILED_EVENT
|| eventType == EventType.TASK_ATTEMPT_KILLED_EVENT) {
// Finished events go only to the TaskAttempt; they are not added to eventsForVertex.
taFinishedEvents.add(tezEvent);
} else {
if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
// Remembered so it can be attached to the status update (if any) sent after the loop.
readErrorReported = true;
}
if (eventType == EventType.DATA_MOVEMENT_EVENT
|| eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT
|| eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT
|| eventType == EventType.VERTEX_MANAGER_EVENT) {
taGeneratedEvents.add(tezEvent);
}
// Every event reaching this branch is routed to the vertex, including the generated ones above.
eventsForVertex.add(tezEvent);
}
}
// 1) Status update first; the read-error flag is dropped if the heartbeat had no status update.
if (taskAttemptEvent != null) {
taskAttemptEvent.setReadErrorReported(readErrorReported);
sendEvent(taskAttemptEvent);
}
// 2) route taGeneratedEvents to TaskAttempt
if (!taGeneratedEvents.isEmpty()) {
sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
}
// 3) route the finished event (at most one is accepted per heartbeat) to TaskAttempt
// NOTE(review): presumably Guava's Preconditions, which would throw IllegalArgumentException here - confirm.
Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent");
for (TezEvent e : taFinishedEvents) {
EventMetaData sourceMeta = e.getSourceInfo();
switch (e.getEventType()) {
case TASK_ATTEMPT_FAILED_EVENT:
case TASK_ATTEMPT_KILLED_EVENT:
// Failed and killed share the mapping from the reporting component to a termination cause.
TaskAttemptTerminationCause errCause = null;
switch (sourceMeta.getEventGenerator()) {
case INPUT:
errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
break;
case PROCESSOR:
errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
break;
case OUTPUT:
errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
break;
case SYSTEM:
errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
break;
default:
throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
sourceMeta.getEventGenerator());
}
// The outgoing event is addressed to the attempt id in the event's source metadata,
// not to the taskAttemptID taken from the request.
if (e.getEventType() == EventType.TASK_ATTEMPT_FAILED_EVENT) {
TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) e.getEvent();
sendEvent(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED, taskFailedEvent.getTaskFailureType(),
"Error: " + taskFailedEvent.getDiagnostics(),
errCause));
} else { // Killed
TaskAttemptKilledEvent taskKilledEvent = (TaskAttemptKilledEvent) e.getEvent();
sendEvent(
new TaskAttemptEventAttemptKilled(sourceMeta.getTaskAttemptID(),
"Error: " + taskKilledEvent.getDiagnostics(), errCause));
}
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
sendEvent(
new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ e.getEventType());
}
}
// 4) Everything that was neither a status update nor a finished event goes to the attempt's vertex,
// wrapped in an unmodifiable view of the list.
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getVertexID();
sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
// Record the attempt-level ping, then collect the events to hand back to this attempt.
taskHeartbeatHandler.pinged(taskAttemptID);
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
// Reached for a container-only heartbeat (placeholder eventInfo) or after a fully processed attempt heartbeat.
return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
}