private Optional savePendingActionIfFeasible()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [506:600]


  /**
   * Examines the pending action on the step runtime summary and, when it is a feasible restart of
   * one of this foreach step's iterations, records it on the foreach artifact.
   *
   * <p>Only a non-workflow {@code RESTART} action that carries a restart config is supported. The
   * action is ignored, and an empty result returned, when any of the following holds:
   *
   * <ul>
   *   <li>there is no pending action, or the action is of an unsupported kind;
   *   <li>the restart path is missing or has fewer than two nodes;
   *   <li>the second-to-last path node does not refer to this foreach step's workflow id;
   *   <li>the targeted iteration id is greater than the artifact's next loop index;
   *   <li>the artifact already holds a pending action;
   *   <li>the foreach overview reports the iteration as not restartable;
   *   <li>the latest run of the iteration is not in a terminal status;
   *   <li>looking up the iteration or saving the action throws.
   * </ul>
   *
   * @param runtimeSummary step runtime summary whose pending action, if any, is examined
   * @param artifact foreach artifact used for validation; its pending action is set on success
   * @return a timeline event describing the saved action, or empty if nothing was saved
   */
  private Optional<TimelineEvent> savePendingActionIfFeasible(
      StepRuntimeSummary runtimeSummary, ForeachArtifact artifact) {
    StepAction action = runtimeSummary.getPendingAction();
    if (action == null) {
      return Optional.empty();
    }

    boolean supported =
        !action.isWorkflowAction()
            && action.getAction() == Actions.StepInstanceAction.RESTART // only support restart
            && action.getRestartConfig() != null;
    if (!supported) {
      LOG.info(
          "Get an unsupported action [{}] at foreach step [{}] and ignore it",
          action,
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    List<RestartConfig.RestartNode> path = action.getRestartConfig().getRestartPath();
    if (path == null || path.size() <= 1) {
      LOG.info(
          "Get an invalid action path [{}] at foreach step [{}] and ignore it",
          path,
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    // pick up 2nd to the last restart node, which is the foreach iteration of foreach step
    RestartConfig.RestartNode restartNode = path.get(path.size() - 2);
    if (!Objects.equals(restartNode.getWorkflowId(), artifact.getForeachWorkflowId())) {
      LOG.warn(
          "Get an invalid action path node [{}] at foreach step [{}] and ignore it",
          restartNode,
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    // index is 0-based and iteration id is 1-based, so an iteration id greater than the next
    // loop index is rejected as not yet looped over by the current run.
    if (artifact.getNextLoopIndex() < restartNode.getInstanceId()) {
      LOG.info(
          "Get an action while the current run has not yet looped over it at foreach step [{}] and ignore it",
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    // only one pending action is kept on the artifact at a time
    if (artifact.getPendingAction() != null) {
      LOG.info(
          "Get an action while there is already an ongoing action at foreach step [{}] and ignore it",
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    if (!artifact
        .getForeachOverview()
        .isForeachIterationRestartable(restartNode.getInstanceId())) {
      LOG.info(
          "Get an action while iteration [{}]'s status is unknown or not failed at foreach step [{}] and ignore it",
          restartNode.getInstanceId(),
          artifact.getForeachIdentity());
      return Optional.empty();
    }

    try {
      WorkflowInstance instance =
          instanceDao.getLatestWorkflowInstanceRun(
              restartNode.getWorkflowId(), restartNode.getInstanceId());

      // support restarting terminated iterations not in succeeded status
      if (!instance.getStatus().isTerminal()) {
        LOG.info(
            "Get an action while iteration [{}]'s status is not-actionable [{}] at foreach step [{}] and ignore it",
            restartNode.getInstanceId(),
            instance.getStatus(),
            artifact.getForeachIdentity());
        return Optional.empty();
      }

      artifact.setPendingAction(
          ForeachAction.builder()
              .instanceId(restartNode.getInstanceId())
              .instanceRunId(instance.getWorkflowRunId())
              .restartConfig(action.getRestartConfig())
              .action(action.getAction())
              .user(action.getUser())
              .createTime(action.getCreateTime())
              .build());
      return Optional.of(
          TimelineLogEvent.info(
              "Saved a pending action for iteration [%s]", restartNode.getInstanceId()));
    } catch (Exception e) {
      // Best effort: the failure is deliberately not propagated; nothing is saved and an empty
      // result is returned. The exception is passed as the trailing logger argument so its
      // cause and stack trace are kept in the log instead of being silently dropped.
      // NOTE(review): assumes LOG is an SLF4J-style logger that treats a trailing Throwable as
      // the exception to log; the retry mentioned in the message is presumably done by the caller.
      LOG.warn(
          "Failed to save an action [{}] at foreach step [{}] and will retry it",
          action,
          artifact.getForeachIdentity(),
          e);
      return Optional.empty();
    }
  }