public TaskHeartbeatResponse heartbeat()

in tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java [218:364]


  /**
   * Processes a heartbeat sent by a task running inside a container.
   *
   * <p>A heartbeat from a container that is not in {@code registeredContainers}, or for a task
   * attempt that is not registered against the heartbeating container, is answered with
   * {@code RESPONSE_SHOULD_DIE}. Otherwise the container heartbeat handler is pinged and, when the
   * request names a task attempt, the events it carries are dispatched through {@code sendEvent}
   * in this order: the status update (if any), the attempt-generated events (to the TaskAttempt),
   * the attempt-finished event (at most one) and finally the events for the vertex. The task
   * heartbeat handler is then pinged and the events to hand back are fetched from the attempt's
   * vertex in the current DAG.
   *
   * @param request the heartbeat request; when it carries no task attempt id only the container
   *     is pinged and the response is built from an empty {@code TaskAttemptEventInfo}
   * @return {@code RESPONSE_SHOULD_DIE} for unknown containers or attempts, otherwise a response
   *     holding the events and the next event indices reported by the vertex
   * @throws IOException if propagated by one of the calls made while handling the heartbeat
   * @throws TezException if propagated by one of the calls made while handling the heartbeat
   * @throws TezUncheckedException if the attempt-finished event has an unknown event generator
   */
  public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
      throws IOException, TezException {
    ContainerId containerId = ConverterUtils.toContainerId(request
        .getContainerIdentifier());
    LOG.debug("Received heartbeat from container, request={}", request);

    if (!registeredContainers.containsKey(containerId)) {
      LOG.warn("Received task heartbeat from unknown container with id: {}, asking it to die",
          containerId);
      return RESPONSE_SHOULD_DIE;
    }

    // A heartbeat can come in anytime. The AM may have made a decision to kill a running
    // task/container meanwhile. If the decision is processed through the pipeline before the
    // heartbeat is processed, the heartbeat will be dropped. Otherwise the heartbeat will be
    // processed - and the system knows how to handle this - via FailedInputEvents for example
    // (relevant only if the heartbeat has events).
    // So - avoiding synchronization.

    pingContainerHeartbeatHandler(containerId);
    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
    TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
    if (taskAttemptID != null) {
      ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
      if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
        // This can happen when a task heartbeats. Meanwhile the container is unregistered.
        // The information will eventually make it through to the plugin via a corresponding
        // unregister. There's a race in that case between the unregister making it through, and
        // this method returning.
        // TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than
        // sending a shouldDie = true, so that the plugin can handle the scenario. Alternately
        // augment the response with error codes. Error codes would be better than exceptions.
        LOG.info("Attempt: {} is not recognized for heartbeats", taskAttemptID);
        return RESPONSE_SHOULD_DIE;
      }

      List<TezEvent> inEvents = request.getEvents();
      // -1 marks a request that carried no event list at all (as opposed to an empty one).
      LOG.debug("Ping from {} events: {}", taskAttemptID,
          inEvents != null ? inEvents.size() : -1);

      long currTime = context.getClock().getTime();
      // taFinishedEvents - means the TaskAttemptFinishedEvent
      // taGeneratedEvents - for recovery, means the events generated by this task attempt and is
      //   needed by its downstream vertices
      // eventsForVertex - including all the taGeneratedEvents and other events such as
      //   INPUT_READ_ERROR_EVENT/INPUT_FAILED_EVENT
      // taGeneratedEvents is routed both to TaskAttempt & Vertex. Route to Vertex is for
      // performance consideration
      // taFinishedEvents must be routed before taGeneratedEvents
      // NOTE(review): below, taGeneratedEvents reach the TaskAttempt *before* the finished event
      // and reach the Vertex (as part of eventsForVertex) *after* it. The ordering requirement
      // above presumably refers to the Vertex route only - confirm.
      List<TezEvent> taFinishedEvents = new ArrayList<TezEvent>();
      List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
      List<TezEvent> eventsForVertex = new ArrayList<TezEvent>();
      TaskAttemptEventStatusUpdate taskAttemptEvent = null;
      boolean readErrorReported = false;
      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
        // for now, set the event time on the AM when it is received.
        // this avoids any time disparity between machines.
        tezEvent.setEventReceivedTime(currTime);
        final EventType eventType = tezEvent.getEventType();
        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
          // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed
          // If the request carries several status updates only the last one is kept.
          taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
              (TaskStatusUpdateEvent) tezEvent.getEvent());
        } else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT
            || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT
            || eventType == EventType.TASK_ATTEMPT_KILLED_EVENT) {
          taFinishedEvents.add(tezEvent);
        } else {
          if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
            readErrorReported = true;
          }
          if (eventType == EventType.DATA_MOVEMENT_EVENT
              || eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT
              || eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT
              || eventType == EventType.VERTEX_MANAGER_EVENT) {
            taGeneratedEvents.add(tezEvent);
          }
          eventsForVertex.add(tezEvent);
        }
      }
      if (taskAttemptEvent != null) {
        taskAttemptEvent.setReadErrorReported(readErrorReported);
        sendEvent(taskAttemptEvent);
      }
      // route taGeneratedEvents to TaskAttempt
      if (!taGeneratedEvents.isEmpty()) {
        sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
      }
      // route the (at most one) finished event to TaskAttempt
      Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent");
      for (TezEvent finishedEvent : taFinishedEvents) {
        routeTaskAttemptFinishedEvent(finishedEvent);
      }
      if (!eventsForVertex.isEmpty()) {
        TezVertexID vertexId = taskAttemptID.getVertexID();
        sendEvent(
            new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
      }
      taskHeartbeatHandler.pinged(taskAttemptID);
      eventInfo = context
          .getCurrentDAG()
          .getVertex(taskAttemptID.getVertexID())
          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
              request.getPreRoutedStartIndex(), request.getMaxEvents());
    }
    return new TaskHeartbeatResponse(false, eventInfo.getEvents(),
        eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
  }

  /**
   * Translates an attempt-finished event received in a heartbeat into the corresponding
   * TaskAttempt event (TA_FAILED, killed or TA_DONE) and sends it via {@code sendEvent}. The
   * target attempt is the one named in the event's source metadata.
   *
   * @param finishedEvent a TASK_ATTEMPT_FAILED/KILLED/COMPLETED event
   * @throws TezUncheckedException if the event is of any other type, or if a failed/killed event
   *     has an unknown event generator
   */
  private void routeTaskAttemptFinishedEvent(TezEvent finishedEvent) {
    EventMetaData sourceMeta = finishedEvent.getSourceInfo();
    switch (finishedEvent.getEventType()) {
    case TASK_ATTEMPT_FAILED_EVENT: {
      // Resolve the cause first so an unknown generator is rejected before the payload is read.
      TaskAttemptTerminationCause errCause = terminationCauseFor(sourceMeta);
      TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) finishedEvent.getEvent();
      sendEvent(
          new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
              TaskAttemptEventType.TA_FAILED, taskFailedEvent.getTaskFailureType(),
              "Error: " + taskFailedEvent.getDiagnostics(),
              errCause));
      break;
    }
    case TASK_ATTEMPT_KILLED_EVENT: {
      TaskAttemptTerminationCause errCause = terminationCauseFor(sourceMeta);
      TaskAttemptKilledEvent taskKilledEvent = (TaskAttemptKilledEvent) finishedEvent.getEvent();
      sendEvent(
          new TaskAttemptEventAttemptKilled(sourceMeta.getTaskAttemptID(),
              "Error: " + taskKilledEvent.getDiagnostics(), errCause));
      break;
    }
    case TASK_ATTEMPT_COMPLETED_EVENT:
      sendEvent(
          new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
      break;
    default:
      throw new TezUncheckedException("Unhandled tez event type: "
          + finishedEvent.getEventType());
    }
  }

  /**
   * Maps the generator recorded in an event's source metadata to the termination cause reported
   * for a failed or killed attempt.
   *
   * @param sourceMeta source metadata of the failed/killed event
   * @return INPUT_READ_ERROR, APPLICATION_ERROR, OUTPUT_WRITE_ERROR or FRAMEWORK_ERROR for the
   *     INPUT, PROCESSOR, OUTPUT and SYSTEM generators respectively
   * @throws TezUncheckedException if the generator is none of the above
   */
  private static TaskAttemptTerminationCause terminationCauseFor(EventMetaData sourceMeta) {
    switch (sourceMeta.getEventGenerator()) {
    case INPUT:
      return TaskAttemptTerminationCause.INPUT_READ_ERROR;
    case PROCESSOR:
      return TaskAttemptTerminationCause.APPLICATION_ERROR;
    case OUTPUT:
      return TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
    case SYSTEM:
      return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
    default:
      throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
          sourceMeta.getEventGenerator());
    }
  }