pkg/operator/rules.go

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
	"context"
	"errors"
	"fmt"

	"github.com/go-logr/logr"
	appsv1 "k8s.io/api/apps/v1"
	autoscalingv1 "k8s.io/api/autoscaling/v1"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
)

const (
	nameRulesGenerated = "rules-generated"
)

func setupRulesControllers(op *Operator) error {
	// The singleton OperatorConfig is the request object we reconcile against.
	objRequest := reconcile.Request{
		NamespacedName: types.NamespacedName{
			Namespace: op.opts.PublicNamespace,
			Name:      NameOperatorConfig,
		},
	}
	// Default OperatorConfig filter.
	objFilterOperatorConfig := namespacedNamePredicate{
		namespace: op.opts.PublicNamespace,
		name:      NameOperatorConfig,
	}
	// Rule-evaluator rules ConfigMap filter.
	objFilterRulesGenerated := namespacedNamePredicate{
		namespace: op.opts.OperatorNamespace,
		name:      nameRulesGenerated,
	}
	// Reconcile the generated rules that are used by the rule-evaluator deployment.
	err := ctrl.NewControllerManagedBy(op.manager).
		Named("rules").
		// Filter events without changes for all watches.
		WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
		// OperatorConfig is our root resource that ensures we reconcile
		// at least once initially.
		For(
			&monitoringv1.OperatorConfig{},
			builder.WithPredicates(objFilterOperatorConfig),
		).
		// Any update to a Rules object requires re-generating the config.
		Watches(
			&monitoringv1.GlobalRules{},
			enqueueConst(objRequest),
		).
		Watches(
			&monitoringv1.ClusterRules{},
			enqueueConst(objRequest),
		).
		Watches(
			&monitoringv1.Rules{},
			enqueueConst(objRequest),
		).
		// The configuration we generate for the rule-evaluator.
		Watches(
			&corev1.ConfigMap{},
			enqueueConst(objRequest),
			builder.WithPredicates(objFilterRulesGenerated),
		).
		Complete(newRulesReconciler(op.manager.GetClient(), op.opts))
	if err != nil {
		return fmt.Errorf("create rules config controller: %w", err)
	}
	return nil
}

type rulesReconciler struct {
	client client.Client
	opts   Options
}

func newRulesReconciler(c client.Client, opts Options) *rulesReconciler {
	return &rulesReconciler{
		client: c,
		opts:   opts,
	}
}
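
// Reconcile fetches the singleton OperatorConfig, regenerates the rule
// ConfigMap from all Rules, ClusterRules, and GlobalRules resources, and then
// scales the in-cluster rule consumers up or down depending on whether any
// rules exist.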
func (r *rulesReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	logger, _ := logr.FromContext(ctx)
	logger.Info("reconciling rules")

	var config monitoringv1.OperatorConfig
	// Fetch OperatorConfig if it exists.
	if err := r.client.Get(ctx, req.NamespacedName, &config); apierrors.IsNotFound(err) {
		logger.Info("no operatorconfig created yet")
	} else if err != nil {
		return reconcile.Result{}, fmt.Errorf("get operatorconfig for incoming: %q: %w", req.String(), err)
	}

	projectID, location, cluster := resolveLabels(r.opts.ProjectID, r.opts.Location, r.opts.Cluster, config.Rules.ExternalLabels)

	if err := r.ensureRuleConfigs(ctx, projectID, location, cluster, config.Features.Config.Compression); err != nil {
		return reconcile.Result{}, fmt.Errorf("ensure rule configmaps: %w", err)
	}
	if err := r.scaleRuleConsumers(ctx); err != nil {
		return reconcile.Result{}, fmt.Errorf("scale rule consumers: %w", err)
	}
	return reconcile.Result{}, nil
}

// scaleRuleConsumers scales the in-cluster Alertmanager and rule-evaluator to
// one replica if any rule resources exist, and to zero otherwise.
func (r *rulesReconciler) scaleRuleConsumers(ctx context.Context) error {
	logger, _ := logr.FromContext(ctx)

	var desiredReplicas int32
	var hasAnyRules bool
	for _, check := range []ruleCheck{hasRules, hasClusterRules, hasGlobalRules} {
		found, err := check(ctx, r.client)
		if err != nil {
			return err
		}
		if found {
			hasAnyRules = true
			break
		}
	}
	if hasAnyRules {
		desiredReplicas = 1
	}

	scaleClient := r.client.SubResource("scale")

	alertManagerStatefulSet := appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: r.opts.OperatorNamespace,
			Name:      "alertmanager",
		},
	}
	alertManagerScale := autoscalingv1.Scale{}
	if err := scaleClient.Get(ctx, &alertManagerStatefulSet, &alertManagerScale); apierrors.IsNotFound(err) {
		msg := fmt.Sprintf("Alertmanager StatefulSet not found, cannot scale to %d. In-cluster Alertmanager will not function.", desiredReplicas)
		logger.Error(err, msg)
	} else if err != nil {
		return err
	} else if alertManagerScale.Spec.Replicas != desiredReplicas {
		alertManagerScale.Spec.Replicas = desiredReplicas
		if err := scaleClient.Update(ctx, &alertManagerStatefulSet, client.WithSubResourceBody(&alertManagerScale)); err != nil {
			return err
		}
	}

	ruleEvaluatorDeployment := appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: r.opts.OperatorNamespace,
			Name:      "rule-evaluator",
		},
	}
	ruleEvaluatorScale := autoscalingv1.Scale{}
	if err := scaleClient.Get(ctx, &ruleEvaluatorDeployment, &ruleEvaluatorScale); apierrors.IsNotFound(err) {
		msg := fmt.Sprintf("Rule Evaluator Deployment not found, cannot scale to %d. In-cluster Rule Evaluator will not function.", desiredReplicas)
		logger.Error(err, msg)
	} else if err != nil {
		return err
	} else if ruleEvaluatorScale.Spec.Replicas != desiredReplicas {
		ruleEvaluatorScale.Spec.Replicas = desiredReplicas
		if err := scaleClient.Update(ctx, &ruleEvaluatorDeployment, client.WithSubResourceBody(&ruleEvaluatorScale)); err != nil {
			return err
		}
	}
	return nil
}

type ruleCheck func(context.Context, client.Client) (bool, error)

func hasRules(ctx context.Context, c client.Client) (bool, error) {
	var rules monitoringv1.RulesList
	if err := c.List(ctx, &rules); err != nil {
		return false, err
	}
	return len(rules.Items) > 0, nil
}

func hasClusterRules(ctx context.Context, c client.Client) (bool, error) {
	var rules monitoringv1.ClusterRulesList
	if err := c.List(ctx, &rules); err != nil {
		return false, err
	}
	return len(rules.Items) > 0, nil
}

func hasGlobalRules(ctx context.Context, c client.Client) (bool, error) {
	var rules monitoringv1.GlobalRulesList
	if err := c.List(ctx, &rules); err != nil {
		return false, err
	}
	return len(rules.Items) > 0, nil
}
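
// A rough sketch of the generated ConfigMap's data section with one resource
// of each kind (key patterns match the fmt.Sprintf calls below; contents
// elided, and setConfigMapData may store them compressed depending on the
// configured CompressionType):
//
//	data:
//	  empty.yaml: ""
//	  rules__<namespace>__<name>.yaml: ...
//	  clusterrules__<name>.yaml: ...
//	  globalrules__<name>.yaml: ...
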
// ensureRuleConfigs updates the Prometheus Rules ConfigMap.
func (r *rulesReconciler) ensureRuleConfigs(ctx context.Context, projectID, location, cluster string, compression monitoringv1.CompressionType) error {
	logger, _ := logr.FromContext(ctx)

	// Re-generate the configmap that's loaded by the rule-evaluator.
	cm := &corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: r.opts.OperatorNamespace,
			Name:      nameRulesGenerated,
			Labels: map[string]string{
				LabelAppName: NameRuleEvaluator,
			},
		},
		// Ensure there's always at least an empty, uncompressed dummy file as the evaluator
		// expects at least one match.
		Data: map[string]string{
			"empty.yaml": "",
		},
	}

	// Generate a final rule file for each Rules resource.
	//
	// Depending on the scope level (global, cluster, namespace) the rules will be generated
	// so that queries are constrained to the appropriate project_id, cluster, and namespace
	// labels and that they are preserved through query aggregations and appear on the
	// output data.
	//
	// The location is not scoped as it's not a meaningful boundary for "human access"
	// to data as clusters may span locations.
	var rulesList monitoringv1.RulesList
	if err := r.client.List(ctx, &rulesList); err != nil {
		return fmt.Errorf("list rules: %w", err)
	}

	now := metav1.Now()
	conditionSuccess := &monitoringv1.MonitoringCondition{
		Type:   monitoringv1.ConfigurationCreateSuccess,
		Status: corev1.ConditionTrue,
	}

	var statusUpdates []monitoringv1.MonitoringCRD
	for i := range rulesList.Items {
		rs := &rulesList.Items[i]
		result, err := rs.RuleGroupsConfig(projectID, location, cluster)
		if err != nil {
			msg := "generating rule config failed"
			if rs.Status.SetMonitoringCondition(rs.GetGeneration(), now, &monitoringv1.MonitoringCondition{
				Type:    monitoringv1.ConfigurationCreateSuccess,
				Status:  corev1.ConditionFalse,
				Message: msg,
				Reason:  err.Error(),
			}) {
				statusUpdates = append(statusUpdates, rs)
			}
			logger.Error(err, "convert rules", "namespace", rs.Namespace, "name", rs.Name)
			continue
		}
		filename := fmt.Sprintf("rules__%s__%s.yaml", rs.Namespace, rs.Name)
		if err := setConfigMapData(cm, compression, filename, result); err != nil {
			return err
		}
		if rs.Status.SetMonitoringCondition(rs.GetGeneration(), now, conditionSuccess) {
			statusUpdates = append(statusUpdates, rs)
		}
	}

	var clusterRulesList monitoringv1.ClusterRulesList
	if err := r.client.List(ctx, &clusterRulesList); err != nil {
		return fmt.Errorf("list cluster rules: %w", err)
	}
	for i := range clusterRulesList.Items {
		rs := &clusterRulesList.Items[i]
		result, err := rs.RuleGroupsConfig(projectID, location, cluster)
		if err != nil {
			msg := "generating rule config failed"
			if rs.Status.SetMonitoringCondition(rs.GetGeneration(), now, &monitoringv1.MonitoringCondition{
				Type:    monitoringv1.ConfigurationCreateSuccess,
				Status:  corev1.ConditionFalse,
				Message: msg,
				Reason:  err.Error(),
			}) {
				statusUpdates = append(statusUpdates, rs)
			}
			logger.Error(err, "convert rules", "namespace", rs.Namespace, "name", rs.Name)
			continue
		}
		filename := fmt.Sprintf("clusterrules__%s.yaml", rs.Name)
		if err := setConfigMapData(cm, compression, filename, result); err != nil {
			return err
		}
		if rs.Status.SetMonitoringCondition(rs.GetGeneration(), now, conditionSuccess) {
			statusUpdates = append(statusUpdates, rs)
		}
	}
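
	// GlobalRules are not scoped to a project, cluster, or namespace, so
	// RuleGroupsConfig takes no label arguments here.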
	var globalRulesList monitoringv1.GlobalRulesList
	if err := r.client.List(ctx, &globalRulesList); err != nil {
		return fmt.Errorf("list global rules: %w", err)
	}
	for i := range globalRulesList.Items {
		rs := &globalRulesList.Items[i]
		result, err := rs.RuleGroupsConfig()
		if err != nil {
			msg := "generating rule config failed"
			if rs.Status.SetMonitoringCondition(rs.GetGeneration(), now, &monitoringv1.MonitoringCondition{
				Type:    monitoringv1.ConfigurationCreateSuccess,
				Status:  corev1.ConditionFalse,
				Message: msg,
				Reason:  err.Error(),
			}) {
				statusUpdates = append(statusUpdates, rs)
			}
			logger.Error(err, "convert rules", "namespace", rs.Namespace, "name", rs.Name)
			continue
		}
		filename := fmt.Sprintf("globalrules__%s.yaml", rs.Name)
		if err := setConfigMapData(cm, compression, filename, result); err != nil {
			return err
		}
		if rs.Status.SetMonitoringCondition(rs.GetGeneration(), now, conditionSuccess) {
			statusUpdates = append(statusUpdates, rs)
		}
	}

	// Create or update generated rule ConfigMap.
	if err := r.client.Update(ctx, cm); apierrors.IsNotFound(err) {
		if err := r.client.Create(ctx, cm); err != nil {
			return fmt.Errorf("create generated rules: %w", err)
		}
	} else if err != nil {
		return fmt.Errorf("update generated rules: %w", err)
	}

	var errs []error
	for _, obj := range statusUpdates {
		if err := patchMonitoringStatus(ctx, r.client, obj, obj.GetMonitoringStatus()); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}