in tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java [207:277]
/**
 * Sends one heartbeat request to the AM and processes its response.
 *
 * <p>Any {@code eventsArg} are first appended to {@code eventsToSend}, which is then drained into
 * the outgoing request. While the task is neither done nor in a fatal-error state, a
 * {@link TaskStatusUpdateEvent} carrying the current progress is appended as well. Counters are
 * attached to that status event only when enough non-OOB heartbeats have elapsed to cover
 * {@code sendCounterInterval}; otherwise the event carries {@code null} counters.
 *
 * <p>Events returned by the AM are handed to {@code task.handleEvents} only while the task is
 * still running (not done and no fatal error). Otherwise they are dropped with a warning.
 *
 * @param eventsArg extra events to queue before sending; may be {@code null}
 * @return {@code false} if the AM's response says the task should die, {@code true} otherwise
 * @throws IOException if the heartbeat call fails (NOTE(review): presumably raised by the
 *     umbilical RPC — confirm)
 * @throws TezException if the response's last request id differs from the id of the request
 *     just sent, i.e. the AM and the task are out of sync
 */
private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException,
    TezException {
  if (eventsArg != null) {
    eventsToSend.addAll(eventsArg);
  }
  List<TezEvent> events = new ArrayList<TezEvent>();
  eventsToSend.drainTo(events);
  if (!task.isTaskDone() && !task.hadFatalError()) {
    TezCounters counters = null;
    // Increasing the heartbeat interval can delay the delivery of events. Sending only updated
    // records would save CPU in the DAG AM, but certain counters are updated very frequently.
    // Until real-time decisions are made based on these counters, they are only sent about once
    // per sendCounterInterval.
    // Not completely accurate, since OOB heartbeats could go out.
    if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
      counters = task.getCounters();
      prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
    }
    events.add(new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
        updateEventMetadata));
  }
  long requestId = requestCounter.incrementAndGet();
  TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
      task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
  if (LOG.isDebugEnabled()) {
    // Log through LOG so the message honors the level checked by the guard above.
    LOG.debug("Sending heartbeat to AM, request=" + request);
  }
  maybeLogCounters();
  TezHeartbeatResponse response = umbilical.heartbeat(request);
  if (LOG.isDebugEnabled()) {
    LOG.debug("Received heartbeat response from AM, response=" + response);
  }
  if (response.shouldDie()) {
    LOG.info("Received should die response from AM");
    return false;
  }
  if (response.getLastRequestId() != requestId) {
    throw new TezException("AM and Task out of sync" + ", responseReqId="
        + response.getLastRequestId() + ", expectedReqId=" + requestId);
  }
  // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
  // are running using the same umbilical.
  if (task.isTaskDone() || task.hadFatalError()) {
    if (response.getEvents() != null && !response.getEvents().isEmpty()) {
      LOG.warn("Current task already complete, Ignoring all event in"
          + " heartbeat response, eventCount=" + response.getEvents().size());
    }
  } else {
    if (response.getEvents() != null && !response.getEvents().isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
            + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
      }
      // This should ideally happen in a separate thread
      task.handleEvents(response.getEvents());
    }
  }
  return true;
}