func NewParallelExecutor()

in command-runner/pkg/common/executor.go [105:150]


// NewParallelExecutor returns an Executor that runs executors concurrently on
// a pool of at most parallel worker goroutines.
//
// A parallel value below 1 is treated as 1 so that the queue is always
// serviced. An executor whose error matches ErrDefer is put back on the queue
// after a one second delay instead of being reported. Every other result is
// collected: Warning values are logged at debug level and dropped, anything
// else is combined with errors.Join into the returned error.
//
// If ctx is cancelled before every executor has reported a result, the
// returned Executor stops waiting and returns ctx.Err().
func NewParallelExecutor(parallel int, executors ...Executor) Executor {
	// Delay before an executor that asked to be deferred is queued again.
	const retryDelay = 1 * time.Second

	// Without at least one worker nothing would ever read the queue and the
	// caller would block until ctx is cancelled.
	if parallel < 1 {
		parallel = 1
	}

	return func(ctx context.Context) error {
		// Each channel can hold every executor at once. An executor is in at
		// most one place at a time (queued, running, waiting for a retry, or
		// reported), so none of the sends below can block.
		work := make(chan Executor, len(executors))
		errs := make(chan error, len(executors))
		retry := make(chan Executor, len(executors))

		for i := 0; i < parallel; i++ {
			go func() {
				for executor := range work {
					err := executor(ctx)
					if !errors.Is(err, ErrDefer) {
						errs <- err
						continue
					}

					// Deferred executors are handed back through retry, which
					// is never closed, rather than written to work directly:
					// work may already be closed if the collector returned
					// early, and sending on a closed channel panics.
					go func(deferred Executor) {
						timer := time.NewTimer(retryDelay)
						defer timer.Stop()

						select {
						case <-timer.C:
							retry <- deferred
						case <-ctx.Done():
							// The collector gives up on cancellation, so
							// nobody is left to wait for this retry.
						}
					}(executor)
				}
			}()
		}

		for _, executor := range executors {
			work <- executor
		}
		// This goroutine is the only sender on work, so closing it on return
		// cannot race with another send. Workers drain what is still queued
		// and then exit.
		defer close(work)

		// Executor waits all executors to cleanup these resources.
		var rtnError error
		for pending := len(executors); pending > 0; {
			select {
			case deferred := <-retry:
				// Re-queue on behalf of the retry goroutine; this does not
				// count as a result.
				work <- deferred
			case err := <-errs:
				pending--
				switch err.(type) {
				case Warning:
					log.Ctx(ctx).Debug().Err(err).Msg("Got warning")
				default:
					// errors.Join discards nil values, so successful
					// executors do not affect the result.
					rtnError = errors.Join(rtnError, err)
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		log.Ctx(ctx).Debug().Err(rtnError).Msg("Parallel executor finished")

		return rtnError
	}
}