in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [726:847]
/**
 * Commits or aborts the outputs of this DAG. The work is performed at most
 * once: a repeated call only logs and returns the result recorded by the
 * first call that ran to completion.
 *
 * <p>When the DAG succeeded and {@code commitAllOutputsOnSuccess} is set, a
 * commit-started event is handed to the history handler, then the outputs
 * shared by vertex groups are committed, followed by each vertex's remaining
 * (non-shared) outputs. Committing stops at the first output for which
 * {@code commitOutput} returns {@code false}.
 *
 * <p>When the DAG did not succeed, or a commit failed, outputs are aborted:
 * every output if {@code commitAllOutputsOnSuccess} is set, otherwise only the
 * outputs of vertices that are not in the SUCCEEDED state. An exception thrown
 * while aborting an output is logged and the remaining outputs are still
 * processed.
 *
 * @param dagSucceeded whether the DAG completed successfully
 * @return {@code true} if no commit failed; {@code false} if a commit failed
 *         or the commit-started event could not be recorded (in which case
 *         the termination cause is set to RECOVERY_FAILURE and nothing is
 *         committed or aborted)
 * @throws TezUncheckedException if a vertex owning a non-shared output is not
 *         in the SUCCEEDED state when that output is about to be committed
 */
private synchronized boolean commitOrAbortOutputs(boolean dagSucceeded) {
  if (this.committedOrAborted) {
    LOG.info("Ignoring multiple output commit/abort");
    return this.allOutputsCommitted;
  }
  LOG.info("Calling DAG commit/abort for dag: " + getID());
  this.committedOrAborted = true;

  // When false, outputs are not committed here; see the abort section below.
  final boolean commitAtDagEnd = commitAllOutputsOnSuccess;
  boolean commitFailed = false;

  if (dagSucceeded && commitAtDagEnd) {
    // Record the start of the commit before touching any output.
    try {
      appContext.getHistoryHandler().handleCriticalEvent(
          new DAGHistoryEvent(getID(), new DAGCommitStartedEvent(getID(), clock.getTime())));
    } catch (IOException e) {
      LOG.error("Failed to send commit event to history/recovery handler", e);
      trySetTerminationCause(DAGTerminationCause.RECOVERY_FAILURE);
      return false;
    }

    // Phase 1: outputs shared by the members of a vertex group.
    groupCommit:
    for (VertexGroupInfo group : vertexGroups.values()) {
      if (group.outputs.isEmpty()) {
        continue;
      }
      // The group is flagged before its outputs are actually committed.
      group.committed = true;
      // The committers are looked up on the first member of the group.
      Vertex member = getVertex(group.groupMembers.iterator().next());
      for (String sharedOutput : group.outputs) {
        OutputCommitter sharedCommitter = member.getOutputCommitters().get(sharedOutput);
        LOG.info("Committing output: " + sharedOutput + " for group: " + group.groupName);
        if (!commitOutput(sharedOutput, sharedCommitter)) {
          commitFailed = true;
          break groupCommit;
        }
      }
    }

    // Phase 2: every remaining output. We get here for a successful DAG whose
    // outputs must become visible all-or-none at the end.
    if (!commitFailed) {
      vertexCommit:
      for (Vertex vertex : vertices.values()) {
        if (vertex.getOutputCommitters() == null) {
          LOG.info("No output committers for vertex: " + vertex.getName());
          continue;
        }
        // Work on a copy so dropping shared outputs leaves the vertex's map untouched.
        Map<String, OutputCommitter> exclusiveCommitters =
            new HashMap<String, OutputCommitter>(vertex.getOutputCommitters());
        Set<String> sharedOutputs = vertex.getSharedOutputs();
        if (sharedOutputs != null) {
          // Shared outputs were handled in phase 1.
          exclusiveCommitters.keySet().removeAll(sharedOutputs);
        }
        if (exclusiveCommitters.isEmpty()) {
          LOG.info("No exclusive output committers for vertex: " + vertex.getName());
          continue;
        }
        for (Map.Entry<String, OutputCommitter> exclusive : exclusiveCommitters.entrySet()) {
          LOG.info("Committing output: " + exclusive.getKey() + " for vertex: "
              + vertex.getVertexId());
          if (vertex.getState() != VertexState.SUCCEEDED) {
            throw new TezUncheckedException("Vertex: " + vertex.getName() +
                " not in SUCCEEDED state. State= " + vertex.getState());
          }
          if (!commitOutput(exclusive.getKey(), exclusive.getValue())) {
            commitFailed = true;
            break vertexCommit;
          }
        }
      }
    }
  }

  if (commitFailed) {
    LOG.info("DAG: " + getID() + " failed while committing");
  }

  if (!dagSucceeded || commitFailed) {
    // Either the DAG failed, or it succeeded with all-or-none semantics and a
    // commit failed.
    for (Vertex vertex : vertices.values()) {
      Map<String, OutputCommitter> committers = vertex.getOutputCommitters();
      if (committers == null || committers.isEmpty()) {
        LOG.info("No output committers for vertex: " + vertex.getName());
        continue;
      }
      for (Map.Entry<String, OutputCommitter> entry : committers.entrySet()) {
        final OutputCommitter toAbort = entry.getValue();
        if (!commitAllOutputsOnSuccess && vertex.getState() == VertexState.SUCCEEDED) {
          // Successful outputs have already been committed; leave them alone.
          continue;
        }
        // Abort when everything is committed only on success, or when the
        // vertex itself did not succeed (unsuccessful outputs are never committed).
        LOG.info("Aborting output: " + entry.getKey() + " for vertex: "
            + vertex.getVertexId());
        try {
          getDagUGI().doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
              toAbort.abortOutput(VertexStatus.State.FAILED);
              return null;
            }
          });
        } catch (Exception e) {
          // Best effort: keep aborting the remaining outputs.
          LOG.info("Exception in aborting output: " + entry.getKey()
              + " for vertex: " + vertex.getVertexId(), e);
        }
      }
    }
  }

  allOutputsCommitted = !commitFailed;
  return allOutputsCommitted;
}