in step_builder.go [202:234]
func instrumentedStepAfter[T, S any](stepInstance *StepInstance[S], precedingTasks []asynctask.Waitable, stepFunc func(ctx context.Context, t T) (S, error)) func(ctx context.Context, t T) (S, error) {
return func(ctx context.Context, t T) (S, error) {
if err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, precedingTasks...); err != nil {
/* this only work on ExecuteAfter (have precedent step, but not taking input from it)
asynctask.ContinueWith and asynctask.AfterBoth won't invoke instrumentedFunc if any of the preceding task failed.
we need to be consistent on before we do any state change or error handling. */
return *new(S), err
}
stepInstance.executionData.StartTime = time.Now()
stepInstance.state = StepStateRunning
ctx = stepInstance.EnrichContext(ctx)
var result S
var err error
if stepInstance.Definition.executionOptions.RetryPolicy != nil {
stepInstance.executionData.Retried = &RetryReport{}
result, err = newRetryer(stepInstance.Definition.executionOptions.RetryPolicy, stepInstance.executionData.Retried, func() (S, error) { return stepFunc(ctx, t) }).Run()
} else {
result, err = stepFunc(ctx, t)
}
stepInstance.executionData.Duration = time.Since(stepInstance.executionData.StartTime)
if err != nil {
stepInstance.state = StepStateFailed
return *new(S), newStepError(ErrStepFailed, stepInstance, err)
} else {
stepInstance.state = StepStateCompleted
return result, nil
}
}
}