in helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java [980:1044]
/**
 * Decides whether the workflow has reached a terminal state and, when it has, records that
 * state in the workflow context.
 *
 * <p>The checks run in this order:
 * <ol>
 * <li>If the workflow state is already {@code TIMED_OUT}, returns {@code true} without
 * touching any state.</li>
 * <li>Counts the jobs in the workflow DAG that are {@code FAILED} or {@code TIMED_OUT}. If the
 * workflow is not a job queue and that count exceeds {@code cfg.getFailureThreshold()}, the
 * workflow is set to {@code FAILED}, every {@code IN_PROGRESS} job is set to {@code ABORTED},
 * job counters are updated (when a status monitor is present), the tasks of each aborted job
 * are released on every assignable instance (when {@code clusterDataCache} is non-null), and
 * {@code true} is returned.</li>
 * <li>If every job is {@code COMPLETED}, {@code FAILED} or {@code TIMED_OUT} and
 * {@code cfg.isTerminable()} holds, the workflow is set to {@code COMPLETED} and {@code true}
 * is returned.</li>
 * </ol>
 *
 * @param ctx workflow context; read for workflow/job states and mutated on state transitions
 * @param cfg workflow configuration supplying the job DAG, failure threshold and flags
 * @param jobConfigMap job name to job config lookup; a job without an entry is still aborted
 *        but its resource release is skipped
 * @param clusterDataCache source of the assignable instance manager; when {@code null}, no
 *        resources are released
 * @return {@code true} if the workflow is timed out, was failed, or was completed by this call;
 *         {@code false} otherwise
 */
protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
    Map<String, JobConfig> jobConfigMap, WorkflowControllerDataProvider clusterDataCache) {
  TaskState workflowState = ctx.getWorkflowState();
  if (TaskState.TIMED_OUT.equals(workflowState)) {
    // We don't update job state here as JobRebalancer will do it
    return true;
  }

  boolean incomplete = false;
  int failedJobs = 0;
  for (String job : cfg.getJobDag().getAllNodes()) {
    TaskState jobState = ctx.getJobState(job);
    boolean jobFailed = jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT;
    if (!jobFailed) {
      // Anything that is neither failed/timed-out nor completed still has work outstanding.
      if (jobState != TaskState.COMPLETED) {
        incomplete = true;
      }
      continue;
    }

    failedJobs++;
    if (cfg.isJobQueue() || failedJobs <= cfg.getFailureThreshold()) {
      continue;
    }

    // Failed job count is beyond the threshold: fail the workflow and abort in-progress jobs.
    ctx.setWorkflowState(TaskState.FAILED);
    LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.",
        cfg.getWorkflowId());
    for (String jobToFail : cfg.getJobDag().getAllNodes()) {
      if (ctx.getJobState(jobToFail) != TaskState.IN_PROGRESS) {
        continue;
      }
      ctx.setJobState(jobToFail, TaskState.ABORTED);
      JobConfig jobConfig = jobConfigMap.get(jobToFail);

      // Skip aborted jobs latency since they are not accurate latency for job running time
      if (_clusterStatusMonitor != null) {
        // NOTE(review): jobConfig may be null here, exactly as before this change;
        // updateJobCounters is not visible in this chunk - confirm it tolerates null.
        _clusterStatusMonitor.updateJobCounters(jobConfig, TaskState.ABORTED);
      }

      // Since the job is aborted, release resources occupied by it.
      // Otherwise, we run the risk of resource leak.
      if (clusterDataCache == null) {
        continue;
      }
      if (jobConfig == null) {
        // Without a config there is no quota type or task list to release. Keep going so the
        // remaining in-progress jobs are still aborted instead of dying on an NPE mid-loop.
        LOG.warn("Job {} of workflow {} has no JobConfig; skipping resource release.",
            jobToFail, cfg.getWorkflowId());
        continue;
      }
      AssignableInstanceManager assignableInstanceManager =
          clusterDataCache.getAssignableInstanceManager();
      String quotaType = jobConfig.getJobType();
      Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
      // Release every task of the job on every assignable instance.
      for (TaskConfig taskConfig : taskConfigMap.values()) {
        for (String assignableInstanceName : assignableInstanceManager
            .getAssignableInstanceNames()) {
          assignableInstanceManager.release(assignableInstanceName, taskConfig, quotaType);
        }
      }
    }
    return true;
  }

  if (!incomplete && cfg.isTerminable()) {
    ctx.setWorkflowState(TaskState.COMPLETED);
    return true;
  }
  return false;
}