v2/cmd/controller/app/setup.go (420 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ package app import ( "context" "fmt" "math/rand" "net/http" "net/http/pprof" "os" "time" . "github.com/Azure/azure-service-operator/v2/internal/logging" "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/msi-dataplane/pkg/dataplane" "github.com/benbjohnson/clock" "github.com/go-logr/logr" "github.com/rotisserie/eris" "golang.org/x/time/rate" apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/rest" "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/metrics/filters" "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/webhook" "github.com/Azure/azure-service-operator/v2/internal/config" "github.com/Azure/azure-service-operator/v2/internal/controllers" "github.com/Azure/azure-service-operator/v2/internal/crdmanagement" "github.com/Azure/azure-service-operator/v2/internal/genericarmclient" "github.com/Azure/azure-service-operator/v2/internal/identity" asometrics "github.com/Azure/azure-service-operator/v2/internal/metrics" armreconciler "github.com/Azure/azure-service-operator/v2/internal/reconcilers/arm" "github.com/Azure/azure-service-operator/v2/internal/reconcilers/generic" asocel "github.com/Azure/azure-service-operator/v2/internal/util/cel" "github.com/Azure/azure-service-operator/v2/internal/util/interval" "github.com/Azure/azure-service-operator/v2/internal/util/kubeclient" "github.com/Azure/azure-service-operator/v2/internal/util/lockedrand" "github.com/Azure/azure-service-operator/v2/internal/util/to" common "github.com/Azure/azure-service-operator/v2/pkg/common/config" "github.com/Azure/azure-service-operator/v2/pkg/genruntime" "github.com/Azure/azure-service-operator/v2/pkg/genruntime/conditions" ) type Runnable struct { mgr manager.Manager // toStart must not block toStart []func() } func (r *Runnable) Start(ctx context.Context) error { for _, f := range r.toStart { f() } return r.mgr.Start(ctx) } func SetupControllerManager(ctx context.Context, setupLog logr.Logger, flgs *Flags) *Runnable { scheme := controllers.CreateScheme() _ = apiextensions.AddToScheme(scheme) // Used for managing CRDs cfg, err := config.ReadAndValidate() if err != nil { setupLog.Error(err, "unable to get env configuration values") os.Exit(1) } var cacheFunc cache.NewCacheFunc if cfg.TargetNamespaces != nil && cfg.OperatorMode.IncludesWatchers() { cacheFunc = func(config *rest.Config, opts cache.Options) (cache.Cache, error) { opts.DefaultNamespaces = make(map[string]cache.Config, len(cfg.TargetNamespaces)) for _, ns := range cfg.TargetNamespaces { opts.DefaultNamespaces[ns] = cache.Config{} } return cache.New(config, opts) } } k8sConfig := ctrl.GetConfigOrDie() ctrlOptions := ctrl.Options{ Scheme: scheme, NewCache: cacheFunc, LeaderElection: flgs.EnableLeaderElection, LeaderElectionID: "controllers-leader-election-azinfra-generated", // Manually set lease duration (to default) so that we can use it for our leader elector too. // See https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/manager/internal.go#L52 LeaseDuration: to.Ptr(15 * time.Second), RenewDeadline: to.Ptr(10 * time.Second), RetryPeriod: to.Ptr(2 * time.Second), GracefulShutdownTimeout: to.Ptr(30 * time.Second), // It's only safe to set LeaderElectionReleaseOnCancel to true if the manager binary ends // when the manager exits. This is the case with us today, so we set this to true whenever // flgs.EnableLeaderElection is true. LeaderElectionReleaseOnCancel: flgs.EnableLeaderElection, HealthProbeBindAddress: flgs.HealthAddr, Metrics: getMetricsOpts(flgs), WebhookServer: webhook.NewServer(webhook.Options{ Port: flgs.WebhookPort, CertDir: flgs.WebhookCertDir, }), } mgr, err := ctrl.NewManager(k8sConfig, ctrlOptions) if err != nil { setupLog.Error(err, "unable to create manager") os.Exit(1) } //nolint:contextcheck clients, err := initializeClients(cfg, mgr) if err != nil { setupLog.Error(err, "failed to initialize clients") os.Exit(1) } var leaderElector *crdmanagement.LeaderElector if flgs.EnableLeaderElection { //nolint: contextcheck // false positive? leaderElector, err = crdmanagement.NewLeaderElector(k8sConfig, setupLog, ctrlOptions, mgr) if err != nil { setupLog.Error(err, "failed to initialize leader elector") os.Exit(1) } } crdManager, err := newCRDManager(clients.log, mgr.GetConfig(), leaderElector) if err != nil { setupLog.Error(err, "failed to initialize CRD client") os.Exit(1) } existingCRDs := apiextensions.CustomResourceDefinitionList{} err = crdManager.ListCRDs(ctx, &existingCRDs) if err != nil { setupLog.Error(err, "failed to list current CRDs") os.Exit(1) } switch flgs.CRDManagementMode { case "auto": // We only apply CRDs if we're in webhooks mode. No other mode will have CRD CRUD permissions if cfg.OperatorMode.IncludesWebhooks() { // Note that this step will restart the pod when it succeeds err = crdManager.Install(ctx, crdmanagement.Options{ CRDPatterns: flgs.CRDPatterns, ExistingCRDs: &existingCRDs, Path: crdmanagement.CRDLocation, Namespace: cfg.PodNamespace, }) if err != nil { setupLog.Error(err, "failed to apply CRDs") os.Exit(1) } } case "none": setupLog.Info("CRD management mode was set to 'none', the operator will not manage CRDs and assumes they are already installed and matching the operator version") default: setupLog.Error(fmt.Errorf("invalid CRD management mode: %s", flgs.CRDManagementMode), "failed to initialize CRD client") os.Exit(1) } // There are 3 possibilities once we reach here: // 1. Webhooks mode + crd-management-mode=auto: existingCRDs will be up to date (upgraded, crd-pattern applied, etc) // by the time we get here as the pod will keep exiting until it is so (see crdManager.applyCRDs above). // 2. Non-webhooks mode + auto: As outlined in https://azure.github.io/azure-service-operator/guide/authentication/multitenant-deployment/#upgrading // the webhooks mode pod must be upgraded first, so there's not really much practical difference between this and // crd-management-mode=none (see below). // 3. crd-management-mode=none: existingCRDs is the set of CRDs that are installed and we can't do anything else but // trust that they are correct. // TODO: This is not quite true as if we wanted we could still read the CRDs from the filesystem and // TODO: just exit if what we see remotely doesn't match what we have locally, the downside of this is we pay // TODO: the nontrivial startup cost of reading the local copy of CRDs into memory. Since "none" is // TODO: us approximating the standard operator experience we don't perform this assertion currently as most // TODO: operators don't. readyResources := crdmanagement.MakeCRDMap(existingCRDs.Items) if cfg.OperatorMode.IncludesWatchers() { //nolint:contextcheck err = initializeWatchers(readyResources, cfg, mgr, clients) if err != nil { setupLog.Error(err, "failed to initialize watchers") os.Exit(1) } } if cfg.OperatorMode.IncludesWebhooks() { objs := controllers.GetKnownTypes() objs, err = crdmanagement.FilterKnownTypesByReadyCRDs(clients.log, scheme, readyResources, objs) if err != nil { setupLog.Error(err, "failed to filter known types by ready CRDs") os.Exit(1) } if errs := generic.RegisterWebhooks(mgr, objs); errs != nil { setupLog.Error(err, "failed to register webhook for gvks") os.Exit(1) } } // Healthz liveness probe endpoint err = mgr.AddHealthzCheck("healthz", healthz.Ping) if err != nil { setupLog.Error(err, "Failed setting up healthz check") os.Exit(1) } // Readyz probe endpoint err = mgr.AddReadyzCheck("readyz", healthz.Ping) if err != nil { setupLog.Error(err, "Failed setting up readyz check") os.Exit(1) } // Readyz probe endpoint err = mgr.AddReadyzCheck("readyz", healthz.Ping) if err != nil { setupLog.Error(err, "Failed setting up readyz check") os.Exit(1) } return &Runnable{ mgr: mgr, toStart: []func(){ // Starts the expression caches. Note that we don't need to stop these we'll // let process teardown stop them clients.expressionEvaluator.Start, }, } } func getMetricsOpts(flags *Flags) server.Options { var metricsOptions server.Options if flags.SecureMetrics { metricsOptions = server.Options{ BindAddress: flags.MetricsAddr, SecureServing: true, FilterProvider: filters.WithAuthenticationAndAuthorization, } // Note that pprof endpoints are meant to be sensitive and shouldn't be exposed publicly. if flags.ProfilingMetrics { metricsOptions.ExtraHandlers = map[string]http.Handler{ "/debug/pprof/": http.HandlerFunc(pprof.Index), "/debug/pprof/cmdline": http.HandlerFunc(pprof.Cmdline), "/debug/pprof/profile": http.HandlerFunc(pprof.Profile), "/debug/pprof/symbol": http.HandlerFunc(pprof.Symbol), "/debug/pprof/trace": http.HandlerFunc(pprof.Trace), } } } else { metricsOptions = server.Options{ BindAddress: flags.MetricsAddr, } } return metricsOptions } func getDefaultAzureCredential(cfg config.Values, setupLog logr.Logger) (*identity.Credential, error) { tokenCred, err := getDefaultAzureTokenCredential(cfg, setupLog) if err != nil { return nil, err } if tokenCred == nil { return nil, nil } return identity.NewDefaultCredential( tokenCred, cfg.PodNamespace, cfg.SubscriptionID, cfg.AdditionalTenants, ), nil } func getDefaultAzureTokenCredential(cfg config.Values, setupLog logr.Logger) (azcore.TokenCredential, error) { // If subscriptionID is not supplied, then set default credential to not be used/nil if cfg.SubscriptionID == "" { setupLog.Info("No global credential configured, continuing without default global credential.") return nil, nil } if cfg.UseWorkloadIdentityAuth { credential, err := azidentity.NewWorkloadIdentityCredential( &azidentity.WorkloadIdentityCredentialOptions{ ClientOptions: azcore.ClientOptions{ Cloud: cfg.Cloud(), }, ClientID: cfg.ClientID, TenantID: cfg.TenantID, TokenFilePath: identity.FederatedTokenFilePath, AdditionallyAllowedTenants: cfg.AdditionalTenants, }) if err != nil { return nil, eris.Wrapf(err, "unable to get workload identity credential") } return credential, nil } if cert := os.Getenv(common.AzureClientCertificate); cert != "" { certPassword := os.Getenv(common.AzureClientCertificatePassword) credential, err := identity.NewClientCertificateCredential( cfg.TenantID, cfg.ClientID, []byte(cert), []byte(certPassword), &azidentity.ClientCertificateCredentialOptions{ ClientOptions: azcore.ClientOptions{ Cloud: cfg.Cloud(), }, AdditionallyAllowedTenants: cfg.AdditionalTenants, }) if err != nil { return nil, eris.Wrapf(err, "unable to get client certificate credential") } return credential, nil } // This authentication type is similar to user assigned managed identity authentication combined with client certificate // authentication. As a 1st party Microsoft application, one has access to pull a user assigned managed identity's backing // certificate information from the MSI data plane. Using this data, a user can authenticate to Azure Cloud. if userAssignedCredentialsPath := os.Getenv(common.AzureUserAssignedIdentityCredentials); userAssignedCredentialsPath != "" { options := azcore.ClientOptions{ Cloud: cfg.Cloud(), } credential, err := dataplane.NewUserAssignedIdentityCredential(context.Background(), userAssignedCredentialsPath, dataplane.WithClientOpts(options)) if err != nil { return nil, eris.Wrapf(err, "unable to get user assigned identity credential") } return credential, nil } credential, err := azidentity.NewDefaultAzureCredential( &azidentity.DefaultAzureCredentialOptions{ ClientOptions: azcore.ClientOptions{ Cloud: cfg.Cloud(), }, AdditionallyAllowedTenants: cfg.AdditionalTenants, }) if err != nil { return nil, eris.Wrapf(err, "unable to get default azure credential") } return credential, err } type clients struct { positiveConditions *conditions.PositiveConditionBuilder armConnectionFactory armreconciler.ARMConnectionFactory credentialProvider identity.CredentialProvider kubeClient kubeclient.Client expressionEvaluator asocel.ExpressionEvaluator log logr.Logger options generic.Options } func initializeClients(cfg config.Values, mgr ctrl.Manager) (*clients, error) { armMetrics := asometrics.NewARMClientMetrics() celMetrics := asometrics.NewCEL() asometrics.RegisterMetrics(armMetrics, celMetrics) log := ctrl.Log.WithName("controllers") credential, err := getDefaultAzureCredential(cfg, log) if err != nil { return nil, eris.Wrap(err, "error while fetching default global credential") } kubeClient := kubeclient.NewClient(mgr.GetClient()) credentialProvider := identity.NewCredentialProvider( credential, kubeClient, &identity.CredentialProviderOptions{ Cloud: to.Ptr(cfg.Cloud()), }) armClientCache := armreconciler.NewARMClientCache( credentialProvider, kubeClient, cfg.Cloud(), nil, armMetrics) genericarmclient.AddToUserAgent(cfg.UserAgentSuffix) var connectionFactory armreconciler.ARMConnectionFactory = func(ctx context.Context, obj genruntime.ARMMetaObject) (armreconciler.Connection, error) { return armClientCache.GetConnection(ctx, obj) } positiveConditions := conditions.NewPositiveConditionBuilder(clock.New()) expressionEvaluator, err := asocel.NewExpressionEvaluator( asocel.Metrics(celMetrics), asocel.Log(log), ) if err != nil { return nil, eris.Wrap(err, "error creating expression evaluator") } // Register the evaluator for use by webhooks asocel.RegisterEvaluator(expressionEvaluator) options := makeControllerOptions(log, cfg) return &clients{ positiveConditions: positiveConditions, armConnectionFactory: connectionFactory, credentialProvider: credentialProvider, kubeClient: kubeClient, expressionEvaluator: expressionEvaluator, log: log, options: options, }, nil } func initializeWatchers(readyResources map[string]apiextensions.CustomResourceDefinition, cfg config.Values, mgr ctrl.Manager, clients *clients) error { clients.log.V(Status).Info("Configuration details", "config", cfg.String()) objs, err := controllers.GetKnownStorageTypes( mgr, clients.armConnectionFactory, clients.credentialProvider, clients.kubeClient, clients.positiveConditions, clients.expressionEvaluator, clients.options) if err != nil { return eris.Wrap(err, "failed getting storage types and reconcilers") } // Filter the types to register objs, err = crdmanagement.FilterStorageTypesByReadyCRDs(clients.log, mgr.GetScheme(), readyResources, objs) if err != nil { return eris.Wrap(err, "failed to filter storage types by ready CRDs") } err = generic.RegisterAll( mgr, mgr.GetFieldIndexer(), clients.kubeClient, clients.positiveConditions, objs, clients.options) if err != nil { return eris.Wrap(err, "failed to register gvks") } return nil } func makeControllerOptions(log logr.Logger, cfg config.Values) generic.Options { var additionalRateLimiters []workqueue.TypedRateLimiter[reconcile.Request] if cfg.RateLimit.Mode == config.RateLimitModeBucket { additionalRateLimiters = append( additionalRateLimiters, &workqueue.TypedBucketRateLimiter[reconcile.Request]{ Limiter: rate.NewLimiter(rate.Limit(cfg.RateLimit.QPS), cfg.RateLimit.BucketSize), }) } // If sync period isn't set, set verySlow delay at 24h, otherwise set it // to the sync period. verySlowDelay := 24 * time.Hour if cfg.SyncPeriod != nil { verySlowDelay = *cfg.SyncPeriod } return generic.Options{ Config: cfg, Options: controller.Options{ MaxConcurrentReconciles: cfg.MaxConcurrentReconciles, LogConstructor: func(req *reconcile.Request) logr.Logger { // refer to https://github.com/kubernetes-sigs/controller-runtime/pull/1827/files if req == nil { return log } // TODO: do we need GVK here too? return log.WithValues("namespace", req.Namespace, "name", req.Name) }, // These rate limits are used for happy-path backoffs (for example polling async operation IDs for PUT/DELETE) RateLimiter: generic.NewRateLimiter(1*time.Second, 1*time.Minute, additionalRateLimiters...), }, PanicHandler: func() {}, RequeueIntervalCalculator: interval.NewCalculator( // These rate limits are primarily for ReadyConditionImpactingError's interval.CalculatorParameters{ //nolint:gosec // do not want cryptographic randomness here Rand: rand.New(lockedrand.NewSource(time.Now().UnixNano())), ErrorBaseDelay: 1 * time.Second, ErrorMaxFastDelay: 30 * time.Second, ErrorMaxSlowDelay: 3 * time.Minute, ErrorVerySlowDelay: verySlowDelay, SyncPeriod: cfg.SyncPeriod, }), } } func newCRDManager( logger logr.Logger, k8sConfig *rest.Config, leaderElection *crdmanagement.LeaderElector, ) (*crdmanagement.Manager, error) { crdScheme := runtime.NewScheme() _ = apiextensions.AddToScheme(crdScheme) crdClient, err := client.New(k8sConfig, client.Options{Scheme: crdScheme}) if err != nil { return nil, eris.Wrap(err, "unable to create CRD client") } crdManager := crdmanagement.NewManager(logger, kubeclient.NewClient(crdClient), leaderElection) return crdManager, nil }