public void updatePreviousAssignedTasksStatus()

in helix-core/src/main/java/org/apache/helix/task/AbstractTaskDispatcher.java [70:337]


  public void updatePreviousAssignedTasksStatus(
      Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
      Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
      JobContext jobCtx, JobConfig jobCfg, TaskState jobState,
      Map<String, Set<Integer>> assignedPartitions, Set<Integer> partitionsToDropFromIs,
      Map<Integer, PartitionAssignment> paMap, TargetState jobTgtState,
      Set<Integer> skippedPartitions, WorkflowControllerDataProvider cache,
      Map<String, Set<Integer>> tasksToDrop) {

    // If a job is in one of the following states and its tasks are in the RUNNING state, those
    // tasks will be aborted.
    Set<TaskState> jobStatesForAbortingTasks =
        new HashSet<>(Arrays.asList(TaskState.TIMING_OUT, TaskState.TIMED_OUT, TaskState.FAILING,
            TaskState.FAILED, TaskState.ABORTED));

    // Get AssignableInstanceMap for releasing resources for tasks in terminal states
    AssignableInstanceManager assignableInstanceManager = cache.getAssignableInstanceManager();

    Set<Integer> allTasksToDrop = new HashSet<>();
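    // Collect every task pId slated for DROP across all instances so they can be excluded from
    // the regular per-task state handling below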
    for (Set<Integer> taskToDropForInstance : tasksToDrop.values()) {
      allTasksToDrop.addAll(taskToDropForInstance);
    }

    // Iterate through all instances
    for (String instance : currentInstanceToTaskAssignments.keySet()) {
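      // Start with an empty set of decided assignments for this instance; it is filled in as
      // transitions are chosen below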
      assignedPartitions.put(instance, new HashSet<>());

      // Set all dropping transitions first. These are tasks coming from Participant disconnects
      // that have a requestedState of DROPPED.
      // They need to be prioritized over any other state transitions because of the race condition
      // with the same pId (task) running on other instances: in paMap, we can only define one
      // transition per pId.
      if (tasksToDrop.containsKey(instance)) {
        for (int pIdToDrop : tasksToDrop.get(instance)) {
          paMap.put(pIdToDrop,
              new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          assignedPartitions.get(instance).add(pIdToDrop);
        }
      }

      if (excludedInstances.contains(instance)) {
        continue;
      }

      // For instances that are not excluded, look up the set of task partitions currently
      // assigned to this instance
      Set<Integer> pSet = currentInstanceToTaskAssignments.get(instance);

      // Remove all task pIds that are slated to be dropped, because a DROPPED assignment was
      // already recorded for them in paMap above.
      pSet.removeAll(allTasksToDrop);

      // Used to keep track of partitions that are in either INIT or DROPPED states
      Set<Integer> donePartitions = new TreeSet<>();
      for (int pId : pSet) {
        final String pName = pName(jobResource, pId);
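        // Determine the task's current state for this (partition, instance) pair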
        TaskPartitionState currState = getTaskCurrentState(currStateOutput,
            jobResource, pId, pName, instance, jobCtx, jobTgtState);

        // Check for pending state transitions on this (partition, instance). If there is a pending
        // state transition, we prioritize it and set the assignment from that pending transition,
        // essentially "waiting" until the pending message clears.
        // If there is a pending message, we should not continue to update the context because,
        // from the controller's perspective, the state transition has not completed while the
        // pending message still exists.
        // If the context were updated here, the controller might remove the job from RunTimeJobDAG,
        // which could cause the task's CurrentState to not be removed while there is still a
        // pending message for that task.
        Message pendingMessage =
            currStateOutput.getPendingMessage(jobResource, new Partition(pName), instance);
        if (pendingMessage != null) {
          processTaskWithPendingMessage(pId, pName, instance, pendingMessage, jobState, currState,
              paMap, assignedPartitions);
          continue;
        }

        // Update job context based on current state
        updatePartitionInformationInJobContext(currStateOutput, jobResource, currState, jobCtx,
            pId, pName, instance);

        if (!instance.equals(jobCtx.getAssignedParticipant(pId))) {
          LOG.warn(
              "Instance {} does not match the assigned participant for pId {} in the job context (job: {}). Skipping task scheduling.",
              instance, pId, jobCtx.getName());
          continue;
        }

        // Get AssignableInstance for this instance and TaskConfig for releasing resources
        String quotaType = jobCfg.getJobType();
        String taskId;
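        // Generic jobs record the task id in the job context; for targeted jobs, the partition
        // name serves as the task id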
        if (TaskUtil.isGenericTaskJob(jobCfg)) {
          taskId = jobCtx.getTaskIdForPartition(pId);
        } else {
          taskId = pName;
        }
        TaskConfig taskConfig = jobCfg.getTaskConfig(taskId);

        // Process any requested state transitions. If there is a requested state transition, just
        // "wait" until this state transition is complete
        String requestedStateStr =
            currStateOutput.getRequestedState(jobResource, new Partition(pName), instance);
        if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
          TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
          if (requestedState.equals(currState)) {
            LOG.warn("Requested state {} is the same as the current state for instance {}.",
                requestedState, instance);
          }

          // For STOPPED tasks, if the targetState is STOP, we should not honor the requestedState
          // transition and should treat it as a NOP
          if (currState == TaskPartitionState.STOPPED && jobTgtState == TargetState.STOP) {
            // This task is STOPPED and not going to be re-run, so release this task
            assignableInstanceManager.release(instance, taskConfig, quotaType);
            continue;
          }

          // This contains check is necessary because we have already traversed tasksToDrop at the
          // beginning of this method. If we already have a dropping transition, we do not want to
          // overwrite it. Any other requestedState transitions (for example, INIT to RUNNING or
          // RUNNING to COMPLETED) can wait without affecting correctness - they will be picked up
          // in ensuing runs of the Task pipeline.
          if (!paMap.containsKey(pId)) {
            paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
          }
          assignedPartitions.get(instance).add(pId);
          LOG.debug("Instance {} requested a state transition to {} for partition {}.", instance,
              requestedState, pName);
          continue;
        }

        switch (currState) {
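        // No pending or requested state transition remains at this point, so decide the next
        // assignment based solely on the task's current state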
        case RUNNING: {
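          // Keep the task RUNNING unless the job is aborting/failing (abort the task) or the
          // job's target state is STOP (stop the task)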
          TaskPartitionState nextState = TaskPartitionState.RUNNING;
          if (jobStatesForAbortingTasks.contains(jobState)) {
            nextState = TaskPartitionState.TASK_ABORTED;
          } else if (jobTgtState == TargetState.STOP) {
            nextState = TaskPartitionState.STOPPED;
          }
          paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
          assignedPartitions.get(instance).add(pId);
          LOG.debug("Setting task partition {} state to {} on instance {}.", pName, nextState,
              instance);
        }
          break;
        case STOPPED: {
          // TODO: This case statement might be unreachable code - Hunter
          // This code may need to be removed because once a task is STOPPED and its workflow's
          // targetState is STOP, we do not assign that stopped task. Not assigning means it will
          // not be included in the previousAssignment map in the next rebalance. If it is not in
          // prevInstanceToTaskAssignments, it will never hit this part of the code.
          // When the parent workflow is to be resumed (target state is START), it will simply be
          // assigned as if it were being assigned for the first time.
          TaskPartitionState nextState;
          if (jobTgtState.equals(TargetState.START)) {
            nextState = TaskPartitionState.RUNNING;
          } else {
            nextState = TaskPartitionState.STOPPED;
            // This task is STOPPED and not going to be re-run, so release this task
            assignableInstanceManager.release(instance, taskConfig, quotaType);
          }
          paMap.put(pId, new JobRebalancer.PartitionAssignment(instance, nextState.name()));
          assignedPartitions.get(instance).add(pId);

          LOG.debug("Setting job {} task partition {} state to {} on instance {}.",
              jobCtx.getName(), pName, nextState, instance);
        }
        break;
        case COMPLETED: {
          // The task has completed on this partition. Drop it from the instance and add it to assignedPartitions in
          // order to avoid scheduling it again in this pipeline.
          assignedPartitions.get(instance).add(pId);
          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          LOG.debug(
              "Task partition {} has completed with state {}. Marking as such in rebalancer context.",
              pName, currState);
          partitionsToDropFromIs.add(pId);
          // This task is COMPLETED, so release this task
          assignableInstanceManager.release(instance, taskConfig, quotaType);
        }
        break;
        case TIMED_OUT:
        case TASK_ERROR:
        case TASK_ABORTED:
        case ERROR: {
          // First, mark this task, which is in a terminal state, to be dropped.
          // Later, in the next run of the pipeline, handleAdditionalAssignments will retry the task
          // if possible (meaning it is not ABORTED and the max number of attempts has not been
          // reached yet).
          assignedPartitions.get(instance).add(pId);
          paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));
          LOG.debug(
              "Task partition {} has error state {} with msg {}. Marking as such in rebalancer context.",
              pName, currState, jobCtx.getPartitionInfo(pId));
          // The error policy is to fail the task as soon as a single partition fails for the
          // specified maximum number of attempts or the task is in the ABORTED state.
          // Note, however, that if the job is TIMED_OUT, an aborted task won't be treated as a
          // failure and won't cause the job to fail.
          // After all tasks are aborted, they will be dropped because of the job timeout.
          if (jobState != TaskState.TIMED_OUT && jobState != TaskState.TIMING_OUT) {
            if (jobCtx.getPartitionNumAttempts(pId) >= jobCfg.getMaxAttemptsPerTask()
                || currState.equals(TaskPartitionState.TASK_ABORTED)
                || currState.equals(TaskPartitionState.ERROR)) {
              skippedPartitions.add(pId);
              partitionsToDropFromIs.add(pId);
              LOG.debug("skippedPartitions: {}", skippedPartitions);
            } else {
              // Mark the task to be started at some later time (if enabled)
              markPartitionDelayed(jobCfg, jobCtx, pId);
            }
          }
          // Release this task
          assignableInstanceManager.release(instance, taskConfig, quotaType);
        }
          break;
        case INIT: {
          // INIT is a temporary state for tasks.
          // Two possible scenarios for INIT:
          // 1. The task is getting scheduled for the first time. In this case, the task's state
          // goes from null->INIT->RUNNING, and this INIT state is transient and very short-lived.
          // 2. The task is getting scheduled for the first time, but the job is timed out or
          // timing out. In this case, the task is sent back to INIT to be removed. Here we ensure
          // that the task then goes from INIT to DROPPED so that it is released from
          // AssignableInstance to prevent a resource leak.
          if (jobState == TaskState.TIMED_OUT || jobState == TaskState.TIMING_OUT
              || jobTgtState == TargetState.DELETE) {
            // The job is timed out, timing out, or its targetState is DELETE, so its tasks will be
            // sent back to INIT.
            // In this case, the tasks' IdealState will be removed, and they will be sent to DROPPED.
            partitionsToDropFromIs.add(pId);

            assignedPartitions.get(instance).add(pId);
            paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.DROPPED.name()));

            // Also release resources for these tasks
            assignableInstanceManager.release(instance, taskConfig, quotaType);
            break;
          } else if (jobState == TaskState.IN_PROGRESS
              && (jobTgtState != TargetState.STOP && jobTgtState != TargetState.DELETE)) {
            // Job is in progress, implying that tasks are being re-tried, so set it to RUNNING
            paMap.put(pId,
                new JobRebalancer.PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
            assignedPartitions.get(instance).add(pId);
            break;
          }
        }

        case DROPPED: {
          // currState is in [INIT, DROPPED]. Do nothing; the partition is eligible to be reassigned.
          donePartitions.add(pId);
          LOG.debug(
              "Task partition {} has state {}. It will be dropped from the current ideal state.",
              pName, currState);
          // If it's DROPPED, release this task. If INIT, do not release
          if (currState == TaskPartitionState.DROPPED) {
            assignableInstanceManager.release(instance, taskConfig, quotaType);
          }
        }
          break;
        default:
          throw new AssertionError("Unknown enum symbol: " + currState);
        }
      }

      // Remove the task partitions collected in donePartitions (INIT or DROPPED) so that they are
      // eligible to be reassigned.
      pSet.removeAll(donePartitions);
    }
  }
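
Below is a minimal usage sketch, not taken from the Helix codebase, showing how a caller that
already holds an AbstractTaskDispatcher instance and the per-rebalance inputs might prepare the
mutable output collections that this method populates. The helper class and method names are
hypothetical, Helix imports are omitted, and only the types from the signature above plus
java.util collections are assumed to be accessible.

  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.Map;
  import java.util.Set;
  import java.util.SortedSet;

  final class PreviousAssignmentExample {
    // Hypothetical helper: wires up the output collections and delegates to the dispatcher.
    static Map<Integer, PartitionAssignment> collectPreviousAssignments(
        AbstractTaskDispatcher dispatcher,
        Map<String, SortedSet<Integer>> currentInstanceToTaskAssignments,
        Set<String> excludedInstances, String jobResource, CurrentStateOutput currStateOutput,
        JobContext jobCtx, JobConfig jobCfg, TaskState jobState, TargetState jobTgtState,
        WorkflowControllerDataProvider cache, Map<String, Set<Integer>> tasksToDrop) {
      // Output collections filled in by updatePreviousAssignedTasksStatus
      Map<String, Set<Integer>> assignedPartitions = new HashMap<>();
      Set<Integer> partitionsToDropFromIs = new HashSet<>();
      Map<Integer, PartitionAssignment> paMap = new HashMap<>();
      Set<Integer> skippedPartitions = new HashSet<>();

      dispatcher.updatePreviousAssignedTasksStatus(currentInstanceToTaskAssignments,
          excludedInstances, jobResource, currStateOutput, jobCtx, jobCfg, jobState,
          assignedPartitions, partitionsToDropFromIs, paMap, jobTgtState, skippedPartitions,
          cache, tasksToDrop);
      // The decided per-pId transitions; a caller would typically fold these into the job's new
      // resource assignment
      return paMap;
    }
  }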