private Optional takePendingActionIfFeasible()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/ForeachStepRuntime.java [615:704]


  /**
   * Tries to apply the pending foreach action stored in the given artifact, if there is one.
   *
   * <p>Possible outcomes:
   *
   * <ul>
   *   <li>No pending action: nothing is done and an empty result is returned.
   *   <li>The pending action is out-dated, i.e. its run id is neither {@code
   *       Constants.LATEST_ONE} nor the run id of the loaded instance, or the loaded instance is
   *       not in a terminal status: the pending action is cleared and an empty result is
   *       returned.
   *   <li>The restart handler returns error details: the pending action is left on the artifact
   *       and the details are returned as a timeline event.
   *   <li>The restart handler returns no details: the foreach overview is updated, the pending
   *       action is cleared, and an info timeline event is returned.
   *   <li>Any exception is thrown along the way: the pending action is cleared and the error is
   *       returned as a timeline event.
   * </ul>
   *
   * @param workflowSummary summary of the workflow owning the foreach step
   * @param foreachStep definition of the foreach step
   * @param runtimeSummary runtime summary of the foreach step
   * @param artifact foreach artifact holding the pending action; it is mutated by this method
   * @return a timeline event describing the outcome, or empty if there is nothing to report
   */
  private Optional<TimelineEvent> takePendingActionIfFeasible(
      WorkflowSummary workflowSummary,
      ForeachStep foreachStep,
      StepRuntimeSummary runtimeSummary,
      ForeachArtifact artifact) {
    ForeachAction foreachAction = artifact.getPendingAction();
    if (foreachAction == null) {
      return Optional.empty();
    }

    try {
      WorkflowInstance instance =
          instanceDao.getWorkflowInstanceRun(
              artifact.getForeachWorkflowId(),
              foreachAction.getInstanceId(),
              foreachAction.getInstanceRunId());

      boolean targetsLoadedRun =
          foreachAction.getInstanceRunId() == Constants.LATEST_ONE
              || instance.getWorkflowRunId() == foreachAction.getInstanceRunId();
      if (!targetsLoadedRun || !instance.getStatus().isTerminal()) {
        LOG.info(
            "Got an out-dated pending action [{}] for foreach step {}{} and ignore it...",
            foreachAction,
            workflowSummary.getIdentity(),
            runtimeSummary.getIdentity());
        artifact.setPendingAction(null);
        return Optional.empty();
      }

      int restartIterationId = (int) foreachAction.getInstanceId();
      // The new foreach run id goes past both the artifact's run id and the action's run id.
      ForeachArtifact restartArtifact = new ForeachArtifact();
      restartArtifact.setForeachWorkflowId(artifact.getForeachWorkflowId());
      restartArtifact.setForeachRunId(
          Math.max(artifact.getForeachRunId(), foreachAction.getInstanceRunId()) + 1);

      // NOTE(review): presumably instance ids are 1-based while run params and dedup keys use a
      // 0-based iteration index, hence the "- 1" below; confirm against the helpers.
      RunRequest runRequest =
          StepHelper.createInternalWorkflowRunRequest(
                  workflowSummary,
                  runtimeSummary,
                  Collections.singletonList(Tag.create(FOREACH_TAG_NAME)),
                  createForeachRunParams(
                      restartIterationId - 1, workflowSummary, foreachStep, runtimeSummary),
                  generateDedupKey(restartArtifact, restartIterationId - 1))
              .toBuilder()
              .restartConfig(foreachAction.getRestartConfig())
              .build();

      // Capture the pre-restart state before calling the handler, as it is needed afterwards to
      // update the foreach overview together with the instance's status at that point.
      WorkflowInstance.Status oldStatus = instance.getStatus();
      WorkflowRollupOverview oldOverview =
          instance.getRuntimeOverview() == null
              ? null
              : instance.getRuntimeOverview().getRollupOverview();

      Optional<Details> result =
          actionHandler.restartForeachInstance(
              runRequest, instance, foreachStep.getId(), restartArtifact.getForeachRunId());
      if (result.isPresent()) {
        // The pending action is intentionally left on the artifact in this branch.
        LOG.info(
            "Failed to take the action for foreach step {}{} due to [{}] and will retry it later",
            workflowSummary.getIdentity(),
            runtimeSummary.getIdentity(),
            result.get());
        return result.map(TimelineDetailsEvent::from);
      }

      LOG.info(
          "Successfully took the pending action for foreach step {}{} and will update artifact together",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity());
      artifact
          .getForeachOverview()
          .updateForRestart(
              foreachAction.getInstanceId(), instance.getStatus(), oldStatus, oldOverview);
      artifact.setPendingAction(null);
      return Optional.of(
          TimelineLogEvent.info("Successfully take an action: [%s]", foreachAction));
    } catch (Exception e) {
      // The exception itself is passed as the trailing argument so the logger records the stack
      // trace; e.getMessage() alone can be null and loses where the failure happened.
      LOG.warn(
          "Fatally failed to take the action for foreach step {}{} due to [{}] and will not retry it",
          workflowSummary.getIdentity(),
          runtimeSummary.getIdentity(),
          e.getMessage(),
          e);
      artifact.setPendingAction(null);
      return Optional.of(
          TimelineDetailsEvent.from(
              Details.create(
                  e,
                  false,
                  "Failed to take the action for foreach step with an error and won't retry")));
    }
  }