pkg/operator/operator.go

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package operator contains the Prometheus operator.
package operator

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"strconv"

	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/api"
	"github.com/prometheus/client_golang/prometheus"
	arv1 "k8s.io/api/admissionregistration/v1"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
	autoscalingv1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

	monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
)

const (
	// DefaultOperatorNamespace is the namespace in which all resources owned by the operator are installed.
	DefaultOperatorNamespace = "gmp-system"
	// DefaultPublicNamespace is the namespace where the operator will check for user-specified
	// configuration data.
	DefaultPublicNamespace = "gmp-public"

	// NameOperator is a fixed name used in various resources managed by the operator.
	NameOperator = "gmp-operator"
	// componentName is a fixed name used in various resources managed by the operator.
	componentName = "managed_prometheus"

	// Filename for configuration files.
	configFilename = "config.yaml"

	// LabelAppName is the well-known app name label.
	LabelAppName = "app.kubernetes.io/name"
	// LabelInstanceName is the well-known instance name label.
	LabelInstanceName = "app.kubernetes.io/instance"
	// AnnotationMetricName is the component name annotation; its value is exposed as the metric name.
	AnnotationMetricName = "components.gke.io/component-name"
	// ClusterAutoscalerSafeEvictionLabel is the annotation that determines
	// whether the cluster autoscaler can safely evict a Pod when the Pod doesn't
	// satisfy certain eviction criteria.
	ClusterAutoscalerSafeEvictionLabel = "cluster-autoscaler.kubernetes.io/safe-to-evict"

	// KubernetesAppName is the Kubernetes app label; its value is exposed as the component name.
	KubernetesAppName = "app"
	// RuleEvaluatorAppName is the name of the rule-evaluator application.
	RuleEvaluatorAppName = "managed-prometheus-rule-evaluator"
	// AlertmanagerAppName is the name of the alert manager application.
	AlertmanagerAppName = "managed-prometheus-alertmanager"

	// The level of concurrency to use to fetch all targets.
	defaultTargetPollConcurrency = 4
)

// Operator to implement managed collection for Google Prometheus Engine.
type Operator struct {
	logger       logr.Logger
	opts         Options
	client       client.Client
	manager      manager.Manager
	vpaAvailable bool
}

// Options for the Operator.
type Options struct {
	// ID of the project of the cluster.
	ProjectID string
	// Location of the cluster.
	Location string
	// Name of the cluster the operator acts on.
	Cluster string
	// Namespace to which the operator deploys any associated resources.
	OperatorNamespace string
	// Namespace to which the operator looks for user-specified configuration
	// data, like Secrets and ConfigMaps.
	PublicNamespace string
	// Health and readiness serving address.
	ProbeAddr string
	// Certificate of the server in base 64.
	TLSCert string
	// Key of the server in base 64.
	TLSKey string
	// Certificate authority in base 64.
	CACert string
	// CertDir is the path to a directory containing TLS certificates for the webhook server.
	CertDir string
	// Webhook serving address.
	ListenAddr string
	// Clean up resources that lack this annotation.
	CleanupAnnotKey string
	// Upper bound on the number of threads to use for target polling;
	// if zero, the default is used.
	TargetPollConcurrency uint16
	// The HTTP client to use when targeting collector endpoints.
	CollectorHTTPClient *http.Client
}

func (o *Options) defaultAndValidate(_ logr.Logger) error {
	if o.OperatorNamespace == "" {
		o.OperatorNamespace = DefaultOperatorNamespace
	}
	if o.PublicNamespace == "" {
		// For non-managed deployments, default to same namespace
		// as operator, assuming cluster operators prefer consolidating
		// resources in a single namespace.
		o.PublicNamespace = DefaultOperatorNamespace
	}

	// ProjectID and Cluster must always be set. Collectors and rule-evaluator can
	// auto-discover them but we need them in the operator to scope generated rules.
	if o.ProjectID == "" {
		return errors.New("projectID must be set")
	}
	if o.Cluster == "" {
		return errors.New("cluster must be set")
	}

	if o.TargetPollConcurrency == 0 {
		o.TargetPollConcurrency = defaultTargetPollConcurrency
	}
	if o.CollectorHTTPClient == nil {
		// Matches the default Prometheus API library HTTP client.
		o.CollectorHTTPClient = &http.Client{
			Transport: api.DefaultRoundTripper,
		}
	}
	return nil
}

// NewScheme creates a new Kubernetes runtime.Scheme for the GMP Operator.
func NewScheme() (*runtime.Scheme, error) {
	sc := runtime.NewScheme()
	if err := scheme.AddToScheme(sc); err != nil {
		return nil, fmt.Errorf("add Kubernetes core scheme: %w", err)
	}
	if err := monitoringv1.AddToScheme(sc); err != nil {
		return nil, fmt.Errorf("add monitoringv1 scheme: %w", err)
	}
	if err := autoscalingv1.AddToScheme(sc); err != nil {
		return nil, fmt.Errorf("add autoscalerv1 scheme: %w", err)
	}
	return sc, nil
}
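
// A minimal usage sketch of this package (illustrative only: the real entry
// point lives in the operator's main package and wires up flags, logging and
// the metrics registry differently, and every value below is a placeholder):
//
//	ctx := ctrl.SetupSignalHandler()
//	cfg, err := ctrl.GetConfig()
//	if err != nil { /* handle error */ }
//	op, err := operator.New(ctrl.Log.WithName("operator"), cfg, operator.Options{
//		ProjectID:  "example-project",
//		Location:   "us-central1-a",
//		Cluster:    "example-cluster",
//		ListenAddr: ":8443",
//		CertDir:    "/etc/tls/private",
//	})
//	if err != nil { /* handle error */ }
//	if err := op.Run(ctx, prometheus.DefaultRegisterer); err != nil { /* handle error */ }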

// New instantiates a new Operator.
func New(logger logr.Logger, clientConfig *rest.Config, opts Options) (*Operator, error) {
	if err := opts.defaultAndValidate(logger); err != nil {
		return nil, fmt.Errorf("invalid options: %w", err)
	}
	sc, err := NewScheme()
	if err != nil {
		return nil, fmt.Errorf("unable to initialize Kubernetes scheme: %w", err)
	}

	host, portStr, err := net.SplitHostPort(opts.ListenAddr)
	if err != nil {
		return nil, fmt.Errorf("invalid listen address: %w", err)
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return nil, fmt.Errorf("invalid port: %w", err)
	}

	watchObjects := map[client.Object]cache.ByObject{
		&corev1.Pod{}: {
			Field: fields.SelectorFromSet(fields.Set{"metadata.namespace": opts.OperatorNamespace}),
		},
		&monitoringv1.PodMonitoring{}: {
			Field: fields.Everything(),
		},
		&monitoringv1.ClusterPodMonitoring{}: {
			Field: fields.Everything(),
		},
		&monitoringv1.ClusterNodeMonitoring{}: {
			Field: fields.Everything(),
		},
		&monitoringv1.GlobalRules{}: {
			Field: fields.Everything(),
		},
		&monitoringv1.ClusterRules{}: {
			Field: fields.Everything(),
		},
		&monitoringv1.Rules{}: {
			Field: fields.Everything(),
		},
		&corev1.Secret{}: {
			Namespaces: map[string]cache.Config{
				opts.OperatorNamespace: {},
				opts.PublicNamespace:   {},
			},
		},
		&monitoringv1.OperatorConfig{}: {
			Field: fields.SelectorFromSet(fields.Set{"metadata.namespace": opts.PublicNamespace}),
		},
		&corev1.Service{}: {
			Field: fields.SelectorFromSet(fields.Set{
				"metadata.namespace": opts.OperatorNamespace,
				"metadata.name":      NameAlertmanager,
			}),
		},
		&corev1.ConfigMap{}: {
			Field: fields.SelectorFromSet(fields.Set{"metadata.namespace": opts.OperatorNamespace}),
		},
		&appsv1.DaemonSet{}: {
			Field: fields.SelectorFromSet(fields.Set{
				"metadata.namespace": opts.OperatorNamespace,
				"metadata.name":      NameCollector,
			}),
		},
		&appsv1.Deployment{}: {
			Field: fields.SelectorFromSet(fields.Set{
				"metadata.namespace": opts.OperatorNamespace,
				"metadata.name":      NameRuleEvaluator,
			}),
		},
		&appsv1.StatefulSet{}: {
			Field: fields.SelectorFromSet(fields.Set{
				"metadata.namespace": opts.OperatorNamespace,
				"metadata.name":      NameAlertmanager,
			}),
		},
	}

	// Determine whether VPA is installed in the cluster. If so, set up the scaling controller.
	var vpaAvailable bool
	coreClientConfig := rest.CopyConfig(clientConfig)
	coreClientConfig.ContentType = runtime.ContentTypeProtobuf
	clientset, err := apiextensions.NewForConfig(coreClientConfig)
	if err != nil {
		return nil, fmt.Errorf("create clientset: %w", err)
	}
	if _, err := clientset.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "verticalpodautoscalers.autoscaling.k8s.io", metav1.GetOptions{}); err != nil {
		logger.Info("vertical pod autoscaling is not available, scaling.vpa.enabled option on the OperatorConfig will not work")
	} else {
		logger.Info("vertical pod autoscaling available, monitoring OperatorConfig for scaling.vpa.enabled option")
		vpaAvailable = true
		watchObjects[&autoscalingv1.VerticalPodAutoscaler{}] = cache.ByObject{
			Field: fields.SelectorFromSet(fields.Set{
				"metadata.namespace": opts.OperatorNamespace,
			}),
		}
	}

	manager, err := ctrl.NewManager(clientConfig, manager.Options{
		Logger: logger,
		Scheme: sc,
		WebhookServer: webhook.NewServer(webhook.Options{
			Host:    host,
			Port:    port,
			CertDir: opts.CertDir,
		}),
		// Don't run a metrics server with the manager. Metrics are being served
		// explicitly in the main routine.
		Metrics: metricsserver.Options{
			BindAddress: "0",
		},
		HealthProbeBindAddress: opts.ProbeAddr,
		// Manage cluster-wide and namespace resources at the same time.
		NewCache: cache.NewCacheFunc(func(_ *rest.Config, options cache.Options) (cache.Cache, error) {
			return cache.New(clientConfig, cache.Options{
				Scheme: options.Scheme,
				// The presence of metadata.namespace has special handling internally, causing the
				// cache's watch-list to only watch that namespace.
				ByObject: watchObjects,
			})
		}),
	})
	if err != nil {
		return nil, fmt.Errorf("create controller manager: %w", err)
	}

	webhookChecker := manager.GetWebhookServer().StartedChecker()
	if err := manager.AddHealthzCheck("webhooks", webhookChecker); err != nil {
		return nil, fmt.Errorf("add healthz check for webhooks: %w", err)
	}
	if err := manager.AddReadyzCheck("webhooks", webhookChecker); err != nil {
		return nil, fmt.Errorf("add readyz check for webhooks: %w", err)
	}

	client, err := client.New(clientConfig, client.Options{Scheme: sc})
	if err != nil {
		return nil, fmt.Errorf("create client: %w", err)
	}

	op := &Operator{
		logger:       logger,
		opts:         opts,
		client:       client,
		manager:      manager,
		vpaAvailable: vpaAvailable,
	}
	return op, nil
}
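
// Note on the watch scoping used in New above: each cache.ByObject entry narrows
// what the manager's cache watches for that type. A field selector on
// metadata.namespace restricts the watch to a single namespace (see the comment
// inside NewCache), adding metadata.name narrows it to one named object, and the
// Namespaces map (used for Secrets) lists several namespaces explicitly.
// Illustrative shape only; the names below are placeholders, not resources
// managed by this operator:
//
//	cache.ByObject{
//		Field: fields.SelectorFromSet(fields.Set{
//			"metadata.namespace": "example-namespace",
//			"metadata.name":      "example-object",
//		}),
//	}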

// Run the reconciliation loop of the operator.
func (o *Operator) Run(ctx context.Context, registry prometheus.Registerer) error {
	defer runtimeutil.HandleCrash()

	if err := o.cleanupOldResources(ctx); err != nil {
		return fmt.Errorf("cleanup old resources: %w", err)
	}
	if err := setupAdmissionWebhooks(ctx, o.logger, o.client, o.manager.GetWebhookServer().(*webhook.DefaultServer), &o.opts, o.vpaAvailable); err != nil {
		return fmt.Errorf("init admission resources: %w", err)
	}
	if err := setupCollectionControllers(o); err != nil {
		return fmt.Errorf("setup collection controllers: %w", err)
	}
	if err := setupRulesControllers(o); err != nil {
		return fmt.Errorf("setup rules controllers: %w", err)
	}
	if err := setupOperatorConfigControllers(o); err != nil {
		return fmt.Errorf("setup rule-evaluator controllers: %w", err)
	}
	if o.vpaAvailable {
		if err := setupScalingController(o); err != nil {
			return fmt.Errorf("setup scaling controllers: %w", err)
		}
	}
	if err := setupTargetStatusPoller(o, registry, o.opts.CollectorHTTPClient); err != nil {
		return fmt.Errorf("setup target status processor: %w", err)
	}

	o.logger.Info("starting GMP operator")
	return o.manager.Start(ctx)
}
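
// cleanupOldResources deletes resources from previous operator setups: the
// legacy gmp-operator ValidatingWebhookConfiguration installed directly by
// earlier operator versions and, when a cleanup annotation key is configured,
// the collector DaemonSet and rule-evaluator Deployment if they do not carry
// that annotation key.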
func (o *Operator) cleanupOldResources(ctx context.Context) error {
	// Delete old ValidatingWebhookConfiguration that was installed directly by the operator
	// in previous versions.
	validatingWebhookConfig := arv1.ValidatingWebhookConfiguration{
		ObjectMeta: metav1.ObjectMeta{Name: "gmp-operator"},
	}
	if err := o.client.Delete(ctx, &validatingWebhookConfig); err != nil {
		switch {
		case apierrors.IsForbidden(err):
			o.logger.Info("delete legacy ValidatingWebHookConfiguration was not allowed. Please remove it manually")
		case !apierrors.IsNotFound(err):
			return fmt.Errorf("delete legacy ValidatingWebHookConfiguration failed: %w", err)
		}
	}

	// If no cleanup annotation key is provided, do not clean up any further.
	if o.opts.CleanupAnnotKey == "" {
		return nil
	}

	// Clean up resources without the provided annotation.
	// Check the collector DaemonSet.
	dsKey := client.ObjectKey{
		Name:      NameCollector,
		Namespace: o.opts.OperatorNamespace,
	}
	var ds appsv1.DaemonSet
	if err := o.client.Get(ctx, dsKey, &ds); apierrors.IsNotFound(err) {
		return fmt.Errorf("get collector DaemonSet: %w", err)
	}
	if _, ok := ds.Annotations[o.opts.CleanupAnnotKey]; !ok {
		if err := o.client.Delete(ctx, &ds); err != nil {
			switch {
			case apierrors.IsForbidden(err):
				o.logger.Info("delete collector was not allowed. Please remove it manually", "err", err)
			case !apierrors.IsNotFound(err):
				return fmt.Errorf("cleanup collector failed: %w", err)
			}
		}
	}

	// Check the rule-evaluator Deployment.
	deployKey := client.ObjectKey{
		Name:      NameRuleEvaluator,
		Namespace: o.opts.OperatorNamespace,
	}
	var deploy appsv1.Deployment
	if err := o.client.Get(ctx, deployKey, &deploy); apierrors.IsNotFound(err) {
		return fmt.Errorf("get rule-evaluator Deployment: %w", err)
	}
	if _, ok := deploy.Annotations[o.opts.CleanupAnnotKey]; !ok {
		if err := o.client.Delete(ctx, &deploy); err != nil {
			switch {
			case apierrors.IsForbidden(err):
				o.logger.Info("delete rule-evaluator was not allowed. Please remove it manually", "err", err)
			case !apierrors.IsNotFound(err):
				return fmt.Errorf("cleanup rule-evaluator failed: %w", err)
			}
		}
	}
	return nil
}

// namespacedNamePredicate is an event filter predicate that only allows events
// for a single object, identified by namespace and name.
type namespacedNamePredicate struct {
	namespace string
	name      string
}

func (o namespacedNamePredicate) Create(e event.CreateEvent) bool {
	return e.Object.GetNamespace() == o.namespace && e.Object.GetName() == o.name
}

func (o namespacedNamePredicate) Update(e event.UpdateEvent) bool {
	return e.ObjectNew.GetNamespace() == o.namespace && e.ObjectNew.GetName() == o.name
}

func (o namespacedNamePredicate) Delete(e event.DeleteEvent) bool {
	return e.Object.GetNamespace() == o.namespace && e.Object.GetName() == o.name
}

func (o namespacedNamePredicate) Generic(e event.GenericEvent) bool {
	return e.Object.GetNamespace() == o.namespace && e.Object.GetName() == o.name
}

// enqueueConst always enqueues the same request regardless of the event.
type enqueueConst reconcile.Request

func (e enqueueConst) Create(_ context.Context, _ event.CreateEvent, q workqueue.RateLimitingInterface) {
	q.Add(reconcile.Request(e))
}

func (e enqueueConst) Update(_ context.Context, _ event.UpdateEvent, q workqueue.RateLimitingInterface) {
	q.Add(reconcile.Request(e))
}

func (e enqueueConst) Delete(_ context.Context, _ event.DeleteEvent, q workqueue.RateLimitingInterface) {
	q.Add(reconcile.Request(e))
}

func (e enqueueConst) Generic(_ context.Context, _ event.GenericEvent, q workqueue.RateLimitingInterface) {
	q.Add(reconcile.Request(e))
}
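
// How the two helpers above are typically wired (illustrative sketch only: the
// actual wiring lives in the setup*Controllers functions elsewhere in this
// package, mgr and someReconciler are placeholders, and builder and types are
// additional controller-runtime/apimachinery imports not used in this file):
//
//	req := reconcile.Request{NamespacedName: types.NamespacedName{
//		Namespace: "example-namespace", Name: "example-config",
//	}}
//	err := ctrl.NewControllerManagedBy(mgr).
//		For(&monitoringv1.OperatorConfig{}, builder.WithPredicates(
//			namespacedNamePredicate{namespace: "example-namespace", name: "example-config"},
//		)).
//		Watches(&corev1.Secret{}, enqueueConst(req)).
//		Complete(someReconciler)
//
// The predicate drops events for every object except the one named object, and
// enqueueConst collapses all remaining events onto a single reconcile request.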