command-runner/pkg/common/executor.go (229 lines of code) (raw):

package common import ( "context" "errors" "fmt" "runtime/debug" "runtime/trace" "time" "github.com/rs/zerolog/log" ) // Warning that implements `error` but safe to ignore. type Warning struct { Message string } // Error the contract for error func (w Warning) Error() string { return w.Message } // ErrDefer that implements `error` but safe to ignore. var ErrDefer = errors.New("deferred") // Executor define contract for the steps of a workflow type Executor func(ctx context.Context) error // Conditional define contract for the conditional predicate type Conditional func(ctx context.Context) bool // NewInfoExecutor is an executor that logs messages func NewInfoExecutor(format string, args ...interface{}) Executor { return func(ctx context.Context) error { log.Ctx(ctx).Info().Msgf(format, args...) return nil } } // NewWarning returns a warning func NewWarning(format string, args ...interface{}) error { return Warning{ Message: fmt.Sprintf(format, args...), } } // NewWarningExecutor is an executor that returns a warning func NewWarningExecutor(format string, args ...interface{}) Executor { return func(ctx context.Context) error { return NewWarning(format, args...) } } // NewDebugExecutor is an executor that logs messages func NewDebugExecutor(format string, args ...interface{}) Executor { return func(ctx context.Context) error { log.Ctx(ctx).Debug().Msgf(format, args...) return nil } } // NewPipelineExecutor creates a new executor from a series of other executors func NewPipelineExecutor(executors ...Executor) Executor { if len(executors) == 0 { return func(ctx context.Context) error { return nil } } var rtn Executor for _, executor := range executors { if rtn == nil { rtn = executor } else { rtn = rtn.Then(executor) } } return rtn } // NewConditionalExecutor creates a new executor based on conditions func NewConditionalExecutor(conditional Conditional, trueExecutor Executor, falseExecutor Executor) Executor { return func(ctx context.Context) error { if conditional(ctx) { if trueExecutor != nil { return trueExecutor(ctx) } } else { if falseExecutor != nil { return falseExecutor(ctx) } } return nil } } // NewErrorExecutor creates a new executor that always errors out func NewErrorExecutor(err error) Executor { return func(ctx context.Context) error { return err } } // NewParallelExecutor creates a new executor from a parallel of other executors func NewParallelExecutor(parallel int, executors ...Executor) Executor { return func(ctx context.Context) error { work := make(chan Executor, len(executors)) errs := make(chan error, len(executors)) for i := 0; i < parallel; i++ { go func(lwork <-chan Executor, errs chan<- error) { for executor := range lwork { if err := executor(ctx); errors.Is(err, ErrDefer) { thisExecutor := executor go func() { time.Sleep(1 * time.Second) work <- thisExecutor }() } else { errs <- err } } }(work, errs) } for i := 0; i < len(executors); i++ { work <- executors[i] } defer close(work) // Executor waits all executors to cleanup these resources. var rtnError error for i := 0; i < len(executors); i++ { select { case err := <-errs: switch err.(type) { case Warning: log.Ctx(ctx).Debug().Err(err).Msg("Got warning") default: rtnError = errors.Join(rtnError, err) } case <-ctx.Done(): return ctx.Err() } } log.Ctx(ctx).Debug().Err(rtnError).Msg("Parallel executor finished") return rtnError } } // Then runs another executor if this executor succeeds func (e Executor) Then(then Executor) Executor { return func(ctx context.Context) error { err := e(ctx) if err != nil { switch err.(type) { case Warning: log.Ctx(ctx).Warn().Err(err) default: return err } } if ctx.Err() != nil { return ctx.Err() } return then(ctx) } } // If only runs this executor if conditional is true func (e Executor) If(conditional Conditional) Executor { return func(ctx context.Context) error { if conditional(ctx) { return e(ctx) } return nil } } // IfNot only runs this executor if conditional is true func (e Executor) IfNot(conditional Conditional) Executor { return func(ctx context.Context) error { if !conditional(ctx) { return e(ctx) } return nil } } // IfBool only runs this executor if conditional is true func (e Executor) IfBool(conditional bool) Executor { return e.If(func(ctx context.Context) bool { return conditional }) } // Finally adds an executor to run after other executor func (e Executor) Finally(finally Executor) Executor { return func(ctx context.Context) error { err := e(ctx) err2 := finally(ctx) if err2 != nil { return fmt.Errorf("Error occurred running finally: %v (original error: %v)", err2, err) } return err } } // Not return an inverted conditional func (c Conditional) Not() Conditional { return func(ctx context.Context) bool { return !c(ctx) } } // Wrapper is a type that performs activities before and/or after an executor runs type Wrapper func(ctx context.Context, e Executor) error // Wrap returns a new [Executor] that applies this [Wrapper] to the provided [Executor] func (w Wrapper) Wrap(e Executor) Executor { return func(ctx context.Context) error { return w(ctx, e) } } // WrapWith returns a new [Executor] that applies the provided [Wrapper]s to this [Executor] func (e Executor) WrapWith(wrappers ...Wrapper) Executor { rtn := e for _, w := range wrappers { rtn = w.Wrap(rtn) } return rtn } // CatchPanic wraps the executor with panic handler func CatchPanic(ctx context.Context, e Executor) error { err := make(chan error, 1) go func() { defer func() { if r := recover(); r != nil { err <- fmt.Errorf("💀 panic: %v\n%s", r, debug.Stack()) } close(err) }() err <- e(ctx) }() select { case rtn := <-err: return rtn case <-ctx.Done(): return ctx.Err() } } // CatchPanic wraps the executor with panic handler func (e Executor) CatchPanic() Executor { return func(ctx context.Context) error { return CatchPanic(ctx, e) } } // TraceRegion wraps the executor with a trace func (e Executor) TraceRegion(regionType string) Executor { return func(ctx context.Context) error { var err error trace.WithRegion(ctx, regionType, func() { err = e(ctx) }) return err } } // ReadyFunc determines when an executor is ready to run type ReadyFunc func() (bool, error) // DeferUntil a ReadyFunc is ready func (e Executor) DeferUntil(ready ReadyFunc) Executor { return func(ctx context.Context) error { if ctx.Err() != nil { return ctx.Err() } if ok, err := ready(); ok { return e(ctx) } else if err != nil { return err } return ErrDefer } }