public void transition()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [3285:3446]


    /**
     * Routes the TezEvents carried by a {@link VertexEventRouteEvent}.
     *
     * <p>First, when recovery is enabled, the events were not themselves replayed
     * from recovery, and the batch is non-empty, the data-movement style events
     * (DATA_MOVEMENT, COMPOSITE_DATA_MOVEMENT, ROOT_INPUT_DATA_INFORMATION) that
     * {@code isEventFromVertex} attributes to this vertex are handed to the
     * history handler wrapped in a {@code VertexDataMovementEventsGeneratedEvent}.
     *
     * <p>Then every event in the batch is dispatched according to its event type:
     * forwarded to another vertex, buffered in {@code pendingTaskEvents} while
     * {@code tasksNotYetScheduled} is set, pushed through an edge, or converted
     * into a task / task-attempt event for the event handler.
     *
     * @param vertex the vertex the route event was delivered to
     * @param event  the incoming event; cast to {@code VertexEventRouteEvent}
     * @throws TezUncheckedException if an edge lookup for a referenced vertex
     *         yields no edge, or if the event type is not handled here
     * @throws IllegalArgumentException presumably (via
     *         {@code Preconditions.checkArgument}) when a vertex-manager or
     *         root-input-initializer event names a target vertex the DAG does
     *         not know
     */
    public void transition(VertexImpl vertex, VertexEvent event) {
      VertexEventRouteEvent rEvent = (VertexEventRouteEvent) event;
      boolean recovered = rEvent.isRecovered();
      List<TezEvent> tezEvents = rEvent.getEvents();

      // Hand freshly generated (non-recovered) data movement events that
      // originate from this vertex to the history handler.
      if (vertex.getAppContext().isRecoveryEnabled()
          && !recovered
          && !tezEvents.isEmpty()) {
        List<TezEvent> dataMovementEvents =
            Lists.newArrayList();
        for (TezEvent tezEvent : tezEvents) {
          if (!isEventFromVertex(vertex, tezEvent.getSourceInfo())) {
            continue;
          }
          if  (tezEvent.getEventType().equals(EventType.COMPOSITE_DATA_MOVEMENT_EVENT)
            || tezEvent.getEventType().equals(EventType.DATA_MOVEMENT_EVENT)
            || tezEvent.getEventType().equals(EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
            dataMovementEvents.add(tezEvent);
          }
        }
        if (!dataMovementEvents.isEmpty()) {
          VertexDataMovementEventsGeneratedEvent historyEvent =
              new VertexDataMovementEventsGeneratedEvent(vertex.vertexId,
                  dataMovementEvents);
          vertex.appContext.getHistoryHandler().handle(
              new DAGHistoryEvent(vertex.getDAGId(), historyEvent));
        }
      }
      for(TezEvent tezEvent : tezEvents) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Vertex: " + vertex.getName() + " routing event: "
              + tezEvent.getEventType()
              + " Recovered:" + recovered);
        }
        EventMetaData sourceMeta = tezEvent.getSourceInfo();
        switch(tezEvent.getEventType()) {
        case INPUT_FAILED_EVENT:
        case DATA_MOVEMENT_EVENT:
        case COMPOSITE_DATA_MOVEMENT_EVENT:
          {
            if (isEventFromVertex(vertex, sourceMeta)) {
              // event from this vertex. send to destination vertex
              TezTaskAttemptID srcTaId = sourceMeta.getTaskAttemptID();
              // Stamp the event with the id of the producing task attempt.
              if (tezEvent.getEventType() == EventType.DATA_MOVEMENT_EVENT) {
                ((DataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
              } else if (tezEvent.getEventType() == EventType.COMPOSITE_DATA_MOVEMENT_EVENT) {
                ((CompositeDataMovementEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
              } else {
                ((InputFailedEvent) tezEvent.getEvent()).setVersion(srcTaId.getId());
              }
              Vertex destVertex = vertex.getDAG().getVertex(sourceMeta.getEdgeVertexName());
              Edge destEdge = vertex.targetVertices.get(destVertex);
              if (destEdge == null) {
                throw new TezUncheckedException("Bad destination vertex: " +
                    sourceMeta.getEdgeVertexName() + " for event vertex: " +
                    vertex.getVertexId());
              }
              vertex.eventHandler.handle(new VertexEventRouteEvent(destVertex
                  .getVertexId(), Collections.singletonList(tezEvent)));
            } else {
              // event not from this vertex. must have come from source vertex.
              // send to tasks
              if (vertex.tasksNotYetScheduled) {
                // Buffer the event while tasks are not yet scheduled.
                vertex.pendingTaskEvents.add(tezEvent);
              } else {
                Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
                    sourceMeta.getTaskVertexName()));
                if (srcEdge == null) {
                  throw new TezUncheckedException("Bad source vertex: " +
                      sourceMeta.getTaskVertexName() + " for destination vertex: " +
                      vertex.getVertexId());
                }
                srcEdge.sendTezEventToDestinationTasks(tezEvent);
              }
            }
          }
          break;
        case ROOT_INPUT_DATA_INFORMATION_EVENT:
          if (vertex.tasksNotYetScheduled) {
            vertex.pendingTaskEvents.add(tezEvent);
          } else {
            checkEventSourceMetadata(vertex, sourceMeta);
            RootInputDataInformationEvent riEvent = (RootInputDataInformationEvent) tezEvent
                .getEvent();
            // The event names its target task by index within this vertex.
            TezTaskID targetTaskID = TezTaskID.getInstance(vertex.getVertexId(),
                riEvent.getTargetIndex());
            vertex.eventHandler.handle(new TaskEventAddTezEvent(targetTaskID, tezEvent));
          }
          break;
        case VERTEX_MANAGER_EVENT:
        {
          VertexManagerEvent vmEvent = (VertexManagerEvent) tezEvent.getEvent();
          Vertex target = vertex.getDAG().getVertex(vmEvent.getTargetVertexName());
          Preconditions.checkArgument(target != null,
              "Event sent to unkown vertex: " + vmEvent.getTargetVertexName());
          if (target == vertex) {
            // Addressed to this vertex: deliver to its vertex manager.
            vertex.vertexManager.onVertexManagerEventReceived(vmEvent);
          } else {
            // Addressed elsewhere: re-route to the target vertex.
            checkEventSourceMetadata(vertex, sourceMeta);
            vertex.eventHandler.handle(new VertexEventRouteEvent(target
                .getVertexId(), Collections.singletonList(tezEvent)));
          }
        }
          break;
        case ROOT_INPUT_INITIALIZER_EVENT:
        {
          RootInputInitializerEvent riEvent = (RootInputInitializerEvent) tezEvent.getEvent();
          Vertex target = vertex.getDAG().getVertex(riEvent.getTargetVertexName());
          Preconditions.checkArgument(target != null,
              "Event sent to unkown vertex: " + riEvent.getTargetVertexName());
          if (target == vertex) {
            // Addressed to this vertex: deliver to its root input initializer manager.
            vertex.rootInputInitializerManager.handleInitializerEvent(riEvent);
          } else {
            // Addressed elsewhere: re-route to the target vertex.
            checkEventSourceMetadata(vertex, sourceMeta);
            vertex.eventHandler.handle(new VertexEventRouteEvent(target.getVertexId(),
                Collections.singletonList(tezEvent)));
          }
        }
          break;
        case INPUT_READ_ERROR_EVENT:
          {
            checkEventSourceMetadata(vertex, sourceMeta);
            Edge srcEdge = vertex.sourceVertices.get(vertex.getDAG().getVertex(
                sourceMeta.getEdgeVertexName()));
            // Fail with a descriptive error, like the other edge lookups in
            // this method, instead of a bare NullPointerException when the
            // named vertex has no edge into this vertex.
            if (srcEdge == null) {
              throw new TezUncheckedException("Bad source vertex: " +
                  sourceMeta.getEdgeVertexName() + " for destination vertex: " +
                  vertex.getVertexId());
            }
            srcEdge.sendTezEventToSourceTasks(tezEvent);
          }
          break;
        case TASK_STATUS_UPDATE_EVENT:
          {
            checkEventSourceMetadata(vertex, sourceMeta);
            TaskStatusUpdateEvent sEvent =
                (TaskStatusUpdateEvent) tezEvent.getEvent();
            vertex.getEventHandler().handle(
                new TaskAttemptEventStatusUpdate(sourceMeta.getTaskAttemptID(),
                    sEvent));
          }
          break;
        case TASK_ATTEMPT_COMPLETED_EVENT:
          {
            checkEventSourceMetadata(vertex, sourceMeta);
            vertex.getEventHandler().handle(
                new TaskAttemptEvent(sourceMeta.getTaskAttemptID(),
                    TaskAttemptEventType.TA_DONE));
          }
          break;
        case TASK_ATTEMPT_FAILED_EVENT:
          {
            checkEventSourceMetadata(vertex, sourceMeta);
            TaskAttemptFailedEvent taskFailedEvent =
                (TaskAttemptFailedEvent) tezEvent.getEvent();
            vertex.getEventHandler().handle(
                new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
                    TaskAttemptEventType.TA_FAILED,
                    "Error: " + taskFailedEvent.getDiagnostics()));
          }
          break;
        default:
          throw new TezUncheckedException("Unhandled tez event type: "
              + tezEvent.getEventType());
        }
      }
    }