static VertexState checkVertexForCompletion()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [1436:1573]


  static VertexState checkVertexForCompletion(final VertexImpl vertex) {

    if (LOG.isDebugEnabled()) {
      LOG.debug("Checking for vertex completion for "
          + vertex.logIdentifier
          + ", numTasks=" + vertex.numTasks
          + ", failedTaskCount=" + vertex.failedTaskCount
          + ", killedTaskCount=" + vertex.killedTaskCount
          + ", successfulTaskCount=" + vertex.succeededTaskCount
          + ", completedTaskCount=" + vertex.completedTaskCount
          + ", terminationCause=" + vertex.terminationCause);
    }

    //check for vertex failure first
    if (vertex.completedTaskCount > vertex.tasks.size()) {
      LOG.error("task completion accounting issue: completedTaskCount > nTasks:"
          + " for vertex " + vertex.logIdentifier
          + ", numTasks=" + vertex.numTasks
          + ", failedTaskCount=" + vertex.failedTaskCount
          + ", killedTaskCount=" + vertex.killedTaskCount
          + ", successfulTaskCount=" + vertex.succeededTaskCount
          + ", completedTaskCount=" + vertex.completedTaskCount
          + ", terminationCause=" + vertex.terminationCause);
    }

    if (vertex.completedTaskCount == vertex.tasks.size()) {
      //Only succeed if tasks complete successfully and no terminationCause is registered.
      if(vertex.succeededTaskCount == vertex.tasks.size() && vertex.terminationCause == null) {
        LOG.info("Vertex succeeded: " + vertex.logIdentifier);
        try {
          if (vertex.commitVertexOutputs && !vertex.committed.getAndSet(true)) {
            // commit only once. Dont commit shared outputs
            LOG.info("Invoking committer commit for vertex, vertexId="
                + vertex.logIdentifier);
            if (vertex.outputCommitters != null
                && !vertex.outputCommitters.isEmpty()) {
              boolean firstCommit = true;
              for (Entry<String, OutputCommitter> entry : vertex.outputCommitters.entrySet()) {
                final OutputCommitter committer = entry.getValue();
                final String outputName = entry.getKey();
                if (vertex.sharedOutputs.contains(outputName)) {
                  // dont commit shared committers. Will be committed by the DAG
                  continue;
                }
                if (firstCommit) {
                  // Log commit start event on first actual commit
                  try {
                    vertex.appContext.getHistoryHandler().handleCriticalEvent(
                        new DAGHistoryEvent(vertex.getDAGId(),
                            new VertexCommitStartedEvent(vertex.vertexId,
                                vertex.clock.getTime())));
                  } catch (IOException e) {
                    LOG.error("Failed to persist commit start event to recovery, vertexId="
                        + vertex.logIdentifier, e);
                    vertex.trySetTerminationCause(VertexTerminationCause.INTERNAL_ERROR);
                    return vertex.finished(VertexState.FAILED);
                  }
                } else {
                  firstCommit = false;
                }
                vertex.dagUgi.doAs(new PrivilegedExceptionAction<Void>() {
                  @Override
                  public Void run() throws Exception {
                      LOG.info("Invoking committer commit for output=" + outputName
                          + ", vertexId=" + vertex.logIdentifier);
                      committer.commitOutput();
                    return null;
                  }
                });
              }
            }
          }
        } catch (Exception e) {
          LOG.error("Failed to do commit on vertex, vertexId="
              + vertex.logIdentifier, e);
          vertex.trySetTerminationCause(VertexTerminationCause.COMMIT_FAILURE);
          return vertex.finished(VertexState.FAILED);
        }
        return vertex.finished(VertexState.SUCCEEDED);
      }
      else if(vertex.terminationCause == VertexTerminationCause.DAG_KILL ){
        vertex.setFinishTime();
        String diagnosticMsg = "Vertex killed due to user-initiated job kill. "
            + "failedTasks:"
            + vertex.failedTaskCount;
        LOG.info(diagnosticMsg);
        vertex.addDiagnostic(diagnosticMsg);
        vertex.abortVertex(VertexStatus.State.KILLED);
        return vertex.finished(VertexState.KILLED);
      }
      else if(vertex.terminationCause == VertexTerminationCause.OTHER_VERTEX_FAILURE ){
        vertex.setFinishTime();
        String diagnosticMsg = "Vertex killed as other vertex failed. "
            + "failedTasks:"
            + vertex.failedTaskCount;
        LOG.info(diagnosticMsg);
        vertex.addDiagnostic(diagnosticMsg);
        vertex.abortVertex(VertexStatus.State.KILLED);
        return vertex.finished(VertexState.KILLED);
      }
      else if(vertex.terminationCause == VertexTerminationCause.OWN_TASK_FAILURE ){
        if(vertex.failedTaskCount == 0){
          LOG.error("task failure accounting error.  terminationCause=TASK_FAILURE but vertex.failedTaskCount == 0");
        }
        vertex.setFinishTime();
        String diagnosticMsg = "Vertex failed as one or more tasks failed. "
            + "failedTasks:"
            + vertex.failedTaskCount;
        LOG.info(diagnosticMsg);
        vertex.addDiagnostic(diagnosticMsg);
        vertex.abortVertex(VertexStatus.State.FAILED);
        return vertex.finished(VertexState.FAILED);
      }
      else if (vertex.terminationCause == VertexTerminationCause.INTERNAL_ERROR) {
        vertex.setFinishTime();
        String diagnosticMsg = "Vertex failed/killed due to internal error. "
            + "failedTasks:"
            + vertex.failedTaskCount
            + " killedTasks:"
            + vertex.killedTaskCount;
        LOG.info(diagnosticMsg);
        vertex.abortVertex(State.FAILED);
        return vertex.finished(VertexState.FAILED);
      }
      else {
        //should never occur
        throw new TezUncheckedException("All tasks complete, but cannot determine final state of vertex"
            + ", failedTaskCount=" + vertex.failedTaskCount
            + ", killedTaskCount=" + vertex.killedTaskCount
            + ", successfulTaskCount=" + vertex.succeededTaskCount
            + ", completedTaskCount=" + vertex.completedTaskCount
            + ", terminationCause=" + vertex.terminationCause);
      }
    }

    //return the current state, Vertex not finished yet
    return vertex.getInternalState();
  }