private synchronized boolean commitOrAbortOutputs()

in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [726:847]


  private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
    if (this.committedOrAborted) {
      LOG.info("Ignoring multiple output commit/abort");
      return this.allOutputsCommitted;
    }
    LOG.info("Calling DAG commit/abort for dag: " + getID());
    this.committedOrAborted = true;
    
    boolean successfulOutputsAlreadyCommitted = !commitAllOutputsOnSuccess;
    boolean failedWhileCommitting = false;
    if (dagSucceeded && !successfulOutputsAlreadyCommitted) {
      // commit all shared outputs
      try {
        appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
            new DAGCommitStartedEvent(getID(), clock.getTime())));
      } catch (IOException e) {
        LOG.error("Failed to send commit event to history/recovery handler", e);
        trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
        return false;
      }
      for (VertexGroupInfo groupInfo : vertexGroups.values()) {
        if (failedWhileCommitting) {
          break;
        }
        if (!groupInfo.outputs.isEmpty()) {
          groupInfo.committed = true;
          Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
          for (String outputName : groupInfo.outputs) {
            OutputCommitter committer = v.getOutputCommitters().get(outputName);
            LOG.info("Committing output: " + outputName + " for group: " + groupInfo.groupName);
            if (!commitOutput(outputName, committer)) {
              failedWhileCommitting = true;
              break;
            }
          }
        }
      }
      // commit all other outputs
      // we come here for successful dag completion and when outputs need to be
      // committed at the end for all or none visibility
      for (Vertex vertex : vertices.values()) {
        if (failedWhileCommitting) {
          break;
        }
        if (vertex.getOutputCommitters() == null) {
          LOG.info("No output committers for vertex: " + vertex.getName());
          continue;
        }
        Map<String, OutputCommitter> outputCommitters = 
            new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
        Set<String> sharedOutputs = vertex.getSharedOutputs();
        // remove shared outputs
        if (sharedOutputs != null) {
          Iterator<Map.Entry<String, OutputCommitter>> iter = outputCommitters
              .entrySet().iterator();
          while (iter.hasNext()) {
            if (sharedOutputs.contains(iter.next().getKey())) {
              iter.remove();
            }
          }
        }
        if (outputCommitters.isEmpty()) {
          LOG.info("No exclusive output committers for vertex: " + vertex.getName());
          continue;
        }
        for (Map.Entry<String, OutputCommitter> entry : outputCommitters.entrySet()) {
          LOG.info("Committing output: " + entry.getKey() + " for vertex: "
              + vertex.getVertexId());
          if (vertex.getState() != VertexState.SUCCEEDED) {
            throw new TezUncheckedException("Vertex: " + vertex.getName() + 
                " not in SUCCEEDED state. State= " + vertex.getState());
          }
          if (!commitOutput(entry.getKey(), entry.getValue())) {
            failedWhileCommitting = true;
            break;
          }
        }
      }
    }
    
    if (failedWhileCommitting) {
      LOG.info("DAG: " + getID() + " failed while committing");
    }
        
    if (!dagSucceeded || failedWhileCommitting) {
      // come here because dag failed or
      // dag succeeded and all or none semantics were on and a commit failed
      for (Vertex vertex : vertices.values()) {
        Map<String, OutputCommitter> outputCommitters = vertex
            .getOutputCommitters();
        if (outputCommitters == null || outputCommitters.isEmpty()) {
          LOG.info("No output committers for vertex: " + vertex.getName());
          continue;
        }
        for (Map.Entry<String, OutputCommitter> entry : outputCommitters
            .entrySet()) {
          final OutputCommitter committer = entry.getValue();
          if (commitAllOutputsOnSuccess // commit all outputs on success
              || vertex.getState() != VertexState.SUCCEEDED // never commit unsuccessful outputs
              ) {
            LOG.info("Aborting output: " + entry.getKey() + " for vertex: "
                + vertex.getVertexId());
            try {
              getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  committer.abortOutput(VertexStatus.State.FAILED);
                  return null;
                }
              });
            } catch (Exception e) {
              LOG.info("Exception in aborting output: " + entry.getKey()
                  + " for vertex: " + vertex.getVertexId(), e);
            }
          }
          // else successful outputs have already been committed
        }
      }
    }
    allOutputsCommitted = !failedWhileCommitting;
    return allOutputsCommitted;
  }