in step_builder.go [61:102]
// StepAfter registers a task step named stepName on job definition j whose
// task is chained onto parentStep's task with asynctask.ContinueWith, so the
// step function receives parentStep's result (PT) and produces ST.
//
// stepAfterFuncCreator is invoked once per job instance with that instance's
// input and returns the function executed for the step. optionDecorators are
// applied to the new step definition; a dependency on parentStep is always
// added after them.
//
// It returns the new step definition, or the error reported by
// addStepPreCheck, getDependsOnSteps or j.addStep.
func StepAfter[JT, PT, ST any](j *JobDefinition[JT], stepName string, parentStep *StepDefinition[PT], stepAfterFuncCreator func(input JT) asynctask.ContinueFunc[PT, ST], optionDecorators ...ExecutionOptionPreparer) (*StepDefinition[ST], error) {
	if err := addStepPreCheck(j, stepName); err != nil {
		return nil, err
	}

	// Build the option list in a fresh slice: appending directly to the
	// variadic parameter could write into the caller's backing array when the
	// caller passed a slice with spare capacity.
	options := make([]ExecutionOptionPreparer, 0, len(optionDecorators)+1)
	options = append(options, optionDecorators...)
	options = append(options, ExecuteAfter(parentStep))

	stepD := newStepDefinition[ST](stepName, stepTypeTask, options...)
	precedingDefSteps, err := getDependsOnSteps(j, stepD.DependsOn())
	if err != nil {
		return nil, err
	}

	stepD.instanceCreator = func(ctx context.Context, ji JobInstanceMeta) StepInstanceMeta {
		// instanceCreator has no error return, so a failure to resolve the
		// preceding step instances is remembered here and reported as the
		// step's own error when the step function runs.
		precedingInstances, precedingTasks, dependsErr := getDependsOnStepInstances(stepD, ji)

		// Panics if ji is not a *JobInstance[JT].
		jiStrongTyped := ji.(*JobInstance[JT])
		stepFunc := stepAfterFuncCreator(jiStrongTyped.input)
		stepFuncWithPanicHandling := func(ctx context.Context, pt PT) (result ST, err error) {
			// Convert a panic from user code into an error on the step.
			defer func() {
				if r := recover(); r != nil {
					// NOTE(review): "cought" is a typo for "caught"; the message is
					// left unchanged in case callers or tests match on it.
					err = fmt.Errorf("panic cought: %v, StackTrace: %s", r, debug.Stack())
				}
			}()

			if dependsErr != nil {
				return result, fmt.Errorf("step %q: resolving preceding step instances: %w", stepName, dependsErr)
			}

			result, err = stepFunc(ctx, pt)
			return result, err
		}

		parentStepInstance := getStrongTypedStepInstance(parentStep, ji)
		stepInstance := newStepInstance(stepD, ji)
		// ContinueWith may not invoke instrumentedStepAfter at all if
		// parentStep returns an error.
		stepInstance.task = asynctask.ContinueWith(ctx, parentStepInstance.task, instrumentedStepAfter(stepInstance, precedingTasks, stepFuncWithPanicHandling))
		ji.addStepInstance(stepInstance, precedingInstances...)
		return stepInstance
	}

	if err := j.addStep(stepD, precedingDefSteps...); err != nil {
		return nil, err
	}
	return stepD, nil
}