protected void handleAdditionalTaskAssignment()

in helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java [536:718]


  protected void handleAdditionalTaskAssignment(
      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
      Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
      JobContext jobCtx, final JobConfig jobCfg, final WorkflowConfig workflowConfig,
      WorkflowContext workflowCtx, final WorkflowControllerDataProvider cache,
      Map<String, Set<Integer>> assignedPartitions, Map<Integer, PartitionAssignment> paMap,
      Set<Integer> skippedPartitions, TaskAssignmentCalculator taskAssignmentCal,
      Set<Integer> allPartitions, final long currentTime, Collection<String> liveInstances) {

    // Check whether a LiveInstance, CurrentState, or Message change occurred in this iteration of
    // the pipeline
    boolean existsLiveInstanceOrCurrentStateOrMessageChangeChange =
        cache.getExistsLiveInstanceOrCurrentStateOrMessageChange();

    // The excludeSet contains the set of task partitions that must be excluded from consideration
    // when making any new assignments.
    // This includes all completed, failed, delayed, and already assigned partitions.
    Set<Integer> excludeSet = Sets.newTreeSet();
    // Add all assigned partitions to excludeSet
    for (Set<Integer> assignedSet : assignedPartitions.values()) {
      excludeSet.addAll(assignedSet);
    }
    addCompletedTasks(excludeSet, jobCtx, allPartitions);
    addPartitionsReachedMaximumRetries(excludeSet, jobCtx, allPartitions, jobCfg);
    excludeSet.addAll(skippedPartitions);
    Set<Integer> partitionsWithDelay = TaskUtil.getNonReadyPartitions(jobCtx, currentTime);
    excludeSet.addAll(partitionsWithDelay);

    // The following filters tasks before passing them to the assigner.
    // Only feed in tasks that need to be assigned (state is null, STOPPED, TIMED_OUT, TASK_ERROR,
    // or DROPPED) or whose assigned participant is disabled or no longer live
    Set<Integer> filteredTaskPartitionNumbers = filterTasks(jobResource, allPartitions, jobCtx,
        liveInstances, cache.getDisabledInstances(), currStateOutput, paMap);
    // Remove all excludeSet tasks to be safe, because some STOPPED tasks may already have been
    // re-started (excludeSet includes already-assigned partitions). Tasks that have exceeded their
    // retry limit (see addPartitionsReachedMaximumRetries above) are removed as well
    filteredTaskPartitionNumbers.removeAll(excludeSet);
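    // At this point, filteredTaskPartitionNumbers contains only the partitions that are eligible
    // for a new assignment in this pass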

    Set<Integer> partitionsToRetryOnLiveInstanceChangeForTargetedJob = new HashSet<>();
    // If the job is a targeted job and there was a live instance change, we need to re-add
    // non-terminal tasks so that they can be re-assigned and re-scheduled
    if (!TaskUtil.isGenericTaskJob(jobCfg)
        && existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
      // This job is a targeted job, so FixedAssignmentCalculator will be used
      // There has been a live instance change. Must re-add incomplete task partitions to be
      // re-assigned and re-scheduled
      for (int partitionNum : allPartitions) {
        TaskPartitionState taskPartitionState = jobCtx.getPartitionState(partitionNum);
        if (isTaskNotInTerminalState(taskPartitionState)
            && !partitionsWithDelay.contains(partitionNum)
            && !isTaskGivenup(jobCtx, jobCfg, partitionNum)) {
          // Some targeted tasks may have timed out because their Participants (instances) were
          // not live, so give such tasks another try. If some of these tasks are already scheduled
          // and running, they will be dropped as well. Partitions whose delay has not elapsed and
          // partitions that can no longer be retried (given up) are not included
          partitionsToRetryOnLiveInstanceChangeForTargetedJob.add(partitionNum);
        }
      }
    }
    filteredTaskPartitionNumbers.addAll(partitionsToRetryOnLiveInstanceChangeForTargetedJob);

    // The actual assignment is computed here
    // Get instance->[partition, ...] mappings for the target resource.
    Map<String, SortedSet<Integer>> tgtPartitionAssignments =
        taskAssignmentCal.getTaskAssignment(currStateOutput, liveInstances, jobCfg, jobCtx,
            workflowConfig, workflowCtx, filteredTaskPartitionNumbers, cache.getIdealStates());

    if (!TaskUtil.isGenericTaskJob(jobCfg) && jobCfg.isRebalanceRunningTask()) {
      // TODO: Revisit the logic for isRebalanceRunningTask() and valid use cases for it
      // TODO: isRebalanceRunningTask() was originally put in place to allow users to move
      // ("rebalance") long-running tasks, but there hasn't been a clear use case for this
      // Previously, there was a bug in the condition above (it was || where it should have been &&)
      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
          jobCtx);
    }

    // If this is a targeted job and if there was a live instance change
    if (!TaskUtil.isGenericTaskJob(jobCfg)
        && existsLiveInstanceOrCurrentStateOrMessageChangeChange) {
      // Drop currently running tasks only if they are now assigned to a different instance,
      // regardless of the jobCfg.isRebalanceRunningTask() setting
      dropRebalancedRunningTasks(tgtPartitionAssignments, currentInstanceToTaskAssignments, paMap,
          jobCtx);
    }
    // Go through ALL instances and assign/throttle tasks accordingly
    for (Map.Entry<String, SortedSet<Integer>> entry : currentInstanceToTaskAssignments.entrySet()) {
      String instance = entry.getKey();
      if (!tgtPartitionAssignments.containsKey(instance)) {
        // There is no assignment made for this instance, so it is safe to skip
        continue;
      }
      if (excludedInstances.contains(instance)) {
        // A task assignment was made for this instance, but the instance is excluded, so we must
        // skip the actual scheduling and also release the prematurely assigned tasks from
        // AssignableInstance
        if (!cache.getAssignableInstanceManager().getAssignableInstanceMap()
            .containsKey(instance)) {
          continue; // This should not happen; skip!
        }
        AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
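        // The job type doubles as the quota type used when releasing capacity back to
        // AssignableInstance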
        String quotaType = jobCfg.getJobType();
        for (int partitionNum : tgtPartitionAssignments.get(instance)) {
          // Get the TaskConfig for this partitionNumber
          String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
          TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
          assignableInstanceManager.release(instance, taskConfig, quotaType);
        }
        continue;
      }
      // 1. throttled by job configuration: the per-instance concurrency limit minus the task
      // partitions already assigned to this instance
      int jobCfgLimitation =
          jobCfg.getNumConcurrentTasksPerInstance() - assignedPartitions.get(instance).size();
      // 2. throttled by participant capacity
      int participantCapacity =
          cache.getAssignableInstanceConfigMap().get(instance).getMaxConcurrentTask();
      if (participantCapacity == InstanceConfig.MAX_CONCURRENT_TASK_NOT_SET) {
        participantCapacity = cache.getClusterConfig().getMaxConcurrentTaskPerInstance();
      }
      int participantLimitation =
          participantCapacity - cache.getParticipantActiveTaskCount(instance);
      // New tasks to be assigned
      int numToAssign = Math.min(jobCfgLimitation, participantLimitation);
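      // Example: if the job allows 4 concurrent tasks per instance and 1 is already assigned
      // (jobCfgLimitation = 3), and the participant capacity is 10 with 8 active tasks
      // (participantLimitation = 2), then numToAssign = min(3, 2) = 2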
      LOG.debug(
          "Throttle tasks to be assigned to instance {} using limitation: Job Concurrent Task({}), "
              + "Participant Max Task({}). Remaining capacity {}.", instance, jobCfgLimitation,
          participantCapacity, numToAssign);
      Set<Integer> throttledSet = new HashSet<>();
      if (numToAssign > 0) {
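        // getNextPartitions selects up to numToAssign partitions from the target assignment,
        // skipping excludeSet; candidates beyond the limit are collected into throttledSet so they
        // can be released below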
        List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
            excludeSet, throttledSet, numToAssign);
        for (Integer pId : nextPartitions) {
          // The following is the actual scheduling of the tasks
          String pName = pName(jobResource, pId);
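          // paMap feeds the output assignment with target state RUNNING, while the job context
          // tracks the task as INIT until the participant reports its current state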
          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
          excludeSet.add(pId);
          jobCtx.setAssignedParticipant(pId, instance);
          jobCtx.setPartitionState(pId, TaskPartitionState.INIT);
          final long currentTimestamp = System.currentTimeMillis();
          jobCtx.setPartitionStartTime(pId, currentTimestamp);
          if (jobCtx.getExecutionStartTime() == WorkflowContext.NOT_STARTED) {
            // This means this is the very first task scheduled for this job
            jobCtx.setExecutionStartTime(currentTimestamp);
            reportSubmissionToScheduleDelay(cache, _clusterStatusMonitor, workflowConfig, jobCfg,
                currentTimestamp);
          }
          // Increment the task attempt count at schedule time
          jobCtx.incrementNumAttempts(pId);
          LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
              jobCtx.getName(), pName, TaskPartitionState.RUNNING, instance);
        }
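        // Account for the newly scheduled tasks so that subsequent jobs in this pipeline run see
        // the updated per-participant active task count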
        cache.setParticipantActiveTaskCount(instance,
            cache.getParticipantActiveTaskCount(instance) + nextPartitions.size());
      } else {
        // No assignments were actually scheduled, so these assignments need to be released.
        // Put all of them in throttledSet, but subtract excludeSet first because excludeSet was
        // already applied when filtering partitions (it may contain partitions that are currently
        // running)
        Set<Integer> throttledSetWithExcludeSet =
            new HashSet<>(tgtPartitionAssignments.get(instance));
        throttledSetWithExcludeSet.removeAll(excludeSet); // Remove excludeSet
        throttledSet.addAll(throttledSetWithExcludeSet);
      }
      if (!throttledSet.isEmpty()) {
        // Release the tasks in throttledSet because they weren't actually assigned
        if (!cache.getAssignableInstanceManager().getAssignableInstanceMap()
            .containsKey(instance)) {
          continue;
        }
        AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();
        String quotaType = jobCfg.getJobType();
        for (int partitionNum : throttledSet) {
          // Get the TaskConfig for this partitionNumber
          String taskId = getTaskId(jobCfg, jobCtx, partitionNum);
          TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);
          assignableInstanceManager.release(instance, taskConfig, quotaType);
        }
        LOG.debug(
            "tasks for job {} are ready but throttled (size: {}) when assigned to participant.",
            jobCfg.getJobId(), throttledSet.size());
      }
    }
  }