in maestro-engine/src/main/java/com/netflix/maestro/engine/tasks/MaestroTask.java [193:276]
  /**
   * Builds the {@link StepRuntimeSummary} for the current attempt of a step.
   *
   * <p>On a new start (task retry count is 0) the retry info is freshly initialized, the step
   * instance id is taken from the task sequence, and the passed-in dependencies are used. On a
   * retry, those three values are carried over from the previous runtime summary read from the
   * task output data, after {@code incrementByStatus} is applied with the previous attempt's
   * status.
   *
   * <p>If the resulting retry info is not retryable, the returned summary is a placeholder whose
   * runtime state status is set to {@code NOT_CREATED}; otherwise it is marked as created.
   *
   * @param task the task providing the retry count, sequence number, task id and output data
   * @param stepDefinition the step definition the summary is created for
   * @param workflowSummary the summary of the owning workflow instance
   * @param dependencies signal dependencies to attach on a new start; not used on a retry, where
   *     the previous attempt's dependencies are reused
   * @return the newly built runtime summary for this attempt
   */
  private StepRuntimeSummary createStepRuntimeSummary(
      Task task,
      Step stepDefinition,
      WorkflowSummary workflowSummary,
      SignalDependencies dependencies) {
    final StepInstance.StepRetry retry;
    final long instanceId;
    final SignalDependencies effectiveDependencies;

    if (task.getRetryCount() != 0) {
      // Retry: continue from what the previous attempt recorded in the task output.
      StepRuntimeSummary previous =
          StepHelper.retrieveRuntimeSummary(objectMapper, task.getOutputData());
      retry = previous.getStepRetry();
      retry.incrementByStatus(previous.getRuntimeState().getStatus());
      instanceId = previous.getStepInstanceId();
      effectiveDependencies = previous.getSignalDependencies();
    } else {
      // New start.
      retry = initializeStepRetry(stepDefinition, workflowSummary);
      // The sequence may have gaps but increases monotonically.
      instanceId = task.getSeq();
      effectiveDependencies = dependencies;

      // Sequential restart cases with dummy root nodes (NOT_CREATED): such a step must not retry.
      boolean restartFromSpecific =
          !workflowSummary.isFreshRun()
              && workflowSummary.getRestartConfig() != null
              && workflowSummary.getRunPolicy() == RunPolicy.RESTART_FROM_SPECIFIC;
      if (restartFromSpecific
          && DagHelper.getNotCreatedRootNodesInRestartRuntimeDag(
                  workflowSummary.getRuntimeDag(), workflowSummary.getRestartConfig())
              .contains(stepDefinition.getId())) {
        retry.setRetryable(false);
      }
    }

    // Attempt ids are 1-based while the task retry count is 0-based.
    final long attemptId = task.getRetryCount() + 1L;

    // Tracing is optional; the context is initialized from a minimal identity-only summary.
    final MaestroTracingContext tracing =
        tracingManager == null
            ? null
            : tracingManager.initTracingContext(
                workflowSummary,
                StepRuntimeSummary.builder()
                    .stepId(stepDefinition.getId())
                    .stepInstanceId(instanceId)
                    .stepAttemptId(attemptId)
                    .stepInstanceUuid(task.getTaskId())
                    .build());

    final StepRuntimeSummary summary =
        StepRuntimeSummary.builder()
            .stepId(stepDefinition.getId())
            .stepAttemptId(attemptId)
            .stepInstanceUuid(task.getTaskId())
            .stepName(StepHelper.getStepNameOrDefault(stepDefinition))
            .stepInstanceId(instanceId)
            .tags(stepDefinition.getTags())
            .type(stepDefinition.getType())
            .subType(stepDefinition.getSubType())
            .params(new LinkedHashMap<>()) // empty placeholder, no params at creation
            .transition(StepInstanceTransition.from(stepDefinition))
            .stepRetry(retry)
            .timeoutInMillis(null) // null means the system default timeout is used initially
            .synced(true)
            .signalDependencies(effectiveDependencies)
            .dbOperation(DbOperation.INSERT)
            .tracingContext(tracing)
            .artifacts(
                Optional.ofNullable(tracing)
                    .map(MaestroTracingContext::toTracingArtifacts)
                    .orElse(null))
            .build();

    if (retry.isRetryable()) {
      summary.markCreated(tracingManager);
      return summary;
    }

    // Not retryable: keep the summary as a NOT_CREATED placeholder instead of marking it created.
    // NOTE(review): "failure model" in the message presumably means "failure mode" - confirm
    // before changing the log text.
    LOG.debug(
        "Create a placeholder task for workflow {}{} with failure model [{}]",
        workflowSummary.getIdentity(),
        summary.getIdentity(),
        stepDefinition.getFailureMode());
    summary.getRuntimeState().setStatus(StepInstance.Status.NOT_CREATED);
    return summary;
  }