private synchronized boolean heartbeat()

in tez-dag/src/main/java/org/apache/tez/runtime/task/TaskReporter.java [207:277]


    /**
     * Sends a single heartbeat to the AM and processes the AM's response.
     *
     * <p>Any {@code eventsArg} are first queued on {@code eventsToSend}, then every pending event is
     * drained into the outgoing request. While the task is still running (not done and no fatal
     * error), a {@link TaskStatusUpdateEvent} carrying the current progress is appended. Counters are
     * attached to that update only when enough non-OOB heartbeats have elapsed to cover
     * {@code sendCounterInterval}; otherwise the update carries {@code null} counters.
     *
     * <p>Events returned by the AM are routed to the task via {@code task.handleEvents}, unless the
     * task is already done or has hit a fatal error, in which case they are dropped with a warning.
     *
     * <p>The method is {@code synchronized}, so concurrent callers on this instance are serialized.
     *
     * @param eventsArg additional events to send with this heartbeat; may be {@code null}
     * @return {@code false} if the AM responded that the task should die, {@code true} otherwise
     * @throws IOException propagated from {@code umbilical.heartbeat}
     * @throws TezException if the AM's response does not acknowledge this request's ID
     */
    private synchronized boolean heartbeat(Collection<TezEvent> eventsArg) throws IOException,
        TezException {

      if (eventsArg != null) {
        eventsToSend.addAll(eventsArg);
      }

      // Everything queued so far (including eventsArg) goes out with this heartbeat.
      List<TezEvent> events = new ArrayList<TezEvent>();
      eventsToSend.drainTo(events);

      if (!task.isTaskDone() && !task.hadFatalError()) {
        TezCounters counters = null;
        /*
         * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
         * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
         * real time decisions are made based on these counters, they are throttled to roughly once
         * per sendCounterInterval.
         */
        // Not completely accurate, since OOB heartbeats could go out and are not counted here.
        if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval
            >= sendCounterInterval) {
          counters = task.getCounters();
          prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
        }
        events.add(new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
            updateEventMetadata));
      }

      long requestId = requestCounter.incrementAndGet();
      TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
          task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
      if (LOG.isDebugEnabled()) {
        // Log through the same LOG instance whose level the guard above checks.
        LOG.debug("Sending heartbeat to AM, request=" + request);
      }

      maybeLogCounters();

      // NOTE(review): events drained from eventsToSend above are not re-queued if this call
      // throws; confirm callers treat a failed heartbeat as fatal for the task.
      TezHeartbeatResponse response = umbilical.heartbeat(request);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Received heartbeat response from AM, response=" + response);
      }

      if (response.shouldDie()) {
        LOG.info("Received should die response from AM");
        return false;
      }
      // Each request carries a fresh ID; the response must acknowledge exactly that ID.
      if (response.getLastRequestId() != requestId) {
        throw new TezException("AM and Task out of sync" + ", responseReqId="
            + response.getLastRequestId() + ", expectedReqId=" + requestId);
      }

      // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
      // are running using the same umbilical. Events arriving after this task has finished (or
      // failed fatally) are dropped rather than delivered.
      if (task.isTaskDone() || task.hadFatalError()) {
        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
          LOG.warn("Current task already complete, Ignoring all event in"
              + " heartbeat response, eventCount=" + response.getEvents().size());
        }
      } else {
        if (response.getEvents() != null && !response.getEvents().isEmpty()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
                + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
          }
          // This should ideally happen in a separate thread
          task.handleEvents(response.getEvents());
        }
      }
      return true;

    }