in helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java [63:171]
public void updateWorkflowStatus(String workflow, WorkflowConfig workflowCfg,
WorkflowContext workflowCtx, CurrentStateOutput currentStateOutput,
BestPossibleStateOutput bestPossibleOutput) {
// The workflow configuration and context are supplied by the caller; without a
// configuration there is nothing to update.
if (workflowCfg == null) {
LOG.warn("Workflow configuration is NULL for {}", workflow);
return;
}
// Step 1: Check for deletion. If the workflow is marked for deletion, clean it up
// and skip all further steps.
TargetState targetState = workflowCfg.getTargetState();
if (targetState == TargetState.DELETE) {
LOG.debug("Workflow is marked as deleted {} cleaning up the workflow context.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
cleanupWorkflow(workflow);
return;
}
// Step 2: Handle timeout, which takes priority over STOP.
// Only generic workflows can time out and schedule a rebalance for the timeout; job
// queues ignore this step. Setting the TIMED_OUT state is skipped if the workflow has
// already timed out.
if (!workflowCfg.isJobQueue()
&& !TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())) {
// If the timeout point has already passed, no rebalance will be scheduled.
scheduleRebalanceForTimeout(workflow, workflowCtx.getStartTime(), workflowCfg.getTimeout());
if (!TaskState.TIMED_OUT.equals(workflowCtx.getWorkflowState())
&& isTimeout(workflowCtx.getStartTime(), workflowCfg.getTimeout())) {
workflowCtx.setWorkflowState(TaskState.TIMED_OUT);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
// We should not return after setting the timeout: if the workflow is already stopped,
// marking it TIMED_OUT will not trigger the rebalance pipeline (we are not listening
// for PropertyStore changes), nor will a rebalance be scheduled for the timeout, since
// at this point the workflow has already timed out. Let the code proceed so the
// future cleanup work below can be scheduled.
}
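// Snapshot the clock once so the finish-time and expiry checks below use the same instant.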
long currentTime = System.currentTimeMillis();
// Step 3: Check for and process a finished workflow context (confusingly, the check
// lives inside isWorkflowFinished()).
// Check whether the workflow has finished and mark it if so; also update the cluster
// status monitor if one is provided.
// Note that COMPLETED and FAILED are marked in markJobComplete / markJobFailed;
// this block handles TIMED_OUT only.
if (workflowCtx.getFinishTime() == WorkflowContext.UNFINISHED && isWorkflowFinished(workflowCtx,
workflowCfg, _clusterDataCache.getJobConfigMap(), _clusterDataCache)) {
workflowCtx.setFinishTime(currentTime);
updateWorkflowMonitor(workflowCtx, workflowCfg);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
// Step 4: Handle finished workflows
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED) {
LOG.debug("Workflow {} is finished.", workflow);
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
long expiryTime = workflowCfg.getExpiry();
// Check whether this workflow has been finished for longer than its expiry time.
if (workflowCtx.getFinishTime() + expiryTime <= currentTime) {
LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", workflow);
cleanupWorkflow(workflow);
} else {
// schedule future cleanup work
long cleanupTime = workflowCtx.getFinishTime() + expiryTime;
_rebalanceScheduler.scheduleRebalance(_manager, workflow, cleanupTime);
}
return;
}
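// For job queues and other non-terminable workflows, drop context entries for jobs
// that are no longer in the DAG (e.g., jobs that have been purged from the queue).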
if (!workflowCfg.isTerminable() || workflowCfg.isJobQueue()) {
Set<String> jobWithFinalStates = new HashSet<>(workflowCtx.getJobStates().keySet());
jobWithFinalStates.removeAll(workflowCfg.getJobDag().getAllNodes());
if (!jobWithFinalStates.isEmpty()) {
workflowCtx.setLastJobPurgeTime(System.currentTimeMillis());
workflowCtx.removeJobStates(jobWithFinalStates);
workflowCtx.removeJobStartTime(jobWithFinalStates);
}
}
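// Update the workflow's in-flight jobs before handling STOP/resume transitions.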
updateInflightJobs(workflow, workflowCtx, currentStateOutput, bestPossibleOutput);
// Step 5: Handle workflows that should STOP.
// For workflows that have already reached a final state, STOP does not take effect.
if (!TaskConstants.FINAL_STATES.contains(workflowCtx.getWorkflowState())
&& TargetState.STOP.equals(targetState)) {
if (isWorkflowStopped(workflowCtx, workflowCfg) && workflowCtx.getWorkflowState() != TaskState.STOPPED) {
LOG.debug("Workflow {} is marked as stopped. Workflow state is {}", workflow,
workflowCtx.getWorkflowState());
workflowCtx.setWorkflowState(TaskState.STOPPED);
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
return;
}
// Step 6: Handle a workflow that should move to IN_PROGRESS after it has been resumed.
// This block ensures the workflow/queue context state becomes IN_PROGRESS when a
// previously stopped queue is resumed but contains no jobs that need to run.
if (targetState.equals(TargetState.START)
&& workflowCtx.getWorkflowState() == TaskState.STOPPED) {
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
}
_clusterDataCache.updateWorkflowContext(workflow, workflowCtx);
}
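
The timeout check in Step 2 and the expiry check in Step 4 both reduce to the same deadline arithmetic: a recorded start/finish time plus a configured duration, compared against the current clock. Below is a minimal, self-contained sketch of that arithmetic; the helper names and the negative-timeout-means-disabled convention are illustrative assumptions, not Helix's actual isTimeout implementation.

class DeadlineMathSketch {
  // Assumption (not from the source): a negative timeout disables the check entirely.
  static boolean isTimedOut(long startTimeMs, long timeoutMs, long nowMs) {
    if (timeoutMs < 0) {
      return false;
    }
    return startTimeMs + timeoutMs <= nowMs;
  }

  // Step 4's expiry test: a finished workflow is cleaned up once
  // finishTime + expiry has passed; otherwise cleanup is scheduled for that instant.
  static boolean isExpired(long finishTimeMs, long expiryMs, long nowMs) {
    return finishTimeMs + expiryMs <= nowMs;
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    // Started 10 minutes ago with a 5-minute timeout: timed out.
    System.out.println(isTimedOut(now - 600_000L, 300_000L, now)); // true
    // Finished 1 minute ago with a 30-minute expiry: not yet eligible for cleanup.
    System.out.println(isExpired(now - 60_000L, 1_800_000L, now)); // false
  }
}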