command-runner/pkg/runner/runner.go (112 lines of code) (raw):

package runner import ( "context" "encoding/json" "errors" "fmt" "math" "os" "strings" "github.com/aws/codecatalyst-runner-cli/command-runner/internal/containers" "github.com/aws/codecatalyst-runner-cli/command-runner/pkg/common" "github.com/rs/zerolog/log" ) func newRunner(namespace string, executionType ExecutionType, plan Plan, features ...Feature) common.Executor { var executor common.Executor executor = func(ctx context.Context) error { logPlan("About to execute plan\n", plan) for _, commandGroup := range plan.CommandGroups() { id := fmt.Sprintf("%s-%s", namespace, plan.ID()) executor, err := newCommandExecutor(ctx, id, executionType, commandGroup, plan.EnvironmentConfiguration()) if err != nil { return err } for _, command := range commandGroup.Commands { log.Ctx(ctx).Info().Msgf("⚡️ %s", strings.Join(command, " ")) err := executor.ExecuteCommand(ctx, command) if err != nil { if closeErr := executor.Close(true); closeErr != nil { return errors.Join(err, closeErr) } return err } } if err := executor.Close(false); err != nil { return err } } return nil } executor = executor.CatchPanic() for _, feature := range features { executor = newFeatureWrapper(feature, plan).Wrap(executor).CatchPanic() } return executor } func newFeatureWrapper(feature Feature, plan Plan) common.Wrapper { return func(ctx context.Context, e common.Executor) error { return feature(ctx, plan, PlanExecutor(e)) } } func logPlan(message string, plan Plan) { if log.Debug().Enabled() { planJSON, _ := json.Marshal(plan) log.Debug().Msgf("%s%s", message, planJSON) } } // RunAllParams contains the input parameters for the RunAll function type RunAllParams struct { Namespace string // namespace for this execution Plans PlansProvider // provider for the list of plans to run Features FeaturesProvider // provider for the features to apply to each plan Concurrency int // number of plans to run concurrently ExecutionType ExecutionType // executor to use for running commands } // PlansProvider returns a list of [Plan]s type PlansProvider interface { Plans(ctx context.Context) ([]Plan, error) } // FeaturesProvider returns a list of [Feature]s for a given [Plan] type FeaturesProvider interface { Features(Plan) ([]Feature, error) } // RunAll executes all plans and features in parallel func RunAll(ctx context.Context, params *RunAllParams) error { if params.Plans == nil { return fmt.Errorf("plannables provider cannot be nil") } plans, err := params.Plans.Plans(ctx) if err != nil { return fmt.Errorf("unable to get plans from provider: %w", err) } executors := make([]common.Executor, 0) for _, plan := range plans { var features []Feature if params.Features != nil { features, err = params.Features.Features(plan) if err != nil { return fmt.Errorf("unable to get features: %w", err) } } executor := newRunner(params.Namespace, params.ExecutionType, plan, features...) executors = append(executors, executor) } concurrency := int(math.Max(1, float64(params.Concurrency))) return common.NewParallelExecutor(concurrency, executors...).TraceRegion("actions-runall")(ctx) } // ExecutionType allows the caller to force shell or docker execution of the action type ExecutionType string const ( // ExecutionTypeShell configures commands to run in a local shell ExecutionTypeShell ExecutionType = "shell" // ExecutionTypeDocker configures commands to run in a Docker container ExecutionTypeDocker ExecutionType = "docker" // ExecutionTypeFinch configures commands to run in a Finch container ExecutionTypeFinch ExecutionType = "finch" ) // DefaultExecutionType determines the appropriate default func DefaultExecutionType() ExecutionType { if _, ok := os.LookupEnv("__MDE_ENVIRONMENT_ID"); ok { return ExecutionTypeShell } defaultSP := containers.DefaultServiceProvider(context.Background()) switch defaultSP { case containers.Finch: return ExecutionTypeFinch case containers.Docker: return ExecutionTypeDocker default: return ExecutionTypeShell } }