step.go (237 lines of code) (raw):

package flow import ( "context" "time" ) // Steper describes the requirement for a Step, which is basic unit of a Workflow. // // Implement this interface to allow Workflow orchestrating your Steps. // // Notice Steper will be saved in Workflow as map key, so it's supposed to be 'comparable' type like pointer. type Steper interface { Do(context.Context) error } // Builder builds a Workflow by adding Steps. type Builder interface { AddToWorkflow() map[Steper]*StepConfig } // BeforeStep defines callback being called BEFORE step being executed. type BeforeStep func(context.Context, Steper) (context.Context, error) // AfterStep defines callback being called AFTER step being executed. type AfterStep func(context.Context, Steper, error) error type StepConfig struct { Upstreams Set[Steper] // Upstreams of the Step, means these Steps should happen-before this Step Before []BeforeStep // Before callbacks of the Step, will be called before Do After []AfterStep // After callbacks of the Step, will be called before Do Option []func(*StepOption) // Option customize the Step settings } type StepOption struct { RetryOption *RetryOption // RetryOption customize how the Step should be retried, default (nil) means no retry. Condition Condition // Condition decides whether Workflow should execute the Step, default to DefaultCondition. Timeout *time.Duration // Timeout sets the Step level timeout, default (nil) means no timeout. } // Steps declares a series of Steps ready to be added into Workflow. // // The Steps declared are mutually independent. // // workflow.Add( // Steps(a, b, c), // a, b, c will be executed in parallel // Steps(a, b, c).DependsOn(d, e), // d, e will be executed in parallel, then a, b, c in parallel // ) func Steps(steps ...Steper) AddSteps { rv := make(AddSteps) for _, step := range steps { rv[step] = &StepConfig{Upstreams: make(Set[Steper])} } return rv } // Step declares Step ready to be added into Workflow. // // The main difference between Step() and Steps() is that, // Step() allows to add Input for the Step. // // Step(a).Input(func(ctx context.Context, a *A) error { // // fill a // })) func Step[S Steper](steps ...S) AddStep[S] { return AddStep[S]{ AddSteps: Steps(ToSteps(steps)...), Steps: steps, } } // Pipe creates a pipeline in Workflow. // // workflow.Add( // Pipe(a, b, c), // a -> b -> c // ) // // The above code is equivalent to: // // workflow.Add( // Step(b).DependsOn(a), // Step(c).DependsOn(b), // ) func Pipe(steps ...Steper) AddSteps { as := Steps(steps...) for i := 0; i < len(steps)-1; i++ { as.Merge(Steps(steps[i+1]).DependsOn(steps[i])) } return as } // BatchPipe creates a batched pipeline in Workflow. // // workflow.Add( // BatchPipe( // Steps(a, b), // Steps(c, d, e), // Steps(f), // ), // ) // // The above code is equivalent to: // // workflow.Add( // Steps(c, d, e).DependsOn(a, b), // Steps(f).DependsOn(c, d, e), // ) func BatchPipe(batch ...AddSteps) AddSteps { as := Steps() for _, other := range batch { as.Merge(other) } for i := 0; i < len(batch)-1; i++ { as.Merge(Steps(Keys(batch[i+1])...).DependsOn(Keys(batch[i])...)) } return as } // DependsOn declares dependency on the given Steps. // // Step(a).DependsOn(b, c) // // Then b, c should happen-before a. func (as AddSteps) DependsOn(ups ...Steper) AddSteps { for down := range as { as[down].Upstreams.Add(ups...) } return as } // Input adds BeforeStep callback for the Step(s). // // Input callbacks will be called before Do, // and the order will respect the order of declarations. // // Step(a). // Input(/* 1. this Input will be called first */). // Input(/* 2. this Input will be called after 1. */) // Step(a).Input(/* 3. this Input is after all */) // // The Input callbacks are executed at runtime and per-try. func (as AddStep[S]) Input(fns ...func(context.Context, S) error) AddStep[S] { for _, step := range as.Steps { step := step // capture range variable for _, fn := range fns { if fn != nil { fn := fn // capture range variable as.AddSteps[step].Before = append(as.AddSteps[step].Before, func(ctx context.Context, _ Steper) (context.Context, error) { return ctx, fn(ctx, step) }) } } } return as } // Output can pass the results of the Step to outer scope. // Output is only triggered when the Step is successful (returns nil error). // // Output actually adds AfterStep callback for the Step(s). // // The Output callbacks are executed at runtime and per-try. func (as AddStep[S]) Output(fns ...func(context.Context, S) error) AddStep[S] { for _, step := range as.Steps { step := step // capture range variable for _, fn := range fns { if fn != nil { fn := fn // capture range variable as.AddSteps[step].After = append(as.AddSteps[step].After, func(ctx context.Context, _ Steper, err error) error { if err == nil { return fn(ctx, step) } return err }) } } } return as } // BeforeStep adds BeforeStep callback for the Step(s). // // The BeforeStep callback will be called before Do, and return when first error occurs. // The order of execution will respect the order of declarations. // The BeforeStep callbacks are able to change the context.Context feed into Do. // The BeforeStep callbacks are executed at runtime and per-try. func (as AddSteps) BeforeStep(befores ...BeforeStep) AddSteps { for step := range as { as[step].Before = append(as[step].Before, befores...) } return as } // AfterStep adds AfterStep callback for the Step(s). // // The AfterStep callback will be called after Do, and pass the error to next AfterStep callback. // The order of execution will respect the order of declarations. // The AfterStep callbacks are able to change the error returned by Do. // The AfterStep callbacks are executed at runtime and per-try. func (as AddSteps) AfterStep(afters ...AfterStep) AddSteps { for step := range as { as[step].After = append(as[step].After, afters...) } return as } // Timeout sets the Step level timeout. func (as AddSteps) Timeout(timeout time.Duration) AddSteps { for step := range as { as[step].Option = append(as[step].Option, func(so *StepOption) { so.Timeout = &timeout }) } return as } // When set the Condition for the Step. func (as AddSteps) When(cond Condition) AddSteps { for step := range as { as[step].Option = append(as[step].Option, func(so *StepOption) { so.Condition = cond }) } return as } // Retry customize how the Step should be retried. // // Step will be retried as long as this option is configured. // // w.Add( // Step(a), // not retry // Step(b).Retry(func(opt *RetryOption) { // will retry 3 times // opt.MaxAttempts = 3 // }), // Step(c).Retry(nil), // will use DefaultRetryOption! // ) func (as AddSteps) Retry(opts ...func(*RetryOption)) AddSteps { for step := range as { as[step].Option = append(as[step].Option, func(so *StepOption) { if so.RetryOption == nil { so.RetryOption = new(RetryOption) *so.RetryOption = DefaultRetryOption } for _, opt := range opts { if opt != nil { opt(so.RetryOption) } } }) } return as } // AddToWorkflow implements Builder func (as AddSteps) AddToWorkflow() map[Steper]*StepConfig { return as } // Merge another AddSteps into one. func (as AddSteps) Merge(others ...AddSteps) AddSteps { for _, other := range others { for k, v := range other { if as[k] == nil { as[k] = new(StepConfig) } as[k].Merge(v) } } return as } // DependsOn declares dependency on the given Steps. // // Step(a).DependsOn(b, c) // // Then b, c should happen-before a. func (as AddStep[S]) DependsOn(ups ...Steper) AddStep[S] { as.AddSteps = as.AddSteps.DependsOn(ups...) return as } // BeforeStep adds BeforeStep callback for the Step(s). // // The BeforeStep callback will be called before Do, and return when first error occurs. // The order of execution will respect the order of declarations. // The BeforeStep callbacks are able to change the context.Context feed into Do. // The BeforeStep callbacks are executed at runtime and per-try. func (as AddStep[S]) BeforeStep(befores ...BeforeStep) AddStep[S] { as.AddSteps = as.AddSteps.BeforeStep(befores...) return as } // AfterStep adds AfterStep callback for the Step(s). // // The AfterStep callback will be called after Do, and pass the error to next AfterStep callback. // The order of execution will respect the order of declarations. // The AfterStep callbacks are able to change the error returned by Do. // The AfterStep callbacks are executed at runtime and per-try. func (as AddStep[S]) AfterStep(afters ...AfterStep) AddStep[S] { as.AddSteps = as.AddSteps.AfterStep(afters...) return as } // Timeout sets the Step level timeout. func (as AddStep[S]) Timeout(timeout time.Duration) AddStep[S] { as.AddSteps = as.AddSteps.Timeout(timeout) return as } // When set the Condition for the Step. func (as AddStep[S]) When(when Condition) AddStep[S] { as.AddSteps = as.AddSteps.When(when) return as } // Retry customize how the Step should be retried. // // Step will be retried as long as this option is configured. // // w.Add( // Step(a), // not retry // Step(b).Retry(func(opt *RetryOption) { // will retry 3 times // opt.MaxAttempts = 3 // }), // Step(c).Retry(nil), // will use DefaultRetryOption! // ) func (as AddStep[S]) Retry(fns ...func(*RetryOption)) AddStep[S] { as.AddSteps = as.AddSteps.Retry(fns...) return as } type AddStep[S Steper] struct { AddSteps Steps []S } type AddSteps map[Steper]*StepConfig // ToSteps converts []<StepDoer implemention> to []StepDoer. // // steps := []someStepImpl{ ... } // flow.Add( // Steps(ToSteps(steps)...), // ) func ToSteps[S Steper](steps []S) []Steper { rv := []Steper{} for _, s := range steps { rv = append(rv, s) } return rv } func (sc *StepConfig) Merge(other *StepConfig) { if other == nil { return } if sc.Upstreams == nil { sc.Upstreams = make(Set[Steper]) } sc.Upstreams.Union(other.Upstreams) sc.Before = append(sc.Before, other.Before...) sc.After = append(sc.After, other.After...) sc.Option = append(sc.Option, other.Option...) } type Set[T comparable] map[T]struct{} func (s Set[T]) Has(v T) bool { if s == nil { return false } _, ok := s[v] return ok } func (s *Set[T]) Add(vs ...T) { if *s == nil { *s = make(Set[T]) } for _, v := range vs { (*s)[v] = struct{}{} } } func (s *Set[T]) Union(sets ...Set[T]) { for _, set := range sets { s.Add(set.Flatten()...) } } func (s Set[T]) Flatten() []T { r := make([]T, 0, len(s)) for v := range s { r = append(r, v) } return r } // Keys returns the keys of the map m. // The keys will be in an indeterminate order. func Keys[M ~map[K]V, K comparable, V any](m M) []K { r := make([]K, 0, len(m)) for k := range m { r = append(r, k) } return r } // Values returns the values of the map m. // The values will be in an indeterminate order. func Values[M ~map[K]V, K comparable, V any](m M) []V { r := make([]V, 0, len(m)) for _, v := range m { r = append(r, v) } return r }