in helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java [70:337]
/**
 * Reconciles the tasks previously assigned to each instance with their reported state and decides
 * the next state transition for each of them.
 *
 * <p>For every instance key of {@code currentInstanceToTaskAssignments} this method:
 * <ol>
 * <li>resets the instance's entry in {@code assignedPartitions} to an empty set (for excluded
 * instances too);</li>
 * <li>records a DROPPED transition in {@code paMap} for every pId listed for that instance in
 * {@code tasksToDrop} (for excluded instances too);</li>
 * <li>stops there if the instance is in {@code excludedInstances};</li>
 * <li>otherwise walks the instance's pIds, minus every pId scheduled for dropping on <em>any</em>
 * instance, and, based on a pending message, a requested state, or the current state, records the
 * next transition in {@code paMap}, releases AssignableInstance resources for tasks that will not
 * run again, and collects pIds into {@code partitionsToDropFromIs} / {@code skippedPartitions};</li>
 * <li>removes the pIds found in INIT (without another transition) or DROPPED state from the
 * instance's task set.</li>
 * </ol>
 *
 * <p>Most collection arguments are output parameters and are mutated in place.
 *
 * @param currentInstanceToTaskAssignments instance -> task pIds previously assigned to it; for
 *          non-excluded instances the value set is pruned in place
 * @param excludedInstances instances for which only the forced DROPPED transitions are emitted
 * @param jobResource job resource name, used to build partition names and query current state
 * @param currStateOutput source of current states, requested states and pending messages
 * @param jobCtx job context; passed to helpers ({@code updatePartitionInformationInJobContext},
 *          {@code markPartitionDelayed}) that presumably update it
 * @param jobCfg job config; supplies the job type (quota type), task configs and max attempts
 * @param jobState current state of the job
 * @param assignedPartitions output: instance -> pIds that were given a transition in this pass
 * @param partitionsToDropFromIs output: pIds to be removed from the ideal state
 * @param paMap output: pId -> (instance, next state); holds at most one transition per pId
 * @param jobTgtState target state of the job
 * @param skippedPartitions output: pIds in an error state that will not be retried
 * @param cache provides the AssignableInstanceManager used to release task resources
 * @param tasksToDrop instance -> pIds that must be dropped (e.g. after a Participant disconnect);
 *          these transitions take priority over all others
 */
