public VertexState transition()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [2265:2502]


    /**
     * Handles a notification that one of this vertex's source (input) vertices has
     * finished recovery, and completes this vertex's own recovery once every input
     * vertex has reported in.
     *
     * <p>Each notification raises {@code distanceFromRoot} if the source is deeper
     * than previously known, increments the recovered/inited/started source
     * counters according to the source's recovered state, and collects the
     * source's completed task attempts when the source was RUNNING or SUCCEEDED.
     * While fewer than {@code getInputVerticesCount()} sources have reported, the
     * vertex stays in {@link VertexState#RECOVERING}. Once all have reported,
     * recovery proceeds according to {@code vertex.recoveredState}. Downstream
     * vertices are then sent a {@code VertexEventSourceVertexRecovered}. Recovered
     * events are routed only when the resulting state is RUNNING, SUCCEEDED or
     * INITED.
     *
     * @param vertex the vertex being recovered
     * @param vertexEvent the triggering event; cast to
     *     {@code VertexEventSourceVertexRecovered}
     * @return the state the vertex moves to
     * @throws RuntimeException if recovered events are still pending while the
     *     vertex ends in a state other than RUNNING, SUCCEEDED or INITED
     */
    public VertexState transition(VertexImpl vertex, VertexEvent vertexEvent) {
      VertexEventSourceVertexRecovered sourceRecoveredEvent =
          (VertexEventSourceVertexRecovered) vertexEvent;
      // Use distance from root from Recovery events as upstream vertices may not
      // send source vertex started event that is used to compute distance
      int distanceFromRoot = sourceRecoveredEvent.getSourceDistanceFromRoot() + 1;
      if (vertex.distanceFromRoot < distanceFromRoot) {
        vertex.distanceFromRoot = distanceFromRoot;
      }

      ++vertex.numRecoveredSourceVertices;

      switch (sourceRecoveredEvent.getSourceVertexState()) {
        case NEW:
          // Nothing to do
          break;
        case INITED:
          ++vertex.numInitedSourceVertices;
          break;
        case RUNNING:
        case SUCCEEDED:
          ++vertex.numInitedSourceVertices;
          ++vertex.numStartedSourceVertices;
          // Keep the source's completed attempts; they are later passed to
          // vertexManager.onVertexStarted(...) when this vertex resumes below.
          if (sourceRecoveredEvent.getCompletedTaskAttempts() != null) {
            vertex.pendingReportedSrcCompletions.addAll(
                sourceRecoveredEvent.getCompletedTaskAttempts());
          }
          break;
        case FAILED:
        case KILLED:
        case ERROR:
          // Nothing to do
          // Recover as if source vertices have not inited/started
          break;
        default:
          LOG.warn("Received invalid SourceVertexRecovered event"
              + ", vertex=" + vertex.logIdentifier
              + ", sourceVertex=" + sourceRecoveredEvent.getSourceVertexID()
              + ", sourceVertexState=" + sourceRecoveredEvent.getSourceVertexState());
          return vertex.finished(VertexState.ERROR);
      }

      // Keep waiting until every input vertex has reported its recovered state.
      if (vertex.numRecoveredSourceVertices !=
          vertex.getInputVerticesCount()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Waiting for source vertices to recover"
              + ", vertex=" + vertex.logIdentifier
              + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
              + ", totalSourceVertices=" + vertex.getInputVerticesCount());
        }
        return VertexState.RECOVERING;
      }


      // Complete recovery
      VertexState endState = VertexState.NEW;
      // NOTE(review): this list is never populated in this method, so downstream
      // vertices always receive an empty list of completed attempts — confirm
      // whether that is intended.
      List<TezTaskAttemptID> completedTaskAttempts = Lists.newLinkedList();
      switch (vertex.recoveredState) {
        case NEW:
          // Drop all root events if not inited properly
          Iterator<TezEvent> iterator = vertex.recoveredEvents.iterator();
          while (iterator.hasNext()) {
            if (iterator.next().getEventType().equals(
                EventType.ROOT_INPUT_DATA_INFORMATION_EVENT)) {
              iterator.remove();
            }
          }
          // Trigger init if all sources initialized
          if (vertex.numInitedSourceVertices == vertex.getInputVerticesCount()) {
            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
                VertexEventType.V_INIT));
          }
          // Trigger start if all sources started
          if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
            vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
                VertexEventType.V_START));
          }
          endState = VertexState.NEW;
          break;
        case INITED:
          vertex.vertexAlreadyInitialized = true;
          try {
            vertex.initializeCommitters();
          } catch (Exception e) {
            LOG.info("Failed to initialize committers, vertex="
                + vertex.logIdentifier, e);
            vertex.finished(VertexState.FAILED,
                VertexTerminationCause.INIT_FAILURE);
            endState = VertexState.FAILED;
            break;
          }
          if (!vertex.setParallelism(0,
              null, vertex.recoveredSourceEdgeManagers, vertex.recoveredRootInputSpecUpdates, true)) {
            LOG.info("Failed to recover edge managers, vertex="
                + vertex.logIdentifier);
            vertex.finished(VertexState.FAILED,
                VertexTerminationCause.INIT_FAILURE);
            endState = VertexState.FAILED;
            break;
          }
          // Recover tasks
          if (vertex.tasks != null) {
            for (Task task : vertex.tasks.values()) {
              vertex.eventHandler.handle(
                  new TaskEventRecoverTask(task.getTaskId()));
            }
          }
          if (vertex.numInitedSourceVertices != vertex.getInputVerticesCount()) {
            LOG.info("Vertex already initialized but source vertices have not"
                + " initialized"
                + ", vertexId=" + vertex.logIdentifier
                + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices);
          } else {
            if (vertex.numStartedSourceVertices == vertex.getInputVerticesCount()) {
              vertex.eventHandler.handle(new VertexEvent(vertex.vertexId,
                VertexEventType.V_START));
            }
          }
          endState = VertexState.INITED;
          break;
        case RUNNING:
          vertex.tasksNotYetScheduled = false;
          // if commit in progress and desired state is not a succeeded one,
          // move to failed
          if (vertex.recoveryCommitInProgress) {
            LOG.info("Recovered vertex was in the middle of a commit"
                + ", failing Vertex=" + vertex.logIdentifier);
            vertex.finished(VertexState.FAILED,
                VertexTerminationCause.COMMIT_FAILURE);
            endState = VertexState.FAILED;
            break;
          }
          try {
            vertex.initializeCommitters();
          } catch (Exception e) {
            LOG.info("Failed to initialize committers, vertex="
                + vertex.logIdentifier, e);
            vertex.finished(VertexState.FAILED,
                VertexTerminationCause.INIT_FAILURE);
            endState = VertexState.FAILED;
            break;
          }
          if (!vertex.setParallelism(0, null, vertex.recoveredSourceEdgeManagers,
            vertex.recoveredRootInputSpecUpdates, true)) {
            LOG.info("Failed to recover edge managers, vertex="
                + vertex.logIdentifier);
            vertex.finished(VertexState.FAILED,
                VertexTerminationCause.INIT_FAILURE);
            endState = VertexState.FAILED;
            break;
          }
          // Null-safe: the guard below explicitly allows tasks == null, so the
          // assertion must not dereference it first (NPE when run with -ea).
          assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
          if (vertex.tasks != null && vertex.numTasks != 0) {
            for (Task task : vertex.tasks.values()) {
              vertex.eventHandler.handle(
                  new TaskEventRecoverTask(task.getTaskId()));
            }
            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
            endState = VertexState.RUNNING;
          } else {
            // No tasks to run: the vertex is trivially complete.
            endState = VertexState.SUCCEEDED;
            vertex.finished(endState);
          }
          break;
        case SUCCEEDED:
        case FAILED:
        case KILLED:
          vertex.tasksNotYetScheduled = false;
          // recover tasks
          // Null-safe for the same reason as in the RUNNING branch.
          assert vertex.tasks == null || vertex.tasks.size() == vertex.numTasks;
          if (vertex.tasks != null && vertex.numTasks != 0) {
            // Tasks are told which terminal state to recover into.
            TaskState taskState = TaskState.KILLED;
            switch (vertex.recoveredState) {
              case SUCCEEDED:
                taskState = TaskState.SUCCEEDED;
                break;
              case KILLED:
                taskState = TaskState.KILLED;
                break;
              case FAILED:
                taskState = TaskState.FAILED;
                break;
            }
            for (Task task : vertex.tasks.values()) {
              vertex.eventHandler.handle(
                  new TaskEventRecoverTask(task.getTaskId(),
                      taskState));
            }
            // Wait for all tasks to recover and report back
            vertex.vertexManager.onVertexStarted(vertex.pendingReportedSrcCompletions);
            endState = VertexState.RUNNING;
          } else {
            endState = vertex.recoveredState;
            vertex.finished(endState);
          }
          break;
        default:
          LOG.warn("Invalid recoveredState found when trying to recover"
              + " vertex, recoveredState=" + vertex.recoveredState);
          vertex.finished(VertexState.ERROR);
          endState = VertexState.ERROR;
          break;
      }

      LOG.info("Recovered Vertex State"
          + ", vertexId=" + vertex.logIdentifier
          + ", state=" + endState
          + ", numInitedSourceVertices=" + vertex.numInitedSourceVertices
          + ", numStartedSourceVertices=" + vertex.numStartedSourceVertices
          + ", numRecoveredSourceVertices=" + vertex.numRecoveredSourceVertices
          + ", tasksIsNull=" + (vertex.tasks == null)
          + ", numTasks=" + (vertex.tasks == null ? 0 : vertex.tasks.size()));

      // Propagate this vertex's recovered state to every downstream vertex.
      for (Entry<Vertex, Edge> entry : vertex.getOutputVertices().entrySet()) {
        vertex.eventHandler.handle(new VertexEventSourceVertexRecovered(
            entry.getKey().getVertexId(),
            vertex.vertexId, endState, completedTaskAttempts,
            vertex.getDistanceFromRoot()));
      }
      if (EnumSet.of(VertexState.RUNNING, VertexState.SUCCEEDED, VertexState.INITED)
          .contains(endState)) {
        // Send events downstream
        vertex.routeRecoveredEvents(endState, vertex.recoveredEvents);
        vertex.recoveredEvents.clear();
        if (!vertex.pendingRouteEvents.isEmpty()) {
          VertexImpl.ROUTE_EVENT_TRANSITION.transition(vertex,
              new VertexEventRouteEvent(vertex.getVertexId(),
                  vertex.pendingRouteEvents));
          vertex.pendingRouteEvents.clear();
        }
      } else {
        // Ensure no recovered events
        // NOTE(review): the FAILED exits above (committer init failure, commit in
        // progress, edge-manager recovery failure) do not clear recoveredEvents;
        // if any were recovered this throws — confirm that is intended.
        if (!vertex.recoveredEvents.isEmpty()) {
          throw new RuntimeException("Invalid Vertex state"
              + ", found non-zero recovered events in invalid state"
              + ", recoveredState=" + endState
              + ", recoveredEvents=" + vertex.recoveredEvents.size());
        }
      }
      return endState;
    }