in maestro-engine/src/main/java/com/netflix/maestro/engine/tasks/MaestroTask.java [193:276]
/**
 * Builds the {@link StepRuntimeSummary} for the step attempt backed by the given task.
 *
 * <p>On a new start (task retry count of zero):
 *
 * <ul>
 *   <li>a fresh retry state is initialized from the step definition;
 *   <li>the task sequence number becomes the step instance id;
 *   <li>the caller-supplied signal dependencies are used.
 * </ul>
 *
 * When the workflow is a non-fresh RESTART_FROM_SPECIFIC run with a restart config, a step that
 * is one of the NOT_CREATED root nodes of the restart runtime DAG is made non-retryable.
 *
 * <p>On a retry, the retry state, step instance id and signal dependencies are carried over from
 * the runtime summary stored in the task output data. The retry counters are first incremented
 * based on the previous attempt's status.
 *
 * <p>The attempt id is always the task retry count plus one. When a tracing manager is present,
 * a tracing context is initialized and its artifacts are attached to the summary. A
 * non-retryable step ends up with status NOT_CREATED; any other step is marked created.
 *
 * @param task the task backing this step attempt
 * @param stepDefinition the step definition the summary is built from
 * @param workflowSummary summary of the owning workflow instance
 * @param dependencies signal dependencies used on a new start; ignored on a retry
 * @return a new runtime summary whose DB operation is INSERT
 */
private StepRuntimeSummary createStepRuntimeSummary(
    Task task,
    Step stepDefinition,
    WorkflowSummary workflowSummary,
    SignalDependencies dependencies) {
  final String stepId = stepDefinition.getId();
  final boolean isNewStart = task.getRetryCount() == 0;

  final StepInstance.StepRetry retry;
  final long instanceId;
  final SignalDependencies signalDeps;
  if (isNewStart) {
    retry = initializeStepRetry(stepDefinition, workflowSummary);
    // the task seq can contain gaps, but it only ever grows
    instanceId = task.getSeq();
    signalDeps = dependencies;

    // sequential restarts may contain dummy (NOT_CREATED) root nodes; those must not be retried
    final boolean restartFromSpecific =
        !workflowSummary.isFreshRun()
            && workflowSummary.getRestartConfig() != null
            && workflowSummary.getRunPolicy() == RunPolicy.RESTART_FROM_SPECIFIC;
    if (restartFromSpecific
        && DagHelper.getNotCreatedRootNodesInRestartRuntimeDag(
                workflowSummary.getRuntimeDag(), workflowSummary.getRestartConfig())
            .contains(stepId)) {
      retry.setRetryable(false);
    }
  } else {
    // carry state forward from the attempt persisted in the task output
    final StepRuntimeSummary previous =
        StepHelper.retrieveRuntimeSummary(objectMapper, task.getOutputData());
    retry = previous.getStepRetry();
    retry.incrementByStatus(previous.getRuntimeState().getStatus());
    instanceId = previous.getStepInstanceId();
    signalDeps = previous.getSignalDependencies();
  }

  final long attemptId = task.getRetryCount() + 1L;

  // only the identity fields are needed to seed the tracing context
  final MaestroTracingContext tracing =
      tracingManager == null
          ? null
          : tracingManager.initTracingContext(
              workflowSummary,
              StepRuntimeSummary.builder()
                  .stepId(stepId)
                  .stepInstanceId(instanceId)
                  .stepAttemptId(attemptId)
                  .stepInstanceUuid(task.getTaskId())
                  .build());

  final StepRuntimeSummary summary =
      StepRuntimeSummary.builder()
          .stepId(stepId)
          .stepAttemptId(attemptId)
          .stepInstanceUuid(task.getTaskId())
          .stepName(StepHelper.getStepNameOrDefault(stepDefinition))
          .stepInstanceId(instanceId)
          .tags(stepDefinition.getTags())
          .type(stepDefinition.getType())
          .subType(stepDefinition.getSubType())
          .params(new LinkedHashMap<>()) // filled in later; starts empty
          .transition(StepInstanceTransition.from(stepDefinition))
          .stepRetry(retry)
          .timeoutInMillis(null) // null means the system default timeout applies at first
          .synced(true)
          .signalDependencies(signalDeps)
          .dbOperation(DbOperation.INSERT)
          .tracingContext(tracing)
          .artifacts(tracing == null ? null : tracing.toTracingArtifacts())
          .build();

  if (retry.isRetryable()) {
    summary.markCreated(tracingManager);
  } else {
    LOG.debug(
        "Create a placeholder task for workflow {}{} with failure model [{}]",
        workflowSummary.getIdentity(),
        summary.getIdentity(),
        stepDefinition.getFailureMode());
    summary.getRuntimeState().setStatus(StepInstance.Status.NOT_CREATED);
  }
  return summary;
}