public void updatePreviousAssignedTasksStatus(
    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
    Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
    JobContext jobCtx, JobConfig jobCfg, TaskState jobState,
    Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
    Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
    Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
    Map<String, Set<Integer>> tasksToDrop) {
  // If a job is in one of the following states and its tasks are in RUNNING states, the tasks
  // will be aborted.
  Set<TaskState> jobStatesForAbortingTasks =
      new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
          TaskState.FAILED, TaskState.ABORTED));
  // Used to release AssignableInstance resources for tasks in terminal states
  AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
  // Union of all pIds to drop, across every instance. Because paMap can only hold one transition
  // per pId, these pIds are excluded from normal processing on ALL instances below.
  Set<Integer> allTasksToDrop = new HashSet<>();
  for (Set<Integer> taskToDropForInstance : tasksToDrop.values()) {
    allTasksToDrop.addAll(taskToDropForInstance);
  }

  // Iterate through all instances
  for (String instance : currentInstanceToTaskAssignments.keySet()) {
    // Every visited instance, excluded or not, starts with an empty set of assigned partitions
    assignedPartitions.put(instance, new HashSet<>());

    // Set all dropping transitions first. These are tasks coming from Participant disconnects
    // and have the requestedState of DROPPED.
    // These need to be prioritized over any other state transitions because of the race condition
    // with the same pId (task) running on other instances. This is because in paMap, we can only
    // define one transition per pId
    if (tasksToDrop.containsKey(instance)) {
      for (int pIdToDrop : tasksToDrop.get(instance)) {
        paMap.put(pIdToDrop,
            new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
        assignedPartitions.get(instance).add(pIdToDrop);
      }
    }

    if (excludedInstances.contains(instance)) {
      continue;
    }

    // Note: this set is owned by the caller's map and is pruned in place below
    Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);
    // We need to remove all task pId's to be dropped because we already made an assignment in
    // paMap above for them to be dropped. The following does this.
    pSet.removeAll(allTasksToDrop);
    // Used to keep track of partitions that are in either INIT or DROPPED states
    Set<Integer> donePartitions = new TreeSet<>();

    for (int pId : pSet) {
      final String pName = pName(jobResource, pId);
      final Partition partition = new Partition(pName);
      TaskPartitionState currState = getTaskCurrentState(currStateOutput, jobResource, pId, pName,
          instance, jobCtx, jobTgtState);

      // Check for pending state transitions on this (partition, instance). If there is a pending
      // state transition, we prioritize this pending state transition and set the assignment from
      // this pending state transition, essentially "waiting" until this pending message clears.
      // If there is a pending message, we should not continue to update the context because from
      // the controller's perspective, the state transition has not been completed yet while the
      // pending message still exists.
      // If context gets updated here, controller might remove the job from RunTimeJobDAG which
      // can cause the task's CurrentState not being removed when there is a pending message for
      // that task.
      Message pendingMessage = currStateOutput.getPendingMessage(jobResource, partition, instance);
      if (pendingMessage != null) {
        processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState,
            paMap, assignedPartitions);
        continue;
      }

      // Update job context based on current state
      updatePartitionInformationInJobContext(currStateOutput, jobResource, currState, jobCtx,
          pId, pName, instance);

      if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
        LOG.warn(
            "Instance {} does not match the assigned participant for pId {} in the job context (job: {}). Skipping task scheduling.",
            instance, pId, jobCtx.getName());
        continue;
      }

      // Resolve the quota type and TaskConfig needed for releasing resources
      String quotaType = jobCfg.getJobType();
      String taskId = TaskUtil.isGenericTaskJob(jobCfg) ? jobCtx.getTaskIdForPartition(pId) : pName;
      TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);

      // Process any requested state transitions. If there is a requested state transition, just
      // "wait" until this state transition is complete
      String requestedStateStr =
          currStateOutput.getRequestedState(jobResource, partition, instance);
      if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
        TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
        if (requestedState == currState) {
          LOG.warn("Requested state {} is the same as the current state for instance {}.",
              requestedState, instance);
        }

        // For STOPPED tasks, if the targetState is STOP, we should not honor requestedState
        // transition and make it a NOP
        if (currState == TaskPartitionState.STOPPED && jobTgtState == TargetState.STOP) {
          // This task is STOPPED and not going to be re-run, so release this task
          assignableInstanceManager.release(instance, taskConfig, quotaType);
          continue;
        }

        // This contains check is necessary because we have already traversed pIdsToDrop at the
        // beginning of this method. If we already have a dropping transition, we do not want to
        // overwrite it. Any other requestedState transitions (for example, INIT to RUNNING or
        // RUNNING to COMPLETE) can wait without affecting correctness - they will be picked up
        // in ensuing runs of the Task pipeline
        if (!paMap.containsKey(pId)) {
          paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
        }
        assignedPartitions.get(instance).add(pId);
        LOG.debug("Instance {} requested a state transition to {} for partition {}.", instance,
            requestedState, pName);
        continue;
      }

      switch (currState) {
        case RUNNING: {
          TaskPartitionState nextState = TaskPartitionState.RUNNING;
          if (jobStatesForAbortingTasks.contains(jobState)) {
            nextState = TaskPartitionState.TASK_ABORTED;
          } else if (jobTgtState == TargetState.STOP) {
            nextState = TaskPartitionState.STOPPED;
          }
          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
          assignedPartitions.get(instance).add(pId);
          LOG.debug("Setting task partition {} state to {} on instance {}.", pName, nextState,
              instance);
        }
          break;
        case STOPPED: {
          // TODO: This case statement might be unreachable code - Hunter
          // This code may need to be removed because once a task is STOPPED and its workflow's
          // targetState is STOP, we do not assign that stopped task. Not assigning means it will
          // not be included in previousAssignment map in the next rebalance. If it is not in
          // prevInstanceToTaskAssignments, it will never hit this part of the code
          // When the parent workflow is to be resumed (target state is START), then it will just
          // be assigned as if it were being assigned for the first time
          TaskPartitionState nextState;
          if (jobTgtState == TargetState.START) {
            nextState = TaskPartitionState.RUNNING;
          } else {
            nextState = TaskPartitionState.STOPPED;
            // This task is STOPPED and not going to be re-run, so release this task
            assignableInstanceManager.release(instance, taskConfig, quotaType);
          }
          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
          assignedPartitions.get(instance).add(pId);
          LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
              jobCtx.getName(), pName, nextState, instance);
        }
          break;
        case COMPLETED: {
          // The task has completed on this partition. Drop it from the instance and add it to
          // assignedPartitions in order to avoid scheduling it again in this pipeline.
          assignedPartitions.get(instance).add(pId);
          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          LOG.debug(
              "Task partition {} has completed with state {}. Marking as such in rebalancer context.",
              pName, currState);
          partitionsToDropFromIs.add(pId);
          // This task is COMPLETED, so release this task
          assignableInstanceManager.release(instance, taskConfig, quotaType);
        }
          break;
        case TIMED_OUT:
        case TASK_ERROR:
        case TASK_ABORTED:
        case ERROR: {
          // First make this task which is in terminal state to be dropped.
          // Later on, in the next pipeline in handleAdditionalAssignments, the task will be
          // retried if possible (meaning it is not ABORTED and max number of attempts has not
          // been reached yet).
          assignedPartitions.get(instance).add(pId);
          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          LOG.debug(
              "Task partition {} has error state {} with msg {}. Marking as such in rebalancer context.",
              pName, currState, jobCtx.getPartitionInfo(pId));
          // The error policy is to fail the task as soon as a single partition fails for a
          // specified maximum number of attempts or the task is in ABORTED state.
          // But notice that if the job is TIMED_OUT, an aborted task won't be treated as failed
          // and won't cause the job to fail.
          // After all tasks are aborted, they will be dropped, because of job timeout.
          if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
                || currState == TaskPartitionState.TASK_ABORTED
                || currState == TaskPartitionState.ERROR) {
              skippedPartitions.add(pId);
              partitionsToDropFromIs.add(pId);
              LOG.debug("skippedPartitions: {}", skippedPartitions);
            } else {
              // Mark the task to be started at some later time (if enabled)
              markPartitionDelayed(jobCfg, jobCtx, pId);
            }
          }
          // Release this task
          assignableInstanceManager.release(instance, taskConfig, quotaType);
        }
          break;
        case INIT: {
          // INIT is a temporary state for tasks
          // Two possible scenarios for INIT:
          // 1. Task is getting scheduled for the first time. In this case, Task's state will go
          // from null->INIT->RUNNING, and this INIT state will be transient and very short-lived
          // 2. Task is getting scheduled for the first time, but in this case, job is timed out
          // or timing out. In this case, it will be sent back to INIT state to be removed. Here
          // we ensure that this task then goes from INIT to DROPPED so that it will be released
          // from AssignableInstance to prevent resource leak
          if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
              || jobTgtState == TargetState.DELETE) {
            // Job is timed out or timing out or targetState is to be deleted, so its tasks will
            // be sent back to INIT.
            // In this case, tasks' IdealState will be removed, and they will be sent to DROPPED
            partitionsToDropFromIs.add(pId);
            assignedPartitions.get(instance).add(pId);
            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
            // Also release resources for these tasks
            assignableInstanceManager.release(instance, taskConfig, quotaType);
            break;
          } else if (jobState == TaskState.IN_PROGRESS && jobTgtState != TargetState.STOP) {
            // (TargetState.DELETE was already handled by the branch above.)
            // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
            paMap.put(pId,
                new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
            assignedPartitions.get(instance).add(pId);
            break;
          }
          // Otherwise the INIT task is treated exactly like a DROPPED one (see below).
        }
        // fall through
        case DROPPED: {
          // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
          donePartitions.add(pId);
          LOG.debug(
              "Task partition {} has state {}. It will be dropped from the current ideal state.",
              pName, currState);
          // If it's DROPPED, release this task. If INIT, do not release
          if (currState == TaskPartitionState.DROPPED) {
            assignableInstanceManager.release(instance, taskConfig, quotaType);
          }
        }
          break;
        default:
          throw new AssertionError("Unknown enum symbol: " + currState);
      }
    }

    // Remove the task partitions found in INIT (with no other transition) or DROPPED state.
    pSet.removeAll(donePartitions);
  }
}