public void updateWorkflowStatus(String, WorkflowConfig, WorkflowContext, CurrentStateOutput, BestPossibleStateOutput)

in helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java [63:171]


  /**
   * Drives the workflow state machine for one rebalance pass: handles deletion, timeout,
   * completion, expiry-based cleanup, purging of removed jobs, STOP, and resumption back
   * to IN_PROGRESS.
   *
   * <p>Ordering is significant. Several steps write the mutated {@code workflowCtx} back to
   * {@code _clusterDataCache} and/or return early, so each later step only executes when the
   * earlier ones did not terminate the pass. Do not reorder the steps or hoist the context
   * writes without re-reading the inline comments below.
   *
   * @param workflow           name of the workflow resource being processed
   * @param workflowCfg        workflow configuration; if {@code null} the method logs a
   *                           warning and returns without doing anything
   * @param workflowCtx        mutable runtime context (state, start/finish times, per-job
   *                           states) that this method updates in place
   * @param currentStateOutput current-state snapshot from the pipeline, passed through to
   *                           {@code updateInflightJobs}
   * @param bestPossibleOutput best-possible assignment output, passed through to
   *                           {@code updateInflightJobs}
   */
  public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
      WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
      BestPossibleStateOutput bestPossibleOutput) {

    // Fetch workflow configuration and context
    if (workflowCfg == null) {
      LOG.warn("Workflow configuration is NULL for {}", workflow);
      return;
    }

    // Step 1: Check for deletion - if so, we don't need to go through further steps
    // Clean up if workflow marked for deletion
    TargetState targetState = workflowCfg.getTargetState();
    if (targetState == TargetState.DELETE) {
      LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
      // In-flight jobs are still updated before cleanup so their current states are processed
      // one last time; then the workflow's resources/context are removed.
      updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
      cleanupWorkflow(workflow);
      return;
    }

    // Step 2: handle timeout, which should have higher priority than STOP
    // Only generic workflow get timeouted and schedule rebalance for timeout. Will skip the set if
    // the workflow already got timeouted. Job Queue will ignore the setup.
    if (!workflowCfg.isJobQueue()
        && !TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())) {
      // If timeout point has already been passed, it will not be scheduled
      scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());

      // Transition to TIMED_OUT at most once; the write persists the new state so other
      // readers of the cache see it even if a later step returns early.
      if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
          && isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
        workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
        _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
      }

      // We should not return after setting timeout, as in case the workflow is stopped already
      // marking it timeout will not trigger rebalance pipeline as we are not listening on
      // PropertyStore change, nor will we schedule rebalance for timeout as at this point,
      // workflow is already timed-out. We should let the code proceed and wait for schedule
      // future cleanup work
    }

    // Single timestamp reused for the finish-time and expiry checks below so they agree.
    long currentTime = System.currentTimeMillis();

    // Step 3: Check and process finished workflow context (confusing,
    // but its inside isWorkflowFinished())
    // Check if workflow has been finished and mark it if it is. Also update cluster status
    // monitor if provided
    // Note that COMPLETE and FAILED will be marked in markJobComplete / markJobFailed
    // This is to handle TIMED_OUT only
    if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
        workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
      workflowCtx.setFinishTime(currentTime);
      updateWorkflowMonitor(workflowCtx, workflowCfg);
      _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
    }

    // Step 4: Handle finished workflows
    // A finished workflow lives until finishTime + expiry; past that it is cleaned up,
    // otherwise a rebalance is scheduled at the exact expiry instant to do the cleanup.
    if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
      LOG.debug("Workflow {} is finished.", workflow);
      updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
      long expiryTime = workflowCfg.getExpiry();
      // Check if this workflow has been finished past its expiry.
      if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
        LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", workflow);
        cleanupWorkflow(workflow);
      } else {
        // schedule future cleanup work
        long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
        _rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
      }
      return;
    }

    // For job queues / non-terminable workflows, drop context state for jobs that are no
    // longer in the DAG (i.e. jobs that have been purged from the config). The set math:
    // (jobs with recorded states) minus (jobs still in the DAG) = stale entries to remove.
    if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
      Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
      jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
      if (jobWithFinalStates.size() > 0) {
        workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
        workflowCtx.removeJobStates(jobWithFinalStates);
        workflowCtx.removeJobStartTime(jobWithFinalStates);
      }
    }

    updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);

    // Step 5: handle workflow that should STOP
    // For workflows that have already reached final states, STOP should not take into effect.
    if (!TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())
        && TargetState.STOP.equals(targetState)) {
      // Only flip to STOPPED once isWorkflowStopped() confirms the stop condition, and only
      // once (guard against rewriting an already-STOPPED context).
      if (isWorkflowStopped(workflowCtx, workflowCfg) && workflowCtx.getWorkflowState() != TaskState.STOPPED) {
        LOG.debug("Workflow {} is marked as stopped. Workflow state is {}", workflow,
            workflowCtx.getWorkflowState());
        workflowCtx.setWorkflowState(TaskState.STOPPED);
        _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
      }
      // Return regardless of whether the state changed: a STOP-targeted, non-final workflow
      // should not fall through to the resume handling below.
      return;
    }

    // Step 6: handle workflow that should go to IN_PROGRESS state after is has been resumed
    // This block is added to make sure workflow/queue context state becomes IN_PROGRESS for the
    // case where a queue which has been stopped before is just resumed and the queue does not
    // contain any job that needs to be run.
    if (targetState.equals(TargetState.START)
        && workflowCtx.getWorkflowState() == TaskState.STOPPED) {
      workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
    }

    // Final write: persists any mutations made above (job-state purge, step-6 resume
    // transition) that were not already written back by the earlier steps.
    _clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
  }