in helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java [56:197]
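// Per-job entry point of the dispatcher: validates configuration and context, applies
// workflow/job state transitions, and computes the new task-to-instance assignment. A sketch of
// the high-level flow, inferred from the checks below (helper semantics are assumptions):
// 1. Bail out with an empty assignment when config/context is missing or the job is terminal.
// 2. Create the JobContext (and its UserContentStore) on first dispatch.
// 3. Reconcile job/workflow states (timeouts, stop requests) in the workflow context.
// 4. Delegate placement to computeResourceMapping and persist both contexts.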
public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
// Fetch job configuration
final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
if (jobCfg == null) {
LOG.error("Job configuration is NULL for {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
String workflowResource = jobCfg.getWorkflow();
// Fetch workflow configuration and context
final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
if (workflowCfg == null) {
LOG.error("Workflow configuration is NULL for {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
if (workflowCtx == null) {
LOG.error("Workflow context is NULL for {}", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
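// Only schedule when the workflow's target state is START or STOP; any other target state
// (e.g. DELETE) means this job should no longer be scheduled.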
TargetState targetState = workflowCfg.getTargetState();
if (targetState != TargetState.START && targetState != TargetState.STOP) {
LOG.info("Target state is {} for workflow {} .Stop scheduling job {}", targetState.name(),
workflowResource, jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
// Stop current run of the job if workflow or job is already in final state (failed or
// completed)
TaskState workflowState = workflowCtx.getWorkflowState();
TaskState jobState = workflowCtx.getJobState(jobName);
// Do not include workflowState == TIMED_OUT here, as later logic needs to handle this case
if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
|| jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
|| jobState == TaskState.TIMED_OUT) {
LOG.info(
"Workflow {} or job {} is already in a final state (workflow state: {}, job state: {}); cleaning up the job IdealState.",
workflowResource, jobName, workflowState, jobState);
finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
// New pipeline trigger for workflow status update
// TODO: Enhance the pipeline and remove this, because this operation is expensive
RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false);
_rebalanceScheduler.removeScheduledRebalance(jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
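// isWorkflowReadyForSchedule is assumed to check workflow-level gating (e.g. whether the
// configured start time has passed); jobs of a not-yet-ready workflow are not dispatched.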
if (!isWorkflowReadyForSchedule(workflowCfg)) {
LOG.info("Job {} is not ready to be run since workflow is not ready.", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
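// For a job that has not started yet, isJobReadyToSchedule is assumed to enforce DAG
// dependencies, the workflow's parallel-job limit, and task quota availability before the
// first dispatch.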
if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
_dataProvider.getJobConfigMap(), _dataProvider,
_dataProvider.getAssignableInstanceManager())) {
LOG.info("Job {} is not ready to run.", jobName);
return buildEmptyAssignment(jobName, currStateOutput);
}
// Fetch any existing context information from the property store.
JobContext jobCtx = _dataProvider.getJobContext(jobName);
if (jobCtx == null) {
jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
final long currentTimestamp = System.currentTimeMillis();
jobCtx.setStartTime(currentTimestamp);
jobCtx.setName(jobName);
// This job's JobContext has not been created yet. Since we are creating a new JobContext
// here, we must also create its UserContentStore
TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName,
new ZNRecord(TaskUtil.USER_CONTENT_NODE));
workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
// Since this job has been processed for the first time, we report SubmissionToProcessDelay
// here asynchronously
reportSubmissionToProcessDelay(_dataProvider, _clusterStatusMonitor, workflowCfg, jobCfg,
currentTimestamp);
}
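// Unless the job has already timed out, schedule a future rebalance for the moment the job's
// timeout would elapse, so the timeout is enforced even if no other cluster event fires.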
if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
}
// Determine the candidate instances: all enabled live instances, or, if the job is configured
// with an instance group tag, only the enabled live instances carrying that tag.
Set<String> liveInstances =
jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
: _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());
if (liveInstances.isEmpty()) {
LOG.error("No available instance found for job: {}", jobName);
}
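// Note: execution deliberately continues even with no live instances, so the state
// bookkeeping and context updates below still take place.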
TargetState jobTgtState = workflowCfg.getTargetState();
jobState = workflowCtx.getJobState(jobName);
workflowState = workflowCtx.getWorkflowState();
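// This is the "later logic" referenced above: move a job in an intermediate state to
// TIMING_OUT when its own timeout has elapsed or the workflow as a whole has timed out.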
if (INTERMEDIATE_STATES.contains(jobState)
&& (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
|| TaskState.TIMED_OUT.equals(workflowState))) {
jobState = TaskState.TIMING_OUT;
workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
} else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
// TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
// Update running status in workflow context
if (jobTgtState == TargetState.STOP) {
// If the assigned instance is no longer live, mark the partition as DROPPED in the context
markPartitionsWithoutLiveInstance(jobCtx, liveInstances);
if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) {
workflowCtx.setJobState(jobName, TaskState.STOPPED);
} else {
workflowCtx.setJobState(jobName, TaskState.STOPPING);
}
// The workflow has been stopped once all in-progress jobs are stopped
if (isWorkflowStopped(workflowCtx, workflowCfg)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
} else {
workflowCtx.setWorkflowState(TaskState.STOPPING);
}
} else {
workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
// Workflow is in progress if any task is in progress
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
}
}
Set<Integer> partitionsToDrop = new TreeSet<>();
ResourceAssignment newAssignment =
computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, liveInstances,
currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);
// Update Workflow and Job context in data cache and ZK.
_dataProvider.updateJobContext(jobName, jobCtx);
_dataProvider.updateWorkflowContext(workflowResource, workflowCtx);
LOG.debug("Job {} new assignment",
Arrays.toString(newAssignment.getMappedPartitions().toArray()));
return newAssignment;
}