in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java [1649:1726]
/**
 * Records the success of a vertex and, when outputs are committed eagerly
 * (i.e. {@code commitAllOutputsOnSuccess} is false), commits the shared
 * outputs of every vertex group whose members have now all succeeded.
 *
 * <p>For each group to be committed, a {@code VertexGroupCommitStartedEvent}
 * is handed to the history handler before the outputs are committed and a
 * {@code VertexGroupCommitFinishedEvent} afterwards. Groups present in
 * {@code recoveredGroupCommits} are skipped. Processing stops at the first
 * failure, whether it comes from the history handler or from an output
 * commit; groups later in the list are left untouched.
 *
 * <p>On failure the DAG is killed via {@code enactKill}: with
 * {@code DAGTerminationCause.RECOVERY_FAILURE} if a history event could not
 * be recorded, otherwise with {@code DAGTerminationCause.COMMIT_FAILURE}.
 *
 * @param vertex the vertex that has just succeeded
 * @return {@code true} if no commit or history-event failure occurred,
 *         {@code false} otherwise
 */
private boolean vertexSucceeded(Vertex vertex) {
  numSuccessfulVertices++;
  boolean failedCommit = false;
  boolean recoveryFailed = false;
  if (!commitAllOutputsOnSuccess) {
    // committing successful outputs immediately. check for shared outputs
    List<VertexGroupInfo> groupsList = vertexGroupInfo.get(vertex.getName());
    if (groupsList != null) {
      List<VertexGroupInfo> commitList = Lists.newArrayListWithCapacity(groupsList
          .size());
      // First pass: bump the success count of every group this vertex belongs
      // to and collect the groups that are now complete and have outputs.
      for (VertexGroupInfo groupInfo : groupsList) {
        groupInfo.successfulMembers++;
        if (groupInfo.groupMembers.size() == groupInfo.successfulMembers
            && !groupInfo.outputs.isEmpty()) {
          // group has outputs and all vertex members are done
          LOG.info("All members of group: " + groupInfo.groupName
              + " are succeeded. Commiting outputs");
          commitList.add(groupInfo);
        }
      }
      // Second pass: commit the collected groups, stopping at the first failure.
      for (VertexGroupInfo groupInfo : commitList) {
        if (recoveredGroupCommits.containsKey(groupInfo.groupName)) {
          LOG.info("VertexGroup was already committed as per recovery"
              + " data, groupName=" + groupInfo.groupName);
          continue;
        }
        groupInfo.committed = true;
        // The committers are looked up on the first member returned by the
        // group's member iterator.
        Vertex v = getVertex(groupInfo.groupMembers.iterator().next());
        try {
          appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
              new VertexGroupCommitStartedEvent(dagId, groupInfo.groupName,
                  clock.getTime())));
        } catch (IOException e) {
          LOG.error("Failed to send commit recovery event to handler", e);
          recoveryFailed = true;
          failedCommit = true;
        }
        if (!failedCommit) {
          for (String outputName : groupInfo.outputs) {
            OutputCommitter committer = v.getOutputCommitters().get(outputName);
            LOG.info("Committing output: " + outputName);
            if (!commitOutput(outputName, committer)) {
              // using same logic as vertex level commit. stop after first failure.
              failedCommit = true;
              break;
            }
          }
        }
        if (failedCommit) {
          break;
        }
        try {
          appContext.getHistoryHandler().handleCriticalEvent(new DAGHistoryEvent(getID(),
              new VertexGroupCommitFinishedEvent(dagId, groupInfo.groupName,
                  clock.getTime())));
        } catch (IOException e) {
          LOG.error("Failed to send commit recovery event to handler", e);
          recoveryFailed = true;
          failedCommit = true;
          // Stop here like the other failure paths. Without this the next
          // group would be flagged as committed and get a commit-started
          // event recorded even though its outputs are never committed.
          break;
        }
      }
    }
  }
  if (failedCommit) {
    LOG.info("Aborting job due to failure in commit.");
    if (!recoveryFailed) {
      enactKill(DAGTerminationCause.COMMIT_FAILURE,
          VertexTerminationCause.COMMIT_FAILURE);
    } else {
      LOG.info("Recovery failure occurred during commit");
      enactKill(DAGTerminationCause.RECOVERY_FAILURE,
          VertexTerminationCause.COMMIT_FAILURE);
    }
  }
  return !failedCommit;
}