private synchronized DAGState commitOrFinish()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1090:1193]


  /**
   * Builds one commit task per output that still has to be committed, records
   * the start of the DAG commit with the history handler and submits the tasks
   * to the executor service.
   *
   * <p>Tasks are collected in two passes: first the outputs listed on each
   * vertex group, then, per vertex, the committers whose output name is not in
   * that vertex's shared outputs. While tasks for a vertex are being built the
   * Hadoop caller context is set to that vertex's id and cleared again in a
   * {@code finally} block.
   *
   * @return the result of {@code finished(DAGState.FAILED)} when the commit
   *         started event cannot be written, the result of
   *         {@code finished(DAGState.SUCCEEDED)} when {@code commitFutures} is
   *         empty, and {@link DAGState#COMMITTING} otherwise
   * @throws TezUncheckedException if a vertex that owns exclusive output
   *         committers is not in the {@code SUCCEEDED} state
   */
  private synchronized DAGState commitOrFinish() {

    // Reached on successful DAG completion, when outputs have to be committed
    // at the very end so that they become visible all-or-none.
    Map<OutputKey, CallableEvent> pendingCommits = new HashMap<OutputKey, CallableEvent>();

    // Pass 1: outputs shared by the members of a vertex group.
    for (final VertexGroupInfo group : vertexGroups.values()) {
      if (group.outputs.isEmpty()) {
        continue;
      }
      group.commitStarted = true;
      // The first member handed out by the iterator supplies both the vertex id
      // used for the caller context and the committers used by the tasks.
      final Vertex groupVertex = getVertex(group.groupMembers.iterator().next());
      try {
        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(),
            groupVertex.getVertexId());
        for (final String sharedOutput : group.outputs) {
          final OutputKey sharedKey = new OutputKey(sharedOutput, group.groupName, true);
          CallableEvent sharedCommit = new CallableEvent(new CommitCallback(sharedKey)) {
            @Override
            public Void call() throws Exception {
              // The committer is looked up when the task runs, not when it is built.
              OutputCommitter sharedCommitter =
                  groupVertex.getOutputCommitters().get(sharedOutput);
              LOG.info("Committing output: " + sharedKey);
              commitOutput(sharedCommitter);
              return null;
            }
          };
          pendingCommits.put(sharedKey, sharedCommit);
        }
      } finally {
        appContext.getHadoopShim().clearHadoopCallerContext();
      }
    }

    // Pass 2: outputs that belong to exactly one vertex.
    for (final Vertex vertex : vertices.values()) {
      if (vertex.getOutputCommitters() == null) {
        LOG.info("No output committers for vertex: " + vertex.getLogIdentifier());
        continue;
      }
      // Work on a private copy so the vertex's own committer map stays untouched.
      Map<String, OutputCommitter> exclusive =
          new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
      Set<String> shared = vertex.getSharedOutputs();
      if (shared != null) {
        // Drop every committer whose output name is one of the shared outputs.
        for (Iterator<String> names = exclusive.keySet().iterator(); names.hasNext();) {
          if (shared.contains(names.next())) {
            names.remove();
          }
        }
      }
      if (exclusive.isEmpty()) {
        LOG.info("No exclusive output committers for vertex: " + vertex.getLogIdentifier());
        continue;
      }
      try {
        TezUtilsInternal.setHadoopCallerContext(appContext.getHadoopShim(), vertex.getVertexId());
        for (Map.Entry<String, OutputCommitter> committerEntry : exclusive.entrySet()) {
          if (vertex.getState() != VertexState.SUCCEEDED) {
            throw new TezUncheckedException("Vertex: " + vertex.getLogIdentifier() +
                " not in SUCCEEDED state. State= " + vertex.getState());
          }
          final String outputName = committerEntry.getKey();
          final OutputCommitter committer = committerEntry.getValue();
          OutputKey exclusiveKey = new OutputKey(outputName, vertex.getName(), false);
          CallableEvent exclusiveCommit = new CallableEvent(new CommitCallback(exclusiveKey)) {
            @Override
            public Void call() throws Exception {
              LOG.info("Committing output: " + outputName + " for vertex: "
                  + vertex.getLogIdentifier() + ", outputName: " + outputName);
              commitOutput(committer);
              return null;
            }
          };
          pendingCommits.put(exclusiveKey, exclusiveCommit);
        }
      } finally {
        appContext.getHadoopShim().clearHadoopCallerContext();
      }
    }

    if (!pendingCommits.isEmpty()) {
      // The commit-started event is handed to the history handler before any
      // task is submitted; if that write fails the DAG is failed instead.
      try {
        LOG.info("Start writing dag commit event, " + getID());
        appContext.getHistoryHandler().handleCriticalEvent(
            new DAGHistoryEvent(getID(), new DAGCommitStartedEvent(getID(), clock.getTime())));
      } catch (IOException e) {
        LOG.error("Failed to send commit event to history/recovery handler", e);
        trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
        return finished(DAGState.FAILED);
      }
      for (Map.Entry<OutputKey, CallableEvent> pending : pendingCommits.entrySet()) {
        CallableEvent task = pending.getValue();
        ListenableFuture<Void> future = appContext.getExecService().submit(task);
        Futures.addCallback(future, task.getCallback(), GuavaShim.directExecutor());
        commitFutures.put(pending.getKey(), future);
      }
    }

    // With no commit futures tracked there is nothing to wait for, so the DAG
    // can be finished right away; otherwise it stays in COMMITTING.
    return commitFutures.isEmpty() ? finished(DAGState.SUCCEEDED) : DAGState.COMMITTING;
  }