private boolean vertexSucceeded()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1649:1726]


  /**
   * Records that a vertex has succeeded and, when outputs are committed
   * eagerly ({@code commitAllOutputsOnSuccess} is false), commits the shared
   * outputs of every vertex group whose members have now all succeeded.
   *
   * <p>For each group that becomes ready, a {@code VertexGroupCommitStartedEvent}
   * is sent to the history handler, each of the group's outputs is committed,
   * and a {@code VertexGroupCommitFinishedEvent} is sent. Processing stops at
   * the first failure; no further group is touched after that. On any failure
   * the DAG is killed through {@code enactKill}, using
   * {@code DAGTerminationCause.RECOVERY_FAILURE} when a history event could not
   * be written and {@code DAGTerminationCause.COMMIT_FAILURE} otherwise.
   *
   * @param vertex the vertex that just succeeded
   * @return {@code true} if no commit or history-event failure occurred,
   *         {@code false} if the DAG kill was triggered
   */
  private boolean vertexSucceeded(Vertex vertex) {
    numSuccessfulVertices++;
    boolean failedCommit = false;
    boolean recoveryFailed = false;
    if (!commitAllOutputsOnSuccess) {
      // committing successful outputs immediately. check for shared outputs
      List<VertexGroupInfo> groupsList = vertexGroupInfo.get(vertex.getName());
      if (groupsList != null) {
        List<VertexGroupInfo> commitList = Lists.newArrayListWithCapacity(groupsList
            .size());
        // First pass: count this vertex in every group it belongs to and
        // collect the groups that are now complete and have outputs.
        for (VertexGroupInfo groupInfo : groupsList) {
          groupInfo.successfulMembers++;
          if (groupInfo.groupMembers.size() == groupInfo.successfulMembers
              && !groupInfo.outputs.isEmpty()) {
            // group has outputs and all vertex members are done
            LOG.info("All members of group: " + groupInfo.groupName
                + " are succeeded. Commiting outputs");
            commitList.add(groupInfo);
          }
        }
        // Second pass: commit each ready group, bracketed by started/finished
        // history events. Any failure ends the loop.
        for (VertexGroupInfo groupInfo : commitList) {
          if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
            LOG.info("VertexGroup was already committed as per recovery"
                + " data, groupName=" + groupInfo.groupName);
            continue;
          }
          // NOTE(review): the flag is set before the commit is attempted; its
          // readers are outside this method - confirm this ordering is intended.
          groupInfo.committed = true;
          // The committers are looked up on the first member returned by the
          // group's member iterator.
          Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
          try {
            appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
                new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
                    clock.getTime())));
          } catch (IOException e) {
            LOG.error("Failed to send commit recovery event to handler", e);
            recoveryFailed = true;
            failedCommit = true;
          }
          if (!failedCommit) {
            for (String outputName : groupInfo.outputs) {
              OutputCommitter committer = v.getOutputCommitters().get(outputName);
              LOG.info("Committing output: " + outputName);
              if (!commitOutput(outputName, committer)) {
                // using same logic as vertex level commit. stop after first failure.
                failedCommit = true;
                break;
              }
            }
          }
          if (failedCommit) {
            break;
          }
          try {
            appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
                new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
                    clock.getTime())));
          } catch (IOException e) {
            LOG.error("Failed to send commit recovery event to handler", e);
            recoveryFailed = true;
            failedCommit = true;
            // Stop here: without this the next iteration would mark the next
            // group as committed and emit a commit-started event for it even
            // though its outputs are never committed.
            break;
          }
        }
      }
    }

    if (failedCommit) {
      LOG.info("Aborting job due to failure in commit.");
      if (!recoveryFailed) {
        enactKill(DAGTerminationCause.COMMIT_FAILURE,
            VertexTerminationCause.COMMIT_FAILURE);
      } else {
        LOG.info("Recovery failure occurred during commit");
        enactKill(DAGTerminationCause.RECOVERY_FAILURE,
            VertexTerminationCause.COMMIT_FAILURE);
      }
    }

    return !failedCommit;
  }