cmd/eno-controller/main.go (193 lines of code) (raw):

package main import ( "flag" "fmt" "io" "os" "strings" "time" "github.com/go-logr/logr" "github.com/go-logr/zapr" "go.uber.org/zap" "go.uber.org/zap/zapcore" "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" v1 "github.com/Azure/eno/api/v1" "github.com/Azure/eno/internal/controllers/composition" "github.com/Azure/eno/internal/controllers/resourceslice" "github.com/Azure/eno/internal/controllers/scheduling" "github.com/Azure/eno/internal/controllers/symphony" "github.com/Azure/eno/internal/controllers/synthesis" "github.com/Azure/eno/internal/controllers/watch" "github.com/Azure/eno/internal/execution" "github.com/Azure/eno/internal/manager" ) func main() { if len(os.Args) > 1 && os.Args[1] == "install-executor" { installExecutor() return } if strings.HasSuffix(os.Args[0], "executor") { runExecutor() return } if err := runController(); err != nil { fmt.Fprintf(os.Stderr, "error: %s\n", err) os.Exit(1) } } func runController() error { ctx := ctrl.SetupSignalHandler() var ( debugLogging bool watchdogThres time.Duration rolloutCooldown time.Duration selfHealingGracePeriod time.Duration taintToleration string nodeAffinity string concurrencyLimit int containerCreationTimeout time.Duration synconf = &synthesis.Config{} mgrOpts = &manager.Options{ Rest: ctrl.GetConfigOrDie(), } ) flag.StringVar(&synconf.PodNamespace, "synthesizer-pod-namespace", os.Getenv("POD_NAMESPACE"), "Namespace to create synthesizer pods in. Defaults to POD_NAMESPACE.") flag.StringVar(&synconf.ExecutorImage, "executor-image", os.Getenv("EXECUTOR_IMAGE"), "Reference to the image that will be used to execute synthesizers. Defaults to EXECUTOR_IMAGE.") flag.StringVar(&synconf.PodServiceAccount, "synthesizer-pod-service-account", "", "Service account name to be assigned to synthesizer Pods.") flag.DurationVar(&containerCreationTimeout, "container-creation-ttl", time.Second*3, "Timeout when waiting for kubelet to ack scheduled pods. Protects tail latency from kubelet network partitions") flag.BoolVar(&debugLogging, "debug", true, "Enable debug logging") flag.DurationVar(&watchdogThres, "watchdog-threshold", time.Minute*3, "How long before the watchdog considers a mid-transition resource to be stuck") flag.DurationVar(&rolloutCooldown, "rollout-cooldown", time.Minute, "How long before an update to a related resource (synthesizer, bindings, etc.) will trigger a second composition's re-synthesis") flag.StringVar(&taintToleration, "taint-toleration", "", "Node NoSchedule taint to be tolerated by synthesizer pods e.g. taintKey=taintValue to match on value, just taintKey to match on presence of the taint") flag.StringVar(&nodeAffinity, "node-affinity", "", "Synthesizer pods will be created with this required node affinity expression e.g. labelKey=labelValue to match on value, just labelKey to match on presence of the label") flag.IntVar(&concurrencyLimit, "concurrency-limit", 10, "Upper bound on active syntheses. This effectively limits the number of running synthesizer pods spawned by Eno.") flag.DurationVar(&selfHealingGracePeriod, "self-healing-grace-period", time.Minute*5, "How long before the self-healing controllers are allowed to start the resynthesis process.") mgrOpts.Bind(flag.CommandLine) flag.Parse() synconf.NodeAffinityKey, synconf.NodeAffinityValue = parseKeyValue(nodeAffinity) synconf.TaintTolerationKey, synconf.TaintTolerationValue = parseKeyValue(taintToleration) if synconf.ExecutorImage == "" { return fmt.Errorf("a value is required in --executor-image or EXECUTOR_IMAGE") } if synconf.PodNamespace == "" { return fmt.Errorf("a value is required in --synthesizer-pod-namespace or POD_NAMESPACE") } mgrOpts.SynthesizerPodNamespace = synconf.PodNamespace zapCfg := zap.NewProductionConfig() if debugLogging { zapCfg.Level = zap.NewAtomicLevelAt(zapcore.DebugLevel) } zl, err := zapCfg.Build() if err != nil { return err } logger := zapr.NewLogger(zl) mgrOpts.Rest.UserAgent = "eno-controller" mgr, err := manager.New(logger, mgrOpts) if err != nil { return fmt.Errorf("constructing manager: %w", err) } err = synthesis.NewPodLifecycleController(mgr, synconf) if err != nil { return fmt.Errorf("constructing pod lifecycle controller: %w", err) } err = synthesis.NewPodGC(mgr, containerCreationTimeout) if err != nil { return fmt.Errorf("constructing pod garbage collector: %w", err) } err = resourceslice.NewController(mgr) if err != nil { return fmt.Errorf("constructing resource slice controller: %w", err) } err = resourceslice.NewCleanupController(mgr) if err != nil { return fmt.Errorf("constructing resource slice cleanup controller: %w", err) } err = watch.NewController(mgr) if err != nil { return fmt.Errorf("constructing watch controller: %w", err) } err = scheduling.NewController(mgr, concurrencyLimit, rolloutCooldown, watchdogThres) if err != nil { return fmt.Errorf("constructing synthesis scheduling controller: %w", err) } err = composition.NewController(mgr) if err != nil { return fmt.Errorf("constructing composition controller: %w", err) } err = symphony.NewController(mgr) if err != nil { return fmt.Errorf("constructing symphony controller: %w", err) } return mgr.Start(ctx) } func parseKeyValue(input string) (key, val string) { chunks := strings.SplitN(input, "=", 2) key = chunks[0] if len(chunks) > 1 { val = chunks[1] } return } func installExecutor() { self := os.Args[0] file, err := os.Open(self) if err != nil { panic(err) } defer file.Close() dest, err := os.OpenFile("/eno/executor", os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0777) if err != nil { panic(err) } defer dest.Close() _, err = io.Copy(dest, file) if err != nil { panic(err) } } func runExecutor() { rc := ctrl.GetConfigOrDie() rc.UserAgent = "eno-executor" zapCfg := zap.NewProductionConfig() zl, err := zapCfg.Build() if err != nil { panic(err) } logger := zapr.NewLogger(zl) ctx := logr.NewContext(ctrl.SetupSignalHandler(), logger) hc, err := rest.HTTPClientFor(rc) if err != nil { panic(err) } rm, err := apiutil.NewDynamicRESTMapper(rc, hc) if err != nil { panic(err) } scheme, err := v1.SchemeBuilder.Build() if err != nil { logger.Error(err, "building scheme") os.Exit(1) } client, err := client.New(rc, client.Options{ Scheme: scheme, Mapper: rm, }) if err != nil { logger.Error(err, "building client") os.Exit(1) } e := &execution.Executor{ Reader: client, Writer: client, Handler: execution.NewExecHandler(), } err = e.Synthesize(ctx, execution.LoadEnv()) if err != nil { logger.Error(err, "synthesizing") os.Exit(1) } }