in gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java [298:350]
/**
 * Decides whether the given dag has no further work left to do.
 *
 * <p>Approach: every node starts out in a {@code runnable} set (this deliberately includes nodes that are already
 * done). The nodes are then visited in the order returned by {@code dag.getNodes()}:
 * <ul>
 *   <li>a FAILED or CANCELLED node is counted as done, and
 *       {@code removeDescendantsFromCanRun} is asked to prune {@code runnable} for it;</li>
 *   <li>a COMPLETE node is counted as done;</li>
 *   <li>a PENDING node is dropped from {@code runnable} when {@code areAllParentsInCanRun} reports false for it;</li>
 *   <li>COMPILED, PENDING_RESUME, PENDING_RETRY, ORCHESTRATED and RUNNING nodes are left in {@code runnable}
 *       untouched;</li>
 *   <li>nodes no longer present in {@code runnable} are skipped without looking at their status.</li>
 * </ul>
 *
 * <p>If nothing failed, or the failure option is FINISH_ALL_POSSIBLE, the dag is finished exactly when the number of
 * runnable nodes equals the number of done nodes. With FINISH_RUNNING after a failure, the dag is finished when every
 * runnable node that is not done is still PENDING or COMPILED, i.e. nothing that has already been started is
 * outstanding.
 *
 * @param dag the dag whose nodes are inspected
 * @return {@code true} if the dag is considered finished under the rules above, {@code false} otherwise
 * @throws RuntimeException if a visited node reports a status not listed above, or if a failure was seen and the
 *         dag's failure option is neither FINISH_ALL_POSSIBLE nor FINISH_RUNNING
 */
public static boolean isDagFinished(Dag<JobExecutionPlan> dag) {
  List<Dag.DagNode<JobExecutionPlan>> allNodes = dag.getNodes();
  Set<Dag.DagNode<JobExecutionPlan>> runnable = new HashSet<>(allNodes);
  Set<Dag.DagNode<JobExecutionPlan>> done = new HashSet<>();
  boolean sawFailure = false;

  for (Dag.DagNode<JobExecutionPlan> current : allNodes) {
    // Nodes pruned while handling an earlier node are ignored entirely.
    if (!runnable.contains(current)) {
      continue;
    }

    ExecutionStatus state = current.getValue().getExecutionStatus();

    if (state == ExecutionStatus.FAILED || state == ExecutionStatus.CANCELLED) {
      sawFailure = true;
      // Prune first, then record the failed/cancelled node itself as done.
      removeDescendantsFromCanRun(current, dag, runnable);
      done.add(current);
      continue;
    }

    if (state == ExecutionStatus.COMPLETE) {
      done.add(current);
      continue;
    }

    if (state == ExecutionStatus.PENDING) {
      // A pending node whose parents are not all runnable cannot run either, so it leaves the runnable set too.
      boolean parentsRunnable = areAllParentsInCanRun(current, runnable);
      if (!parentsRunnable) {
        runnable.remove(current);
      }
      continue;
    }

    // Everything that is neither terminal nor PENDING must be one of the known ready/in-flight statuses.
    boolean knownActiveState = state == COMPILED
        || state == PENDING_RESUME
        || state == PENDING_RETRY
        || state == ORCHESTRATED
        || state == RUNNING;
    if (!knownActiveState) {
      throw new RuntimeException("Unexpected status " + state + " for dag node " + current);
    }
  }

  assert canRunAtLeastAsManyAsDone(runnable.size(), done.size());

  DagProcessingEngine.FailureOption option = DagUtils.getFailureOption(dag);

  if (!sawFailure || option == DagProcessingEngine.FailureOption.FINISH_ALL_POSSIBLE) {
    // Finished only when every node that can still run is already done.
    return runnable.size() == done.size();
  }

  if (option == DagProcessingEngine.FailureOption.FINISH_RUNNING) {
    // Look only at what is left over; the dag is finished if none of it has moved past PENDING/COMPILED.
    runnable.removeAll(done);
    return runnable.stream().noneMatch(leftover ->
        leftover.getValue().getExecutionStatus() != PENDING
            && leftover.getValue().getExecutionStatus() != COMPILED);
  }

  throw new RuntimeException("Unexpected failure option " + option);
}

// Invariant checked by isDagFinished: the done nodes never outnumber the runnable nodes.
private static boolean canRunAtLeastAsManyAsDone(int runnableCount, int doneCount) {
  return runnableCount >= doneCount;
}