in helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java [536:718]
protected void handleAdditionalTaskAssignment(
Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
JobContext jobCtx, final JobConfig jobCfg, final WorkflowConfig workflowConfig,
WorkflowContext workflowCtx, final WorkflowControllerDataProvider cache,
Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {
// Check whether there was a LiveInstance, CurrentState, or Message change in this iteration of the pipeline
boolean existsLiveInstanceOrCurrentStateOrMessageChangeChange =
cache.getExistsLiveInstanceOrCurrentStateOrMessageChange();
// The excludeSet contains the set of task partitions that must be excluded from consideration
// when making any new assignments.
// This includes all completed, failed, delayed, and already assigned partitions.
Set<Integer> excludeSet = Sets.newTreeSet();
// Add all assigned partitions to excludeSet
for (Set<Integer> assignedSet : assignedPartitions.values()) {
excludeSet.addAll(assignedSet);
}
addCompletedTasks(excludeSet, jobCtx, allPartitions);
addPartitionsReachedMaximumRetries(excludeSet, jobCtx, allPartitions, jobCfg);
excludeSet.addAll(skippedPartitions);
Set<Integer> partitionsWithDelay = TaskUtil.getNonReadyPartitions(jobCtx, currentTime);
excludeSet.addAll(partitionsWithDelay);
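// At this point excludeSet holds every partition that must not be newly assigned in this run:
// already-assigned, completed, retry-exhausted, skipped, and delayed (not-yet-ready) partitions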
// The following filters tasks before passing them to the assigner.
// Only feed in tasks that need to be assigned (state equal to null, STOPPED, TIMED_OUT,
// TASK_ERROR, or DROPPED) or whose assigned participant is disabled or no longer live
Set<Integer> filteredTaskPartitionNumbers = filterTasks(jobResource, allPartitions, jobCtx,
liveInstances, cache.getDisabledInstances(), currStateOutput, paMap);
// Remove all excludeSet tasks to be safe, because some STOPPED tasks may already have been
// re-started (excludeSet includes already-assigned partitions). Tasks whose retry limit has
// been exceeded (addGiveupPartitions) are removed as well
filteredTaskPartitionNumbers.removeAll(excludeSet);
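// filteredTaskPartitionNumbers now contains only the partitions still eligible for a fresh
// assignment in this pipeline iteration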
Set<Integer> partitionsToRetryOnLiveInstanceChangeForTargetedJob = new HashSet<>();
// If the job is a targeted job and there has been a LiveInstance/CurrentState/Message change,
// we need to re-assign non-terminal tasks so that they can be re-scheduled
if (!TaskUtil.isGenericTaskJob(jobCfg)
&& existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
// This job is a targeted job, so FixedAssignmentCalculator will be used
// There has been a live instance change. Must re-add incomplete task partitions to be
// re-assigned and re-scheduled
for (int partitionNum : allPartitions) {
TaskPartitionState taskPartitionState = jobCtx.getPartitionState(partitionNum);
if (isTaskNotInTerminalState(taskPartitionState)
&& !partitionsWithDelay.contains(partitionNum)
&& !isTaskGivenup(jobCtx, jobCfg, partitionNum)) {
// Some targeted tasks may have timed-out due to Participants (instances) not being
// live, so we give tasks like these another try
// If some of these tasks are already scheduled and running, they will be dropped as
// well. Also, do not include partitions with delay that are not ready to be assigned and
// scheduled and the partitions that cannot be retried (given up)
partitionsToRetryOnLiveInstanceChangeForTargetedJob.add(partitionNum);
}
}
}
filteredTaskPartitionNumbers.addAll(partitionsToRetryOnLiveInstanceChangeForTargetedJob);
// The actual assignment is computed here
// Get instance->[partition, ...] mappings for the target resource.
Map<String, SortedSet<Integer>> tgtPartitionAssignments =
taskAssignmentCal.getTaskAssignment(currStateOutput, liveInstances, jobCfg, jobCtx,
workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());
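// Illustrative shape of the result, with hypothetical instance names and partition numbers:
// {"localhost_12918" -> [0, 3], "localhost_12919" -> [1, 4]}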
if (!TaskUtil.isGenericTaskJob(jobCfg) && jobCfg.isRebalanceRunningTask()) {
// TODO: Revisit the logic for isRebalanceRunningTask() and valid use cases for it
// TODO: isRebalanceRunningTask() was originally put in place to allow users to move
// ("rebalance") long-running tasks, but there hasn't been a clear use case for this
// Previously, there was a bug in the condition above (it was || where it should have been &&)
dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
jobCtx);
}
// If this is a targeted job and there was a LiveInstance/CurrentState/Message change
if (!TaskUtil.isGenericTaskJob(jobCfg)
&& existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
// Drop current jobs only if they are assigned to a different instance, regardless of
// the jobCfg.isRebalanceRunningTask() setting
dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
jobCtx);
}
// Go through ALL instances and assign/throttle tasks accordingly
for (Map.Entry<String, SortedSet<Integer>> entry : currentInstanceToTaskAssignments.entrySet()) {
String instance = entry.getKey();
if (!tgtPartitionAssignments.containsKey(instance)) {
// There is no assignment made for this instance, so it is safe to skip
continue;
}
if (excludedInstances.contains(instance)) {
// There is a task assignment made for this instance, but the instance is excluded, so we
// must skip the actual scheduling. We must also release the prematurely assigned tasks
// from AssignableInstance
if (!cache.getAssignableInstanceManager().getAssignableInstanceMap()
.containsKey(instance)) {
continue; // This should not happen; skip!
}
AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
String quotaType = jobCfg.getJobType();
for (int partitionNum : tgtPartitionAssignments.get(instance)) {
// Get the TaskConfig for this partitionNumber
String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
assignableInstanceManager.release(instance, taskConfig, quotaType);
}
continue;
}
// 1. Throttle by job configuration: remaining slots equal the per-instance concurrency limit
// minus the number of task partitions currently assigned to this instance
int jobCfgLimitation =
jobCfg.getNumConcurrentTasksPerInstance() - assignedPartitions.get(instance).size();
// 2. Throttle by participant capacity (instance-level MaxConcurrentTask, falling back to the
// cluster-level default when unset)
int participantCapacity =
cache.getAssignableInstanceConfigMap().get(instance).getMaxConcurrentTask();
if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
}
int participantLimitation =
participantCapacity - cache.getParticipantActiveTaskCount(instance);
// New tasks to be assigned
int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
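// Illustrative arithmetic with hypothetical numbers: if NumConcurrentTasksPerInstance is 10 and
// 4 partitions are already assigned here, jobCfgLimitation = 6; if participantCapacity is 40 and
// the instance already runs 37 tasks, participantLimitation = 3; numToAssign = min(6, 3) = 3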
LOG.debug(
"Throttle tasks to be assigned to instance {} using limitation: Job Concurrent Task({}), "
+ "Participant Max Task({}). Remaining capacity {}.", instance, jobCfgLimitation,
participantCapacity, numToAssign);
Set<Integer> throttledSet = new HashSet<>();
if (numToAssign > 0) {
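// getNextPartitions (defined elsewhere in this class) is expected to pick at most numToAssign
// partitions from the proposed assignment, skipping anything in excludeSet and collecting the
// overflow into throttledSet so that it can be released from AssignableInstance below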
List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
excludeSet, throttledSet, numToAssign);
for (Integer pId : nextPartitions) {
// The following is the actual scheduling of the tasks
String pName = pName(jobResource, pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
jobCtx.setAssignedParticipant(pId, instance);
jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
final long currentTimestamp = System.currentTimeMillis();
jobCtx.setPartitionStartTime(pId, currentTimestamp);
if (jobCtx.getExecutionStartTime() == WorkflowContext.NOT_STARTED) {
// This means this is the very first task scheduled for this job
jobCtx.setExecutionStartTime(currentTimestamp);
reportSubmissionToScheduleDelay(cache, _clusterStatusMonitor, workflowConfig, jobCfg,
currentTimestamp);
}
// Increment the task attempt count at schedule time
jobCtx.incrementNumAttempts(pId);
LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
jobCtx.getName(), pName, TaskPartitionState.RUNNING, instance);
}
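// Bump the cached active-task count for this instance so that throttling decisions made later
// in this pipeline run account for the tasks just assigned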
cache.setParticipantActiveTaskCount(instance,
cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
} else {
// No assignment was actually scheduled, so these assignments need to be released.
// Put all proposed assignments in throttledSet, but subtract excludeSet first because
// excludeSet was already applied when filtering partitions (excludeSet may contain
// partitions that are currently running)
Set<Integer> throttledSetWithExcludeSet =
new HashSet<>(tgtPartitionAssignments.get(instance));
throttledSetWithExcludeSet.removeAll(excludeSet); // Remove excludeSet
throttledSet.addAll(throttledSetWithExcludeSet);
}
if (!throttledSet.isEmpty()) {
// Release the tasks in throttledSet because they weren't actually assigned
if (!cache.getAssignableInstanceManager().getAssignableInstanceMap()
.containsKey(instance)) {
continue;
}
AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
String quotaType = jobCfg.getJobType();
for (int partitionNum : throttledSet) {
// Get the TaskConfig for this partitionNumber
String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
assignableInstanceManager.release(instance, taskConfig, quotaType);
}
LOG.debug(
"Tasks for job {} are ready but were throttled (count: {}) when assigned to the participant.",
jobCfg.getJobId(), throttledSet.size());
}
}
}
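// Illustrative helper (hypothetical, not part of the Helix code above): a minimal sketch of the
// throttling rule used in this method, where the number of new tasks an instance can take is the
// smaller of the job-level and participant-level headroom.
private static int computeAssignableTaskCount(int jobConcurrencyLimit, int alreadyAssignedToInstance,
int participantCapacity, int participantActiveTasks) {
int jobHeadroom = jobConcurrencyLimit - alreadyAssignedToInstance;
int participantHeadroom = participantCapacity - participantActiveTasks;
return Math.min(jobHeadroom, participantHeadroom);
}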