protected boolean isWorkflowFinished()

in helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java [980:1044]


  /**
   * Determines whether the workflow has reached a finished state, updating {@code ctx} as a side
   * effect when it has.
   *
   * <p>The checks are applied in this order:
   * <ol>
   *   <li>If the workflow state is already {@code TIMED_OUT}, the workflow is finished and nothing
   *       is modified.</li>
   *   <li>Jobs in the DAG are scanned; each {@code FAILED} or {@code TIMED_OUT} job counts as a
   *       failure. For a workflow that is not a job queue, once the count exceeds
   *       {@code cfg.getFailureThreshold()} the workflow state is set to {@code FAILED}, every
   *       {@code IN_PROGRESS} job is set to {@code ABORTED}, and the workflow is finished.</li>
   *   <li>If every job is {@code COMPLETED}, {@code FAILED} or {@code TIMED_OUT} and
   *       {@code cfg.isTerminable()} is true, the workflow state is set to {@code COMPLETED} and
   *       the workflow is finished.</li>
   * </ol>
   *
   * @param ctx workflow context whose workflow/job states are read and possibly updated
   * @param cfg workflow configuration supplying the job DAG, failure threshold and flags
   * @param jobConfigMap job configs keyed by job name; a job missing from this map is still
   *        aborted, but its resources cannot be released here
   * @param clusterDataCache source of the {@code AssignableInstanceManager}; when {@code null} no
   *        resources are released for aborted jobs
   * @return {@code true} if the workflow is finished according to the rules above, otherwise
   *         {@code false}
   */
  protected boolean isWorkflowFinished(WorkflowContext ctx, WorkflowConfig cfg,
      Map<String, JobConfig> jobConfigMap, WorkflowControllerDataProvider clusterDataCache) {
    boolean incomplete = false;

    TaskState workflowState = ctx.getWorkflowState();
    if (TaskState.TIMED_OUT.equals(workflowState)) {
      // We don't update job state here as JobRebalancer will do it
      return true;
    }

    // Check if failed job count is beyond threshold and if so, fail the workflow
    // and abort in-progress jobs
    int failedJobs = 0;
    for (String job : cfg.getJobDag().getAllNodes()) {
      TaskState jobState = ctx.getJobState(job);
      if (jobState == TaskState.FAILED || jobState == TaskState.TIMED_OUT) {
        failedJobs++;
        if (!cfg.isJobQueue() && failedJobs > cfg.getFailureThreshold()) {
          ctx.setWorkflowState(TaskState.FAILED);
          LOG.info("Workflow {} reached the failure threshold, so setting its state to FAILED.",
              cfg.getWorkflowId());
          abortInProgressJobsOfFailedWorkflow(ctx, cfg, jobConfigMap, clusterDataCache);
          return true;
        }
      }
      // Any state other than COMPLETED / FAILED / TIMED_OUT means the job is not yet done
      if (jobState != TaskState.COMPLETED && jobState != TaskState.FAILED
          && jobState != TaskState.TIMED_OUT) {
        incomplete = true;
      }
    }
    if (!incomplete && cfg.isTerminable()) {
      ctx.setWorkflowState(TaskState.COMPLETED);
      return true;
    }
    return false;
  }

  /**
   * Marks every IN_PROGRESS job of the workflow as ABORTED, reports the abort to the cluster
   * status monitor (if one is set) and releases the resources held by the job's tasks.
   */
  private void abortInProgressJobsOfFailedWorkflow(WorkflowContext ctx, WorkflowConfig cfg,
      Map<String, JobConfig> jobConfigMap, WorkflowControllerDataProvider clusterDataCache) {
    for (String jobToFail : cfg.getJobDag().getAllNodes()) {
      if (ctx.getJobState(jobToFail) != TaskState.IN_PROGRESS) {
        continue;
      }
      ctx.setJobState(jobToFail, TaskState.ABORTED);

      // Skip aborted jobs latency since they are not accurate latency for job running time
      // NOTE(review): the job config passed here may be null if the job is missing from
      // jobConfigMap; whether updateJobCounters tolerates that is not visible here - confirm.
      if (_clusterStatusMonitor != null) {
        _clusterStatusMonitor.updateJobCounters(jobConfigMap.get(jobToFail), TaskState.ABORTED);
      }

      // Since the job is aborted, release resources occupied by it
      // Otherwise, we run the risk of resource leak
      if (clusterDataCache != null) {
        releaseResourcesOfAbortedJob(jobToFail, jobConfigMap.get(jobToFail),
            clusterDataCache.getAssignableInstanceManager());
      }
    }
  }

  /**
   * Releases every task of the given job on every assignable instance known to the manager. Does
   * nothing except log a warning when the job config is missing.
   */
  private void releaseResourcesOfAbortedJob(String jobName, JobConfig jobConfig,
      AssignableInstanceManager assignableInstanceManager) {
    if (jobConfig == null) {
      // Without the config neither the quota type nor the task list is known. Skip the release
      // instead of failing with a NullPointerException after the states were already updated.
      LOG.warn("Job config for aborted job {} is missing; cannot release its resources.",
          jobName);
      return;
    }
    String quotaType = jobConfig.getJobType();
    Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
    // Iterate over all tasks and release them
    for (TaskConfig taskConfig : taskConfigMap.values()) {
      for (String assignableInstanceName : assignableInstanceManager
          .getAssignableInstanceNames()) {
        assignableInstanceManager.release(assignableInstanceName, taskConfig, quotaType);
      }
    }
  }