private ResourceAssignment computeResourceMapping()

in helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java [199:333]


  /**
   * Computes the task assignment for a single job resource during one rebalance pass, and drives
   * the job's state transitions that are decided here.
   *
   * <p>Depending on the job's situation, this method:
   * <ul>
   *   <li>fails the job and marks all partitions as errored when the job has no task partitions
   *       at all;</li>
   *   <li>enters the failing path when either (a) the job is IN_PROGRESS and more partitions were
   *       given up than {@code jobCfg.getFailureThreshold()} allows, or (b) in any job state, the
   *       job's target resource has an ideal state that is disabled. On that path the job is
   *       failed if {@code isJobFinished} reports true; otherwise it is set to FAILING and every
   *       task with a recorded state that has not been given up is assigned TASK_ABORTED (tasks
   *       still pending on INIT-&gt;RUNNING are instead set back to INIT);</li>
   *   <li>fails a FAILING job once {@code isJobFinished} reports true;</li>
   *   <li>marks the job complete (updating job counters, removing the scheduled rebalance,
   *       cleaning up the job's ideal state / external view and scheduling an on-demand pipeline)
   *       when {@code isJobComplete} reports true;</li>
   *   <li>handles the timeout of a TIMING_OUT job once {@code isJobFinished} reports true;</li>
   *   <li>otherwise schedules a rebalance for delayed tasks and, unless the job is TIMING_OUT or
   *       TIMED_OUT or its target state is not START, makes additional task assignments.</li>
   * </ul>
   *
   * @param jobResource name of the job resource being rebalanced
   * @param workflowConfig config of the workflow the job belongs to
   * @param jobCfg config of the job
   * @param jobState the job's current state
   * @param jobTgtState the job's target state; new tasks are only assigned when it is START
   * @param liveInstances instances passed on to {@code handleAdditionalTaskAssignment}; note that
   *     current assignments are collected from {@code cache.getEnabledLiveInstances()} instead
   * @param currStateOutput current states and pending messages of the cluster
   * @param workflowCtx workflow context; may be mutated (e.g. job state set to FAILING)
   * @param jobCtx job context; may be mutated
   * @param partitionsToDropFromIs collector passed through to
   *     {@code updatePreviousAssignedTasksStatus} (presumably filled with partitions to drop from
   *     the job's IdealState -- confirm against that method)
   * @param cache controller data cache for the workflow pipeline
   * @return the resource assignment to apply for this job; when the job reaches a terminal
   *     outcome in this call, the result of {@code buildEmptyAssignment} (or an empty
   *     {@code ResourceAssignment} when the job has no partitions)
   */
  private ResourceAssignment computeResourceMapping(String jobResource,
      WorkflowConfig workflowConfig, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
      Collection<String> liveInstances, CurrentStateOutput currStateOutput,
      WorkflowContext workflowCtx, JobContext jobCtx, Set<Integer> partitionsToDropFromIs,
      WorkflowControllerDataProvider cache) {
    // Tasks already assigned to instances in this iteration:
    // instance name -> set of task partitions assigned to that instance.
    Map<String, Set<Integer>> assignedPartitions = new HashMap<>();

    // Tasks that have failed, but whose failure is acceptable.
    Set<Integer> skippedPartitions = new HashSet<>();

    // Partition id -> (instance, state). A TreeMap keeps entries ordered by partition id.
    Map<Integer, PartitionAssignment> paMap = new TreeMap<>();

    Set<String> excludedInstances =
        getExcludedInstances(jobResource, workflowConfig, workflowCtx, cache);

    // Process all the current assignments of tasks.
    TaskAssignmentCalculator taskAssignmentCal = getAssignmentCalculator(jobCfg, cache);
    Set<Integer> allPartitions = taskAssignmentCal.getAllTaskPartitions(jobCfg, jobCtx,
        workflowConfig, workflowCtx, cache.getIdealStates());

    // Validate the partition set before it is handed to any other calculation, so a null set is
    // handled here instead of being passed on to getRemovedPartitions().
    if (allPartitions == null || allPartitions.isEmpty()) {
      // Empty target partitions, mark the job as FAILED.
      String failureMsg =
          "Empty task partition mapping for job " + jobResource + ", marked the job as FAILED!";
      LOG.info(failureMsg);
      jobCtx.setInfo(failureMsg);
      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
      markAllPartitionsError(jobCtx);
      return new ResourceAssignment(jobResource);
    }

    // Tasks that should be dropped, either because the task has been removed from the config of
    // a generic job or because the target partition no longer exists in the target IdealState.
    Set<Integer> removedPartitions =
        taskAssignmentCal.getRemovedPartitions(jobCfg, jobCtx, allPartitions);

    // This set contains all task pIds that need to be dropped because requestedState is DROPPED.
    // Newer versions of Participants, upon connection reset, set task requestedStates to DROPPED.
    // These dropping transitions will be prioritized above all task state transition assignments.
    Map<String, Set<Integer>> tasksToDrop = new HashMap<>();

    Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments =
        getCurrentInstanceToTaskAssignments(cache.getEnabledLiveInstances(), currStateOutput,
            jobResource, tasksToDrop);

    updateInstanceToTaskAssignmentsFromContext(jobCtx, currentInstanceToTaskAssignments);

    handleDeletedTasks(jobResource, jobCtx, currentInstanceToTaskAssignments, tasksToDrop,
        currStateOutput, allPartitions, removedPartitions);

    long currentTime = System.currentTimeMillis();

    LOG.debug("All partitions: {} taskAssignment: {} excludedInstances: {}", allPartitions,
        currentInstanceToTaskAssignments, excludedInstances);

    // Release resource for tasks in terminal state
    updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments, excludedInstances,
        jobResource, currStateOutput, jobCtx, jobCfg, jobState, assignedPartitions,
        partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions, cache, tasksToDrop);

    addGivenUpPartitions(skippedPartitions, jobCtx, allPartitions, jobCfg);

    // The failure-threshold check only applies to IN_PROGRESS jobs; a disabled target resource
    // sends the job down the failing path regardless of its current state.
    boolean tooManyGivenUpTasks = jobState == TaskState.IN_PROGRESS
        && skippedPartitions.size() > jobCfg.getFailureThreshold();
    boolean targetResourceDisabled = jobCfg.getTargetResource() != null
        && cache.getIdealState(jobCfg.getTargetResource()) != null
        && !cache.getIdealState(jobCfg.getTargetResource()).isEnabled();

    if (tooManyGivenUpTasks || targetResourceDisabled) {
      if (isJobFinished(jobCtx, jobResource, currStateOutput)) {
        failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
        return buildEmptyAssignment(jobResource, currStateOutput);
      }
      workflowCtx.setJobState(jobResource, TaskState.FAILING);
      for (int pId : jobCtx.getPartitionSet()) {
        String instance = jobCtx.getAssignedParticipant(pId);
        TaskPartitionState partitionState = jobCtx.getPartitionState(pId);
        // Drop all assigned but not given-up tasks.
        if (partitionState != null && !isTaskGivenup(jobCtx, jobCfg, pId)) {
          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.TASK_ABORTED.name()));
        }
        // While the job is failing, if the task is pending on INIT->RUNNING, set it back to INIT
        // so that Helix will cancel the transition. This overrides the abort entry above.
        if (partitionState == TaskPartitionState.INIT) {
          Message pendingMessage = currStateOutput.getPendingMessage(jobResource,
              new Partition(pName(jobResource, pId)), instance);
          if (pendingMessage != null) {
            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.INIT.name()));
          }
        }
      }

      return toResourceAssignment(jobResource, paMap);
    }

    if (jobState == TaskState.FAILING && isJobFinished(jobCtx, jobResource, currStateOutput)) {
      failJob(jobResource, workflowCtx, jobCtx, workflowConfig, cache.getJobConfigMap(), cache);
      return buildEmptyAssignment(jobResource, currStateOutput);
    }

    if (isJobComplete(jobCtx, allPartitions, jobCfg)) {
      markJobComplete(jobResource, jobCtx, workflowConfig, workflowCtx, cache.getJobConfigMap(),
          cache);
      _clusterStatusMonitor.updateJobCounters(jobCfg, TaskState.COMPLETED,
          jobCtx.getFinishTime() - jobCtx.getStartTime());
      _rebalanceScheduler.removeScheduledRebalance(jobResource);
      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobResource);
      // New pipeline trigger for workflow status update
      // TODO: Enhance the pipeline and remove this because this operation is expensive
      RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false);
      return buildEmptyAssignment(jobResource, currStateOutput);
    }

    // If the job is timing out and no task is running (for whatever reason), the idealState can
    // be deleted and all tasks can be dropped (note that Helix doesn't track whether the drop
    // succeeds or not).
    if (jobState == TaskState.TIMING_OUT && isJobFinished(jobCtx, jobResource, currStateOutput)) {
      handleJobTimeout(jobCtx, workflowCtx, jobResource, jobCfg);
      finishJobInRuntimeJobDag(cache.getTaskDataCache(), workflowConfig.getWorkflowId(),
          jobResource);
      scheduleJobCleanUp(jobCfg.getTerminalStateExpiry(), workflowConfig, currentTime);
      return buildEmptyAssignment(jobResource, currStateOutput);
    }

    // For delayed tasks, trigger a rebalance event for the closest upcoming ready time
    scheduleForNextTask(jobResource, jobCtx, currentTime);

    // Make additional task assignments if needed.
    if (jobState != TaskState.TIMING_OUT && jobState != TaskState.TIMED_OUT
        && jobTgtState == TargetState.START) {
      handleAdditionalTaskAssignment(currentInstanceToTaskAssignments, excludedInstances,
          jobResource, currStateOutput, jobCtx, jobCfg, workflowConfig, workflowCtx, cache,
          assignedPartitions, paMap, skippedPartitions, taskAssignmentCal, allPartitions,
          currentTime, liveInstances);
    }

    return toResourceAssignment(jobResource, paMap);
  }