private Result runSubworkflowInstance()

in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/SubworkflowStepRuntime.java [88:190]


  private Result runSubworkflowInstance(
      WorkflowSummary workflowSummary, Step step, StepRuntimeSummary runtimeSummary) {
    RunRequest runRequest =
        StepHelper.createInternalWorkflowRunRequest(
            workflowSummary,
            runtimeSummary,
            Collections.singletonList(Tag.create(SUBWORKFLOW_TAG_NAME)),
            createSubworkflowRunParam(workflowSummary, step, runtimeSummary),
            workflowSummary.getIdentity() + runtimeSummary.getIdentity());

    if (!instanceStepConcurrencyHandler.addInstance(runRequest)) {
      return new Result(
          State.CONTINUE,
          Collections.emptyMap(),
          Collections.singletonList(
              TimelineLogEvent.info(
                  "Unavailable due to InstanceStepConcurrency and will retry later to launch the subworkflow")));
    }

    try {
      RunResponse runResponse = null;
      // restart from workflow or restart the step
      if (!workflowSummary.isFreshRun() || !runtimeSummary.getStepRetry().isRetryable()) {
        SubworkflowArtifact subworkflowArtifact =
            stepInstanceDao.getLatestSubworkflowArtifact(
                workflowSummary.getWorkflowId(),
                workflowSummary.getWorkflowInstanceId(),
                step.getId());
        if (subworkflowArtifact != null) {
          WorkflowInstance instance =
              instanceDao.getWorkflowInstanceRun(
                  subworkflowArtifact.getSubworkflowId(),
                  subworkflowArtifact.getSubworkflowInstanceId(),
                  subworkflowArtifact.getSubworkflowRunId());

          runRequest.updateForDownstreamIfNeeded(step.getId(), instance);
          runResponse = instanceActionHandler.restartDirectly(instance, runRequest);
          LOG.info(
              "In step runtime {}, restarting a subworkflow instance from the parent run {} with response {}",
              runtimeSummary.getIdentity(),
              subworkflowArtifact.getIdentity(),
              runResponse);
        }
      }

      if (runResponse == null) {
        String subworkflowId = runtimeSummary.getParams().get(SUBWORKFLOW_ID_PARAM_NAME).asString();
        String subworkflowVersion =
            runtimeSummary.getParams().get(SUBWORKFLOW_VERSION_PARAM_NAME).asString();

        // always reset runRequest to be START_FRESH_NEW_RUN as this is the first run
        runRequest.clearRestartFor(RunPolicy.START_FRESH_NEW_RUN);
        runResponse = actionHandler.start(subworkflowId, subworkflowVersion, runRequest);
        LOG.info(
            "In step runtime {}, starting a subworkflow instance {}",
            runtimeSummary.getIdentity(),
            runResponse);
      }

      SubworkflowArtifact artifact = new SubworkflowArtifact();
      artifact.setSubworkflowId(runResponse.getWorkflowId());
      artifact.setSubworkflowVersionId(runResponse.getWorkflowVersionId());
      artifact.setSubworkflowInstanceId(runResponse.getWorkflowInstanceId());
      artifact.setSubworkflowRunId(runResponse.getWorkflowRunId());
      artifact.setSubworkflowUuid(runResponse.getWorkflowUuid());
      return new Result(
          State.CONTINUE,
          Collections.singletonMap(artifact.getType().key(), artifact),
          Collections.singletonList(
              TimelineLogEvent.info(
                  "Started a subworkflow with uuid: " + runResponse.getWorkflowUuid())));
    } catch (MaestroRetryableError retryableError) {
      LOG.warn("Failed to start subworkflow with error:", retryableError);
      instanceStepConcurrencyHandler.removeInstance(
          runRequest.getCorrelationId(),
          runRequest.getInitiator().getDepth(),
          runRequest.getRequestId().toString());
      return new Result(
          State.CONTINUE,
          Collections.emptyMap(),
          Collections.singletonList(TimelineDetailsEvent.from(retryableError.getDetails())));
    } catch (Exception e) {
      LOG.warn("Failed to start subworkflow step runtime", e);
      instanceStepConcurrencyHandler.removeInstance(
          runRequest.getCorrelationId(),
          runRequest.getInitiator().getDepth(),
          runRequest.getRequestId().toString());
      boolean retryable = false;
      if (e.getCause() instanceof SQLException) {
        if (RETRYABLE_DB_ERROR_CODE.equals(((SQLException) e.getCause()).getSQLState())
            || RETRYABLE_DB_ERROR_MSG.equals(e.getMessage())) {
          retryable = true;
        }
      }
      return new Result(
          retryable ? State.CONTINUE : State.FATAL_ERROR,
          Collections.emptyMap(),
          Collections.singletonList(
              TimelineDetailsEvent.from(
                  Details.create(
                      e, retryable, "Failed to start subworkflow step runtime with an error"))));
    }
  }