public DAGState transition()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1297:1423]


    /**
     * Drives a DAG out of recovery based on the state reconstructed for it.
     *
     * <p>If the recover event carries a desired state, it overrides
     * {@code dag.recoveredState}. Any additional classpath URLs it carries are
     * added to the classpath before dispatching on the recovered state:
     * <ul>
     *   <li>{@code NEW}: re-send DAG init and start events and stay {@code NEW}.</li>
     *   <li>{@code INITED}: recover root vertices as running and go {@code RUNNING}.</li>
     *   <li>{@code RUNNING}: if a DAG commit or vertex-group commit was recorded,
     *       fail the DAG with {@code COMMIT_FAILURE}, since commits are not
     *       recoverable. Otherwise recover root vertices as running and stay
     *       {@code RUNNING}.</li>
     *   <li>{@code SUCCEEDED}/{@code ERROR}/{@code FAILED}/{@code KILLED}: recover
     *       every vertex to the matching final state, report completion to the
     *       AM, and return the recovered state.</li>
     *   <li>Any other state: report {@code ERROR} to the AM and go {@code FAILED}.</li>
     * </ul>
     *
     * @param dag the DAG being recovered
     * @param dagEvent the recover event; it is cast to {@link DAGEventRecoverEvent}
     * @return the state the DAG transitions to
     */
    public DAGState transition(DAGImpl dag, DAGEvent dagEvent) {
      DAGEventRecoverEvent recoverEvent = (DAGEventRecoverEvent) dagEvent;
      if (recoverEvent.hasDesiredState()) {
        // DAG completed or final end state known
        dag.recoveredState = recoverEvent.getDesiredState();
      }
      if (recoverEvent.getAdditionalUrlsForClasspath() != null) {
        LOG.info("Added additional resources : [" + recoverEvent.getAdditionalUrlsForClasspath()
            + "] to classpath");
        RelocalizationUtils.addUrlsToClassPath(recoverEvent.getAdditionalUrlsForClasspath());
      }

      switch (dag.recoveredState) {
        case NEW:
          // send DAG an Init and start events
          dag.eventHandler.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
          dag.eventHandler.handle(new DAGEventStartDag(dag.getID(), null));
          return DAGState.NEW;
        case INITED:
          // DAG inited but not started. The vertices may already have been sent
          // init/start events, but the DAG start may not have been persisted,
          // so the root vertices are (re)driven into RUNNING here.
          recoverRootVerticesAsRunning(dag);
          return DAGState.RUNNING;
        case RUNNING:
          // If a commit is in progress, the DAG should fail because commits are
          // not recoverable. checkGroupCommits runs first so pending group
          // commits are always logged.
          if (checkGroupCommits(dag) || dag.recoveryCommitInProgress) {
            return failOnInterruptedCommit(dag);
          }
          recoverRootVerticesAsRunning(dag);
          return DAGState.RUNNING;
        case SUCCEEDED:
        case ERROR:
        case FAILED:
        case KILLED:
          // Completed: recover all vertices with the matching final end state
          recoverAllVertices(dag, toVertexFinalState(dag.recoveredState));

          // Let us inform AM of completion
          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
              dag.recoveredState));

          LOG.info("Recovered DAG: " + dag.getID() + " finished with state: "
              + dag.recoveredState);
          return dag.recoveredState;
        default:
          // Error state
          LOG.warn("Trying to recover DAG, failed to recover"
              + " from non-handled state: " + dag.recoveredState);
          // Tell AM ERROR so that it can shutdown.
          // NOTE(review): the AM is told ERROR while the DAG itself transitions
          // to FAILED -- confirm this asymmetry is intended.
          dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
              DAGState.ERROR));
          return DAGState.FAILED;
      }
    }

    /** Sends a RUNNING recovery event to every vertex that has no input vertices. */
    private void recoverRootVerticesAsRunning(DAGImpl dag) {
      for (Vertex v : dag.vertices.values()) {
        if (v.getInputVerticesCount() == 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Running Recovery event to root vertex "
                + v.getName());
          }
          dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
              VertexState.RUNNING));
        }
      }
    }

    /** Sends a recovery event carrying {@code desiredState} to every vertex of the DAG. */
    private void recoverAllVertices(DAGImpl dag, VertexState desiredState) {
      for (Vertex v : dag.vertices.values()) {
        dag.eventHandler.handle(new VertexEventRecoverVertex(v.getVertexId(),
            desiredState));
      }
    }

    /** Maps a DAG state to the final vertex state: KILLED, FAILED for ERROR/FAILED, else SUCCEEDED. */
    private VertexState toVertexFinalState(DAGState dagState) {
      switch (dagState) {
        case KILLED:
          return VertexState.KILLED;
        case ERROR:
        case FAILED:
          return VertexState.FAILED;
        default:
          return VertexState.SUCCEEDED;
      }
    }

    /**
     * Logs every recorded vertex-group commit whose value is {@code false} as
     * pending. Returns whether any group commit was recorded at all.
     */
    private boolean checkGroupCommits(DAGImpl dag) {
      // NOTE(review): the result is true for every recorded group commit, not
      // only the pending (false-valued) ones that are logged. This means a
      // recorded-and-finished group commit also fails a RUNNING DAG on
      // recovery. Confirm whether that conservatism is intended.
      boolean anyGroupCommitRecorded = !dag.recoveredGroupCommits.isEmpty();
      for (Entry<String, Boolean> entry : dag.recoveredGroupCommits.entrySet()) {
        if (!entry.getValue().booleanValue()) {
          LOG.info("Found a pending Vertex Group commit"
              + ", vertexGroup=" + entry.getKey());
        }
      }
      return anyGroupCommitRecorded;
    }

    /** Fails a RUNNING DAG whose commit was seen starting but not completing; returns FAILED. */
    private DAGState failOnInterruptedCommit(DAGImpl dag) {
      // Fail the DAG as we have not seen a commit completion
      dag.trySetTerminationCause(DAGTerminationCause.COMMIT_FAILURE);
      dag.setFinishTime();
      // Recover all other data for all vertices.
      // NOTE(review): dag.recoveredState is RUNNING when this path is entered,
      // so every vertex is recovered as SUCCEEDED even though the DAG ends
      // FAILED -- confirm that is intended.
      recoverAllVertices(dag, toVertexFinalState(dag.recoveredState));
      DAGState endState = DAGState.FAILED;
      try {
        dag.logJobHistoryUnsuccesfulEvent(endState);
      } catch (IOException e) {
        // Best effort: the DAG is still reported as finished below, but keep
        // the cause so the persistence failure can be diagnosed.
        LOG.warn("Failed to persist recovery event for DAG completion"
            + ", dagId=" + dag.dagId
            + ", finalState=" + endState, e);
      }
      dag.eventHandler.handle(new DAGAppMasterEventDAGFinished(dag.getID(),
          endState));
      return endState;
    }