in helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java [199:333]
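/**
 * Computes the ResourceAssignment for a single job: processes existing task assignments,
 * handles job-level terminal transitions (FAILED, COMPLETED, TIMED_OUT), and assigns new
 * tasks to live instances when the job is still runnable.
 */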
private ResourceAssignment computeResourceMapping(String jobResource,
WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
Collection<String> liveInstances, CurrentStateOutput currStateOutput,
WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
WorkflowControllerDataProvider cache) {
// Used to keep track of tasks that have already been assigned to instances.
// InstanceName -> Set of task partitions assigned to that instance in this iteration
Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
// Used to keep track of tasks that have failed, but whose failure is acceptable
Set<Integer> skippedPartitions = new HashSet<>();
// Keeps a mapping of (partition) -> (instance, state)
Map<Integer, PartitionAssignment> paMap = new TreeMap<>();
Set<String> excludedInstances =
getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache);
// Determine the assignment strategy for this job and compute the full set of task partitions.
TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalculator(jobCfg, cache);
Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
workflowConfig, workflowCtx, cache.getIdealStates());
// Find the tasks that should be dropped, either because the task has been removed from the
// config (generic jobs) or because the target partition no longer exists in the target
// resource's IdealState
Set<Integer> removedPartitions =
taskAssignmentCal.getRemovedPartitions(jobCfg, jobCtx, allPartitions);
if (allPartitions == null || allPartitions.isEmpty()) {
// Empty target partitions, mark the job as FAILED.
String failureMsg =
"Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
LOG.info(failureMsg);
jobCtx.setInfo(failureMsg);
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
markAllPartitionsError(jobCtx);
return new ResourceAssignment(jobResource);
}
// This set contains all task pIds that need to be dropped because their requestedState is
// DROPPED. Newer versions of Participants, upon connection reset, set task requestedStates to
// DROPPED. These dropping transitions are prioritized above all other task state transition
// assignments.
Map<String, Set<Integer>> tasksToDrop = new HashMap<>();
Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
getCurrentInstanceToTaskAssignments(cache.getEnabledLiveInstances(), currStateOutput,
jobResource, tasksToDrop);
updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);
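// Handle tasks that have been deleted from the job, using the removedPartitions computed above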
handleDeletedTasks(jobResource, jobCtx, currentInstanceToTaskAssignments, tasksToDrop,
currStateOutput, allPartitions, removedPartitions);
long currentTime = System.currentTimeMillis();
LOG.debug("All partitions: {} taskAssignment: {} excludedInstances: {}", allPartitions,
currentInstanceToTaskAssignments, excludedInstances);
// Update the status of previously assigned tasks and release resources for tasks in a
// terminal state
updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);
addGivenUpPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);
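// Fail the job if too many tasks have permanently failed (failure threshold exceeded while
// IN_PROGRESS), or if the job's target resource has been disabled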
if ((jobState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobCfg.getFailureThreshold())
|| (jobCfg.getTargetResource() != null
&& cache.getIdealState(jobCfg.getTargetResource()) != null
&& !cache.getIdealState(jobCfg.getTargetResource()).isEnabled())) {
if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
return buildEmptyAssignment(jobResource, currStateOutput);
}
workflowCtx.setJobState(jobResource, TaskState.FAILING);
// Abort all assigned tasks that have not already been given up
for (int pId : jobCtx.getPartitionSet()) {
String instance = jobCtx.getAssignedParticipant(pId);
if (jobCtx.getPartitionState(pId) != null && !isTaskGivenup(jobCtx, jobCfg, pId)) {
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
}
Partition partition = new Partition(pName(jobResource, pId));
Message pendingMessage =
currStateOutput.getPendingMessage(jobResource, partition, instance);
// While job is failing, if the task is pending on INIT->RUNNING, set it back to INIT,
// so that Helix will cancel the transition.
if (jobCtx.getPartitionState(pId) == TaskPartitionState.INIT && pendingMessage != null) {
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
}
}
return toResourceAssignment(jobResource, paMap);
}
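// A FAILING job becomes FAILED once all of its remaining tasks reach a terminal state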
if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, currStateOutput)) {
failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
return buildEmptyAssignment(jobResource, currStateOutput);
}
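// All tasks finished successfully: mark the job COMPLETED, report metrics, and clean up the
// job's IdealState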
if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap(),
cache);
_clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
jobCtx.getFinishTime() - jobCtx.getStartTime());
_rebalanceScheduler.removeScheduledRebalance(jobResource);
TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
// New pipeline trigger for workflow status update
// TODO: Enhance the pipeline and remove this because this operation is expensive
RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false);
return buildEmptyAssignment(jobResource, currStateOutput);
}
// If the job is being timed out and no task is running (for whatever reason), the idealState
// can be deleted and all tasks can be dropped (note that Helix doesn't track whether the drop
// succeeds or not).
if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(),
jobResource);
scheduleJobCleanUp(jobCfg.getTerminalStateExpiry(), workflowConfig, currentTime);
return buildEmptyAssignment(jobResource, currStateOutput);
}
// For delayed tasks, trigger a rebalance event for the earliest upcoming ready time
scheduleForNextTask(jobResource, jobCtx, currentTime);
// Make additional task assignments if needed.
if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
&& jobTgtState == TargetState.START) {
handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances,
jobResource, currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
assignedPartitions, paMap, skippedPartitions, taskAssignmentCal, allPartitions,
currentTime, liveInstances);
}
return toResourceAssignment(jobResource, paMap);
}