in command-runner/pkg/common/executor.go [105:150]
// NewParallelExecutor returns an Executor that runs the given executors
// concurrently on up to `parallel` worker goroutines.
//
// A value of parallel below 1 is treated as 1; with no workers nothing would
// drain the queue and the returned executor would block until ctx is done.
//
// An executor whose error matches ErrDefer is not counted as finished; it is
// put back on the queue after a one second delay and run again.
//
// The returned executor waits until every executor has reported a result.
// Results of type Warning are logged at debug level and dropped; all other
// errors are combined with errors.Join and returned. If ctx is cancelled
// first, ctx.Err() is returned immediately without waiting for executors
// that are still running.
func NewParallelExecutor(parallel int, executors ...Executor) Executor {
	if parallel < 1 {
		parallel = 1
	}

	return func(ctx context.Context) error {
		// Both channels can hold one entry per executor, so neither queueing
		// work nor reporting a result ever blocks.
		work := make(chan Executor, len(executors))
		errs := make(chan error, len(executors))

		// done tells workers and pending retries to stop. The work channel is
		// deliberately never closed: after an early return on cancellation a
		// retry may still try to re-queue an executor, and sending on a closed
		// channel would panic.
		done := make(chan struct{})
		defer close(done)

		for i := 0; i < parallel; i++ {
			go func() {
				for {
					select {
					case <-done:
						return
					case executor := <-work:
						err := executor(ctx)
						if !errors.Is(err, ErrDefer) {
							errs <- err
							continue
						}
						// Re-queue after a delay without tying up this worker.
						go func() {
							timer := time.NewTimer(1 * time.Second)
							defer timer.Stop()
							select {
							case <-timer.C:
								work <- executor
							case <-done:
							}
						}()
					}
				}
			}()
		}

		for _, executor := range executors {
			work <- executor
		}

		// Collect exactly one final result per executor.
		var rtnError error
		for range executors {
			select {
			case err := <-errs:
				switch err.(type) {
				case Warning:
					log.Ctx(ctx).Debug().Err(err).Msg("Got warning")
				default:
					rtnError = errors.Join(rtnError, err)
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		log.Ctx(ctx).Debug().Err(rtnError).Msg("Parallel executor finished")
		return rtnError
	}
}