private void handleRoutedTezEvents()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [4123:4280]


  /**
   * Routes each event in {@code tezEvents} according to its event type.
   *
   * <ul>
   *   <li>{@code CUSTOM_PROCESSOR_EVENT}: versioned with the application
   *       attempt id and registered on the task named by the destination
   *       metadata.</li>
   *   <li>{@code INPUT_FAILED_EVENT}, {@code DATA_MOVEMENT_EVENT},
   *       {@code COMPOSITE_DATA_MOVEMENT_EVENT}: if the event originates from
   *       this vertex it is versioned with the id of the source task attempt
   *       and forwarded to the destination vertex; otherwise it is either
   *       parked in {@code pendingTaskEvents} (while
   *       {@code tasksNotYetScheduled} is set) or routed through the source
   *       edge.</li>
   *   <li>{@code ROOT_INPUT_DATA_INFORMATION_EVENT}: parked in
   *       {@code pendingTaskEvents} or registered on the task at the event's
   *       target index.</li>
   *   <li>{@code VERTEX_MANAGER_EVENT}: delivered to this vertex's manager
   *       (or deferred until the manager is initialized), or forwarded to the
   *       target vertex.</li>
   *   <li>{@code ROOT_INPUT_INITIALIZER_EVENT}: handled by this vertex
   *       depending on its state, or forwarded to the target vertex.</li>
   *   <li>{@code INPUT_READ_ERROR_EVENT}: sent to the source tasks of the edge
   *       named by the source metadata.</li>
   * </ul>
   *
   * @param tezEvents events to route, processed in iteration order
   * @param isPendingEvents not read by this method; kept so the signature
   *        stays compatible with existing callers
   * @throws AMUserCodeException declared by this method; presumably raised by
   *         user code reached while routing (TODO confirm which callee)
   * @throws TezUncheckedException if an event names an edge vertex that is not
   *         connected to this vertex, targets an unknown initializer, or has
   *         an event type this method does not handle
   * @throws IllegalArgumentException if a vertex manager or input initializer
   *         event targets a vertex name the DAG does not know
   */
  private void handleRoutedTezEvents(List<TezEvent> tezEvents, boolean isPendingEvents) throws AMUserCodeException {
    for (TezEvent tezEvent : tezEvents) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Vertex: " + getLogIdentifier() + " routing event: "
            + tezEvent.getEventType());
      }
      EventMetaData sourceMeta = tezEvent.getSourceInfo();
      switch (tezEvent.getEventType()) {
      case CUSTOM_PROCESSOR_EVENT:
        {
          // set version as app attempt id
          ((CustomProcessorEvent) tezEvent.getEvent()).setVersion(
            appContext.getApplicationAttemptId().getAttemptId());
          // route event to task
          EventMetaData destinationMeta = tezEvent.getDestinationInfo();
          Task targetTask = getTask(destinationMeta.getTaskAttemptID().getTaskID());
          targetTask.registerTezEvent(tezEvent);
        }
        break;
      case INPUT_FAILED_EVENT:
      case DATA_MOVEMENT_EVENT:
      case COMPOSITE_DATA_MOVEMENT_EVENT:
        {
          if (isEventFromVertex(this, sourceMeta)) {
            // event from this vertex. send to destination vertex
            TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
            // The version is set to the id of the producing task attempt.
            if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
              ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
              ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
            } else {
              ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
            }
            Vertex destVertex = getDAG().getVertex(sourceMeta.getEdgeVertexName());
            // The edge itself is not used here; the lookup only validates that
            // the named vertex is a known target of this vertex.
            Edge destEdge = targetVertices.get(destVertex);
            if (destEdge == null) {
              throw new TezUncheckedException("Bad destination vertex: " +
                  sourceMeta.getEdgeVertexName() + " for event vertex: " +
                  getLogIdentifier());
            }
            eventHandler.handle(new VertexEventRouteEvent(destVertex
                .getVertexId(), Collections.singletonList(tezEvent)));
          } else {
            if (tasksNotYetScheduled) {
              // this is only needed to support mixed mode routing. Else for
              // on demand routing events can be directly added to taskEvents
              // when legacy routing is removed then pending task events can be
              // removed.
              pendingTaskEvents.add(tezEvent);
            } else {
              // event not from this vertex. must have come from source vertex.
              int srcTaskIndex = sourceMeta.getTaskID().getId();
              Vertex edgeVertex = getDAG().getVertex(sourceMeta.getTaskVertexName());
              Edge srcEdge = sourceVertices.get(edgeVertex);
              if (srcEdge == null) {
                throw new TezUncheckedException("Bad source vertex: " +
                    sourceMeta.getTaskVertexName() + " for destination vertex: " +
                    getLogIdentifier());
              }
              if (srcEdge.hasOnDemandRouting()) {
                processOnDemandEvent(tezEvent, srcEdge, srcTaskIndex);
              } else {
                // send to tasks
                srcEdge.sendTezEventToDestinationTasks(tezEvent);
              }
            }
          }
        }
        break;
      case ROOT_INPUT_DATA_INFORMATION_EVENT:
        {
          checkEventSourceMetadata(this, sourceMeta);
          if (tasksNotYetScheduled) {
            // this is only needed to support mixed mode routing. Else for
            // on demand routing events can be directly added to taskEvents
            // when legacy routing is removed then pending task events can be
            // removed.
            pendingTaskEvents.add(tezEvent);
          } else {
            InputDataInformationEvent riEvent = (InputDataInformationEvent) tezEvent.getEvent();
            Task targetTask = getTask(riEvent.getTargetIndex());
            targetTask.registerTezEvent(tezEvent);
          }
        }
        break;
      case VERTEX_MANAGER_EVENT:
        {
          // VM events on task success only can be changed as part of TEZ-1532
          VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
          Vertex target = getDAG().getVertex(vmEvent.getTargetVertexName());
          Preconditions.checkArgument(target != null,
              "Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
          TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
          if (srcTaId.getVertexID().equals(vertexId)) {
            // this is the producer tasks' vertex
            vmEvent.setProducerAttemptIdentifier(
                getTaskAttemptIdentifier(dag.getName(), getName(), srcTaId));
          }
          if (target == this) {
            if (!vmIsInitialized.get()) {
              // The VM hasn't been setup yet, defer event consumption
              pendingVmEvents.add(vmEvent);
            } else {
              vertexManager.onVertexManagerEventReceived(vmEvent);
            }
          } else {
            // Targeted at another vertex: validate the source, then forward.
            checkEventSourceMetadata(this, sourceMeta);
            eventHandler.handle(new VertexEventRouteEvent(target
                .getVertexId(), Collections.singletonList(tezEvent)));
          }
        }
        break;
      case ROOT_INPUT_INITIALIZER_EVENT:
        {
          InputInitializerEvent riEvent = (InputInitializerEvent) tezEvent.getEvent();
          Vertex target = getDAG().getVertex(riEvent.getTargetVertexName());
          Preconditions.checkArgument(target != null,
              "Event sent to unknown vertex: " + riEvent.getTargetVertexName());
          riEvent.setSourceVertexName(tezEvent.getSourceInfo().getTaskVertexName());
          if (target == this) {
            if (rootInputDescriptors == null ||
                !rootInputDescriptors.containsKey(riEvent.getTargetInputName())) {
              throw new TezUncheckedException(
                  "InputInitializerEvent targeted at unknown initializer on vertex " +
                      logIdentifier + ", Event=" + riEvent);
            }
            if (getState() == VertexState.NEW) {
              // Held in pendingInitializerEvents while the vertex is still NEW.
              pendingInitializerEvents.add(tezEvent);
            } else if (getState() == VertexState.INITIALIZING) {
              rootInputInitializerManager.handleInitializerEvents(Collections.singletonList(tezEvent));
            } else {
              // Currently, INITED and subsequent states means Initializer complete / failure
              if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping event" + tezEvent + " since state is not INITIALIZING in "
                    + getLogIdentifier() + ", state=" + getState());
              }
            }
          } else {
            // Targeted at another vertex: validate the source, then forward.
            checkEventSourceMetadata(this, sourceMeta);
            eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
                Collections.singletonList(tezEvent)));
          }
        }
        break;
      case INPUT_READ_ERROR_EVENT:
        {
          checkEventSourceMetadata(this, sourceMeta);
          Edge srcEdge = sourceVertices.get(this.getDAG().getVertex(
              sourceMeta.getEdgeVertexName()));
          // Fail with a descriptive error instead of a bare NPE when the named
          // vertex is not a source of this vertex, matching the other edge
          // lookups in this method.
          if (srcEdge == null) {
            throw new TezUncheckedException("Bad source vertex: " +
                sourceMeta.getEdgeVertexName() + " for destination vertex: " +
                getLogIdentifier());
          }
          srcEdge.sendTezEventToSourceTasks(tezEvent);
        }
        break;
      default:
        throw new TezUncheckedException("Unhandled tez event type: "
            + tezEvent.getEventType());
      }
    }
  }