public TaskAttemptEventInfo getTaskAttemptTezEvents()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [4001:4121]


  public TaskAttemptEventInfo getTaskAttemptTezEvents(TezTaskAttemptID attemptID,
      int fromEventId, int preRoutedFromEventId, int maxEvents) {
    Task task = getTask(attemptID.getTaskID());
    ArrayList<TezEvent> events = task.getTaskAttemptTezEvents(
        attemptID, preRoutedFromEventId, maxEvents);
    int nextPreRoutedFromEventId = preRoutedFromEventId + events.size();
    int nextFromEventId = fromEventId;
    onDemandRouteEventsReadLock.lock();
    try {
      int currEventCount = onDemandRouteEvents.size();
      try {
        if (currEventCount > fromEventId) {
          if (events != TaskImpl.EMPTY_TASK_ATTEMPT_TEZ_EVENTS) {
            events.ensureCapacity(maxEvents);
          } else {
            events = Lists.newArrayListWithCapacity(maxEvents);
          }
          int numPreRoutedEvents = events.size();
          int taskIndex = attemptID.getTaskID().getId();
          Preconditions.checkState(taskIndex < tasks.size(), "Invalid task index for TA: " + attemptID
              + " vertex: " + getLogIdentifier());
          boolean isFirstEvent = true;
          boolean firstEventObsoleted = false;
          for (nextFromEventId = fromEventId; nextFromEventId < currEventCount; ++nextFromEventId) {
            boolean earlyExit = false;
            if (events.size() == maxEvents) {
              break;
            }
            EventInfo eventInfo = onDemandRouteEvents.get(nextFromEventId);
            if (eventInfo.isObsolete) {
              // ignore obsolete events
              firstEventObsoleted = true;
              continue;
            }
            TezEvent tezEvent = eventInfo.tezEvent;
            switch(tezEvent.getEventType()) {
            case INPUT_FAILED_EVENT:
            case DATA_MOVEMENT_EVENT:
            case COMPOSITE_DATA_MOVEMENT_EVENT:
              {
                int srcTaskIndex = eventInfo.eventTaskIndex;
                Edge srcEdge = eventInfo.eventEdge;
                PendingEventRouteMetadata pendingRoute = null;
                if (isFirstEvent) {
                  // the first event is the one that can have pending routes because its expanded
                  // events had not been completely sent in the last round.
                  isFirstEvent = false;
                  pendingRoute = srcEdge.removePendingEvents(attemptID);
                  if (pendingRoute != null) {
                    // the first event must match the pending route event
                    // the only reason it may not match is if in between rounds that event got
                    // obsoleted
                    if(tezEvent != pendingRoute.getTezEvent()) {
                      Preconditions.checkState(firstEventObsoleted);
                      // pending routes can be ignored for obsoleted events
                      pendingRoute = null;
                    }
                  }
                }
                if (!srcEdge.maybeAddTezEventForDestinationTask(tezEvent, attemptID, srcTaskIndex,
                    events, maxEvents, pendingRoute)) {
                  // not enough space left for this iteration events.
                  // Exit and start from here next time
                  earlyExit = true;
                }
              }
              break;
            case ROOT_INPUT_DATA_INFORMATION_EVENT:
              {
                InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
                if (riEvent.getTargetIndex() == taskIndex) {
                  events.add(tezEvent);
                }
              }
              break;
            default:
              throw new TezUncheckedException("Unexpected event type for task: "
                  + tezEvent.getEventType());
            }
            if (earlyExit) {
              break;
            }
          }
          int numEventsSent = events.size() - numPreRoutedEvents;
          if (numEventsSent > 0) {
            StringBuilder builder = new StringBuilder();
            builder.append("Sending ").append(attemptID).append(" ")
                .append(numEventsSent)
                .append(" events [").append(fromEventId).append(",").append(nextFromEventId)
                .append(") total ").append(currEventCount).append(" ")
                .append(getLogIdentifier());
            LOG.info(builder.toString());
          }
        }
      } catch (AMUserCodeException e) {
        String msg = "Exception in " + e.getSource() + ", vertex=" + getLogIdentifier();
        LOG.error(msg, e);
        eventHandler.handle(new VertexEventManagerUserCodeError(getVertexId(), e));
        nextFromEventId = fromEventId;
        events.clear();
      }
    } finally {
      onDemandRouteEventsReadLock.unlock();
    }
    if (!events.isEmpty()) {
      for (int i=(events.size() - 1); i>=0; --i) {
        TezEvent lastEvent = events.get(i);
              // record the last event sent by the AM to the task
        EventType lastEventType = lastEvent.getEventType();
        // if the following changes then critical path logic/recording may need revision
        if (lastEventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT ||
            lastEventType == EventType.COMPOSITE_ROUTED_DATA_MOVEMENT_EVENT ||
            lastEventType == EventType.DATA_MOVEMENT_EVENT ||
            lastEventType == EventType.ROOT_INPUT_DATA_INFORMATION_EVENT) {
          task.getAttempt(attemptID).setLastEventSent(lastEvent);
          break;
        }
      }
    }
    return new TaskAttemptEventInfo(nextFromEventId, events, nextPreRoutedFromEventId);
  }