in maestro-engine/src/main/java/com/netflix/maestro/engine/steps/SubworkflowStepRuntime.java [88:190]
/**
 * Launches the subworkflow for the given step, either by restarting the most recently recorded
 * subworkflow run or by starting a new one.
 *
 * <p>Flow implemented here:
 *
 * <ol>
 *   <li>Build an internal {@code RunRequest} and register it with the instance-step-concurrency
 *       handler. If registration is refused, return {@code State.CONTINUE} with an info timeline
 *       event and no artifact.
 *   <li>Unless the workflow run is fresh <em>and</em> the step retry is retryable, look up the
 *       latest subworkflow artifact for this step; when one exists, restart that run directly.
 *   <li>If nothing was restarted, reset the request to {@code START_FRESH_NEW_RUN} and start the
 *       subworkflow identified by the id/version step params.
 *   <li>Return {@code State.CONTINUE} carrying a {@code SubworkflowArtifact} populated from the
 *       run response.
 * </ol>
 *
 * <p>If launching throws, the registered instance is removed from the concurrency handler again.
 * A {@code MaestroRetryableError} yields {@code State.CONTINUE}. Any other exception yields
 * {@code State.FATAL_ERROR}, unless its cause is a {@code SQLException} recognized as retryable,
 * in which case {@code State.CONTINUE} is returned.
 *
 * @param workflowSummary summary of the workflow instance that owns the step
 * @param step step definition; its id is used to find a previously recorded subworkflow artifact
 * @param runtimeSummary runtime summary of the step, supplying params and retry information
 * @return the step result: state, artifact map (empty unless a subworkflow was launched), and a
 *     single timeline event
 */
private Result runSubworkflowInstance(
    WorkflowSummary workflowSummary, Step step, StepRuntimeSummary runtimeSummary) {
  RunRequest request =
      StepHelper.createInternalWorkflowRunRequest(
          workflowSummary,
          runtimeSummary,
          Collections.singletonList(Tag.create(SUBWORKFLOW_TAG_NAME)),
          createSubworkflowRunParam(workflowSummary, step, runtimeSummary),
          workflowSummary.getIdentity() + runtimeSummary.getIdentity());

  // Bail out early (and try again on a later pass) when the concurrency handler refuses the
  // request.
  boolean registered = instanceStepConcurrencyHandler.addInstance(request);
  if (!registered) {
    return new Result(
        State.CONTINUE,
        Collections.emptyMap(),
        Collections.singletonList(
            TimelineLogEvent.info(
                "Unavailable due to InstanceStepConcurrency and will retry later to launch the subworkflow")));
  }

  try {
    RunResponse response = null;

    // A restart (of the workflow or of the step) is anything other than
    // "fresh run AND retryable step retry".
    boolean freshRetryableRun =
        workflowSummary.isFreshRun() && runtimeSummary.getStepRetry().isRetryable();
    if (!freshRetryableRun) {
      SubworkflowArtifact previous =
          stepInstanceDao.getLatestSubworkflowArtifact(
              workflowSummary.getWorkflowId(),
              workflowSummary.getWorkflowInstanceId(),
              step.getId());
      if (previous != null) {
        WorkflowInstance priorRun =
            instanceDao.getWorkflowInstanceRun(
                previous.getSubworkflowId(),
                previous.getSubworkflowInstanceId(),
                previous.getSubworkflowRunId());
        request.updateForDownstreamIfNeeded(step.getId(), priorRun);
        // NOTE(review): the restart goes through instanceActionHandler while the fresh start
        // below goes through actionHandler - confirm both fields are intended.
        response = instanceActionHandler.restartDirectly(priorRun, request);
        LOG.info(
            "In step runtime {}, restarting a subworkflow instance from the parent run {} with response {}",
            runtimeSummary.getIdentity(),
            previous.getIdentity(),
            response);
      }
    }

    // No restart happened (fresh run, or no earlier artifact): start the subworkflow anew.
    if (response == null) {
      String targetWorkflowId =
          runtimeSummary.getParams().get(SUBWORKFLOW_ID_PARAM_NAME).asString();
      String targetVersion =
          runtimeSummary.getParams().get(SUBWORKFLOW_VERSION_PARAM_NAME).asString();
      // This is the first run, so the request is always reset to START_FRESH_NEW_RUN.
      request.clearRestartFor(RunPolicy.START_FRESH_NEW_RUN);
      response = actionHandler.start(targetWorkflowId, targetVersion, request);
      LOG.info(
          "In step runtime {}, starting a subworkflow instance {}",
          runtimeSummary.getIdentity(),
          response);
    }

    // Record which subworkflow run was launched so it is returned as the step's artifact.
    SubworkflowArtifact launched = new SubworkflowArtifact();
    launched.setSubworkflowId(response.getWorkflowId());
    launched.setSubworkflowVersionId(response.getWorkflowVersionId());
    launched.setSubworkflowInstanceId(response.getWorkflowInstanceId());
    launched.setSubworkflowRunId(response.getWorkflowRunId());
    launched.setSubworkflowUuid(response.getWorkflowUuid());
    return new Result(
        State.CONTINUE,
        Collections.singletonMap(launched.getType().key(), launched),
        Collections.singletonList(
            TimelineLogEvent.info(
                "Started a subworkflow with uuid: " + response.getWorkflowUuid())));
  } catch (MaestroRetryableError transientFailure) {
    LOG.warn("Failed to start subworkflow with error:", transientFailure);
    // Undo the registration made above so it does not linger after a failed launch.
    instanceStepConcurrencyHandler.removeInstance(
        request.getCorrelationId(),
        request.getInitiator().getDepth(),
        request.getRequestId().toString());
    return new Result(
        State.CONTINUE,
        Collections.emptyMap(),
        Collections.singletonList(TimelineDetailsEvent.from(transientFailure.getDetails())));
  } catch (Exception failure) {
    LOG.warn("Failed to start subworkflow step runtime", failure);
    // Undo the registration made above so it does not linger after a failed launch.
    instanceStepConcurrencyHandler.removeInstance(
        request.getCorrelationId(),
        request.getInitiator().getDepth(),
        request.getRequestId().toString());

    // Only a SQLException cause can make the failure retryable.
    // NOTE(review): the message comparison uses the wrapping exception's message, not the
    // SQLException's own message - confirm that is intended.
    Throwable cause = failure.getCause();
    boolean canRetry =
        cause instanceof SQLException
            && (RETRYABLE_DB_ERROR_CODE.equals(((SQLException) cause).getSQLState())
                || RETRYABLE_DB_ERROR_MSG.equals(failure.getMessage()));
    State outcome = canRetry ? State.CONTINUE : State.FATAL_ERROR;
    return new Result(
        outcome,
        Collections.emptyMap(),
        Collections.singletonList(
            TimelineDetailsEvent.from(
                Details.create(
                    failure, canRetry, "Failed to start subworkflow step runtime with an error"))));
  }
}