func AddStep[JT, ST any]()

in step_builder.go [13:58]


func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
	if err := addStepPreCheck(j, stepName); err != nil {
		return nil, err
	}

	stepD := newStepDefinition[ST](stepName, stepTypeTask, optionDecorators...)
	precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn())
	if err != nil {
		return nil, err
	}

	// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
	if len(precedingDefSteps) == 0 {
		precedingDefSteps = append(precedingDefSteps, j.getRootStep())
		stepD.executionOptions.DependOn = append(stepD.executionOptions.DependOn, j.getRootStep().GetName())
	}

	stepD.instanceCreator = func(ctx context.Context, ji JobInstanceMeta) StepInstanceMeta {
		// TODO: error is ignored here
		precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji)

		jiStrongTyped := ji.(*JobInstance[JT])
		stepFunc := stepFuncCreator(jiStrongTyped.input)
		stepFuncWithPanicHandling := func(ctx context.Context) (result ST, err error) {
			// handle panic from user code
			defer func() {
				if r := recover(); r != nil {
					err = fmt.Errorf("panic cought: %v, StackTrace: %s", r, debug.Stack())
				}
			}()

			result, err = stepFunc(ctx)
			return result, err
		}

		stepInstance := newStepInstance(stepD, ji)
		stepInstance.task = asynctask.Start(ctx, instrumentedAddStep(stepInstance, precedingTasks, stepFuncWithPanicHandling))
		ji.addStepInstance(stepInstance, precedingInstances...)
		return stepInstance
	}

	if err := j.addStep(stepD, precedingDefSteps...); err != nil {
		return nil, err
	}
	return stepD, nil
}