public ResourceAssignment processJobStatusUpdateAndAssignment()

in helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java [56:197]

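Validates the job's configuration and contexts, short-circuits with an empty assignment when the job cannot or should not run, transitions job and workflow states (timing out, stopping, in progress), and delegates partition placement to computeResourceMapping.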

  public ResourceAssignment processJobStatusUpdateAndAssignment(String jobName,
      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
    // Fetch job configuration
    final JobConfig jobCfg = _dataProvider.getJobConfig(jobName);
    if (jobCfg == null) {
      LOG.error("Job configuration is NULL for {}", jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }
    String workflowResource = jobCfg.getWorkflow();

    // Fetch workflow configuration and context
    final WorkflowConfig workflowCfg = _dataProvider.getWorkflowConfig(workflowResource);
    if (workflowCfg == null) {
      LOG.error("Workflow configuration is NULL for {}", jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }

    if (workflowCtx == null) {
      LOG.error("Workflow context is NULL for {}", jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }

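    // Only the START and STOP target states allow scheduling; any other target state
    // (e.g. DELETE) stops scheduling for this job.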
    TargetState targetState = workflowCfg.getTargetState();
    if (targetState != TargetState.START && targetState != TargetState.STOP) {
      LOG.info("Target state is {} for workflow {} .Stop scheduling job {}", targetState.name(),
          workflowResource, jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }

    // Stop current run of the job if workflow or job is already in final state (failed or
    // completed)
    TaskState workflowState = workflowCtx.getWorkflowState();
    TaskState jobState = workflowCtx.getJobState(jobName);
    // Do not include workflowState == TIMED_OUT here, as later logic needs to handle this case
    if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED
        || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED
        || jobState == TaskState.TIMED_OUT) {
      LOG.info(
          "Workflow {} or job {} is already in final state, workflow state ({}), job state ({}), clean up job IS.",
          workflowResource, jobName, workflowState, jobState);
      finishJobInRuntimeJobDag(_dataProvider.getTaskDataCache(), workflowResource, jobName);
      TaskUtil.cleanupJobIdealStateExtView(_manager.getHelixDataAccessor(), jobName);
      // New pipeline trigger for workflow status update
      // TODO: Enhance the pipeline and remove this, because this operation is expensive
      RebalanceUtil.scheduleOnDemandPipeline(_manager.getClusterName(), 0L, false);
      _rebalanceScheduler.removeScheduledRebalance(jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }

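    // The workflow itself may not be ready to run yet (e.g. its scheduled start time has
    // not arrived).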
    if (!isWorkflowReadyForSchedule(workflowCfg)) {
      LOG.info("Job {} is not ready to be run since workflow is not ready.", jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }

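    // A job that has not yet started must also pass job-level scheduling checks (e.g.
    // parent-job dependencies and the workflow's parallel-job limit) before it can run.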
    if (!TaskUtil.isJobStarted(jobName, workflowCtx) && !isJobReadyToSchedule(jobName, workflowCfg,
        workflowCtx, TaskUtil.getInCompleteJobCount(workflowCfg, workflowCtx),
        _dataProvider.getJobConfigMap(), _dataProvider,
        _dataProvider.getAssignableInstanceManager())) {
      LOG.info("Job {} is not ready to run.", jobName);
      return buildEmptyAssignment(jobName, currStateOutput);
    }

    // Fetch any existing context information from the property store.
    JobContext jobCtx = _dataProvider.getJobContext(jobName);
    if (jobCtx == null) {
      jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
      final long currentTimestamp = System.currentTimeMillis();
      jobCtx.setStartTime(currentTimestamp);
      jobCtx.setName(jobName);
      // This job's JobContext has not been created yet. Since we are creating a new JobContext
      // here, we must also create its UserContentStore
      TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName,
          new ZNRecord(TaskUtil.USER_CONTENT_NODE));
      workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);

      // Since this job has been processed for the first time, we report SubmissionToProcessDelay
      // here asynchronously
      reportSubmissionToProcessDelay(_dataProvider, _clusterStatusMonitor, workflowCfg, jobCfg,
          currentTimestamp);
    }

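    // Unless the job has already timed out, schedule a future rebalance to fire when the
    // job's timeout would expire.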
    if (!TaskState.TIMED_OUT.equals(workflowCtx.getJobState(jobName))) {
      scheduleRebalanceForTimeout(jobCfg.getJobId(), jobCtx.getStartTime(), jobCfg.getTimeout());
    }

    // Determine the set of live, enabled instances eligible to run this job, optionally
    // filtered by the job's instance group tag.
    Set<String> liveInstances =
        jobCfg.getInstanceGroupTag() == null ? _dataProvider.getEnabledLiveInstances()
            : _dataProvider.getEnabledLiveInstancesWithTag(jobCfg.getInstanceGroupTag());

    if (liveInstances.isEmpty()) {
      LOG.error("No available instance found for job: {}", jobName);
    }

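    // Refresh the job and workflow state snapshots; the job state may have been set to
    // IN_PROGRESS above for a newly created JobContext.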
    TargetState jobTgtState = workflowCfg.getTargetState();
    jobState = workflowCtx.getJobState(jobName);
    workflowState = workflowCtx.getWorkflowState();

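    // Move the job toward TIMING_OUT if it is in an intermediate state and either its own
    // timeout has expired or the workflow has timed out; otherwise propagate the STOP/START
    // target state into the job and workflow contexts.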
    if (INTERMEDIATE_STATES.contains(jobState)
        && (isTimeout(jobCtx.getStartTime(), jobCfg.getTimeout())
            || TaskState.TIMED_OUT.equals(workflowState))) {
      jobState = TaskState.TIMING_OUT;
      workflowCtx.setJobState(jobName, TaskState.TIMING_OUT);
    } else if (jobState != TaskState.TIMING_OUT && jobState != TaskState.FAILING) {
      // TIMING_OUT/FAILING/ABORTING job can't be stopped, because all tasks are being aborted
      // Update running status in workflow context
      if (jobTgtState == TargetState.STOP) {
        // If the assigned instance is no longer live, mark the partition as DROPPED in the context
        markPartitionsWithoutLiveInstance(jobCtx, liveInstances);

        if (jobState != TaskState.NOT_STARTED && TaskUtil.checkJobStopped(jobCtx)) {
          workflowCtx.setJobState(jobName, TaskState.STOPPED);
        } else {
          workflowCtx.setJobState(jobName, TaskState.STOPPING);
        }
        // The workflow is considered stopped once all in-progress jobs have stopped
        if (isWorkflowStopped(workflowCtx, workflowCfg)) {
          workflowCtx.setWorkflowState(TaskState.STOPPED);
        } else {
          workflowCtx.setWorkflowState(TaskState.STOPPING);
        }
      } else {
        workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS);
        // Workflow is in progress if any task is in progress
        workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
      }
    }

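    // Will contain the partitions that must be explicitly dropped from the ideal state
    // that is stored in ZooKeeper.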
    Set<Integer> partitionsToDrop = new TreeSet<>();
    ResourceAssignment newAssignment =
        computeResourceMapping(jobName, workflowCfg, jobCfg, jobState, jobTgtState, liveInstances,
            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, _dataProvider);

    // Update Workflow and Job context in data cache and ZK.
    _dataProvider.updateJobContext(jobName, jobCtx);
    _dataProvider.updateWorkflowContext(workflowResource, workflowCtx);

    LOG.debug("Job {} new assignment",
        Arrays.toString(newAssignment.getMappedPartitions().toArray()));
    return newAssignment;
  }
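
For orientation, the snippet below sketches how a rebalancer-style caller might drive this
method. It is a minimal, hypothetical illustration (the rebalanceJob helper and its wiring
are assumed, not the actual JobRebalancer code):

  // Hypothetical call site; only processJobStatusUpdateAndAssignment comes from the source above.
  ResourceAssignment rebalanceJob(JobDispatcher dispatcher, String jobName,
      CurrentStateOutput currStateOutput, WorkflowContext workflowCtx) {
    // The dispatcher validates configs and contexts itself and falls back to an empty
    // assignment, so the result can be handed straight back to the controller pipeline.
    return dispatcher.processJobStatusUpdateAndAssignment(jobName, currStateOutput, workflowCtx);
  }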