private void processForStepEntity()

in maestro-engine/src/main/java/com/netflix/maestro/engine/processors/StepInstanceWakeUpEventProcessor.java [74:152]


  /**
   * Handles a wake-up event that targets a single step instance.
   *
   * <p>A RESTART action always wakes up the underlying task. For a leaf step (identified either by
   * the event itself or, failing that, by the stored step instance), the underlying task is woken
   * up only when the step status reports that it should be. For FOREACH and SUBWORKFLOW steps
   * whose matching artifact is present, the wake-up is delegated to {@code handleLeafTasksWakeup}
   * with the rollup read from that artifact.
   *
   * @param jobEvent the wake-up event to process
   * @throws MaestroRetryableError if a foreach/subworkflow artifact was processed and the desired
   *     status is terminal, so that the status gets checked again in a following retry
   */
  private void processForStepEntity(StepInstanceWakeUpEvent jobEvent) {
    // A restart is forwarded to the underlying task without looking at the step type.
    if (jobEvent.getStepAction() == Actions.StepInstanceAction.RESTART) {
      wakeupUnderlyingTask(jobEvent);
      return;
    }

    // Fast path: the event already tells us this is a leaf step, no lookup needed.
    boolean leafPerEvent = jobEvent.getStepType() != null && jobEvent.getStepType().isLeaf();
    if (leafPerEvent) {
      boolean wakeable =
          jobEvent.getStepStatus() != null && jobEvent.getStepStatus().shouldWakeup();
      if (wakeable) {
        wakeupUnderlyingTask(jobEvent);
      }
      return;
    }

    // Otherwise load the step instance to find out what kind of step this is.
    StepInstance instance =
        stepInstanceDao.getStepInstance(
            jobEvent.getWorkflowId(),
            jobEvent.getWorkflowInstanceId(),
            jobEvent.getWorkflowRunId(),
            jobEvent.getStepId(),
            jobEvent.getStepAttemptId());
    if (instance == null) {
      LOG.warn(
          "Action is requested on an invalid step instance. The requested action is: {}",
          jobEvent.getMessageKey());
      return;
    }

    // The stored definition may still say it is a leaf even though the event did not.
    if (instance.getDefinition().getType().isLeaf()) {
      if (instance.getRuntimeState().getStatus().shouldWakeup()) {
        wakeupUnderlyingTask(jobEvent);
      }
      return;
    }

    // Non-leaf step from here on: the action is required to derive the status we are aiming for.
    StepInstance.Status targetStatus =
        Checks.notNull(
                jobEvent.getStepAction(),
                "the step action cannot be null for a step action job event {}",
                jobEvent.getMessageKey())
            .getStatus();
    if (targetStatus == instance.getRuntimeState().getStatus()) {
      return; // already in the desired status
    }
    if (instance.getArtifacts() == null) {
      return; // no artifacts recorded, so there is no rollup to fan the wake-up out to
    }

    // Set once the wake-up has been delegated using a foreach/subworkflow rollup.
    boolean rollupProcessed = false;
    switch (instance.getDefinition().getType()) {
      case FOREACH:
        if (instance.getArtifacts().containsKey(Artifact.Type.FOREACH.key())) {
          Artifact foreachEntry = instance.getArtifacts().get(Artifact.Type.FOREACH.key());
          ForeachArtifact foreachData = foreachEntry.asForeach();
          handleLeafTasksWakeup(
              jobEvent.getGroupInfo(), foreachData.getForeachOverview().getOverallRollup());
          rollupProcessed = true;
        }
        break;
      case SUBWORKFLOW:
        if (instance.getArtifacts().containsKey(Artifact.Type.SUBWORKFLOW.key())) {
          Artifact subworkflowEntry =
              instance.getArtifacts().get(Artifact.Type.SUBWORKFLOW.key());
          SubworkflowArtifact subworkflowData = subworkflowEntry.asSubworkflow();
          handleLeafTasksWakeup(
              jobEvent.getGroupInfo(),
              subworkflowData.getSubworkflowOverview().getRollupOverview());
          rollupProcessed = true;
        }
        break;
      default:
        LOG.warn(
            "Invalid step type to be processed for this action. Action is {} and the step type is: {}",
            jobEvent.getMessageKey(),
            instance.getDefinition().getType());
        return;
    }

    // The step has not reached the terminal target yet; raise a retryable error so the desired
    // status is checked again in the following retry.
    if (rollupProcessed && targetStatus.isTerminal()) {
      throw new MaestroRetryableError(
          "Current status is not the desired status after action is taking. Will check again");
    }
  }