in step_builder.go [13:58]
func AddStep[JT, ST any](j *JobDefinition[JT], stepName string, stepFuncCreator func(input JT) asynctask.AsyncFunc[ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
if err := addStepPreCheck(j, stepName); err != nil {
return nil, err
}
stepD := newStepDefinition[ST](stepName, stepTypeTask, optionDecorators...)
precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn())
if err != nil {
return nil, err
}
// if a step have no preceding tasks, link it to our rootJob as preceding task, so it won't start yet.
if len(precedingDefSteps) == 0 {
precedingDefSteps = append(precedingDefSteps, j.getRootStep())
stepD.executionOptions.DependOn = append(stepD.executionOptions.DependOn, j.getRootStep().GetName())
}
stepD.instanceCreator = func(ctx context.Context, ji JobInstanceMeta) StepInstanceMeta {
// TODO: error is ignored here
precedingInstances, precedingTasks, _ := getDependsOnStepInstances(stepD, ji)
jiStrongTyped := ji.(*JobInstance[JT])
stepFunc := stepFuncCreator(jiStrongTyped.input)
stepFuncWithPanicHandling := func(ctx context.Context) (result ST, err error) {
// handle panic from user code
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic cought: %v, StackTrace: %s", r, debug.Stack())
}
}()
result, err = stepFunc(ctx)
return result, err
}
stepInstance := newStepInstance(stepD, ji)
stepInstance.task = asynctask.Start(ctx, instrumentedAddStep(stepInstance, precedingTasks, stepFuncWithPanicHandling))
ji.addStepInstance(stepInstance, precedingInstances...)
return stepInstance
}
if err := j.addStep(stepD, precedingDefSteps...); err != nil {
return nil, err
}
return stepD, nil
}