pkg/operator/collection.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package operator

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"path"
"sort"
"github.com/go-logr/logr"
"github.com/prometheus/common/config"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
yaml "gopkg.in/yaml.v3"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
"github.com/GoogleCloudPlatform/prometheus-engine/pkg/secrets"
)
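
// setupCollectionControllers sets up the controller that generates the collectors'
// Prometheus configuration. All watched resources funnel their events into a single
// reconcile request for the singleton OperatorConfig.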
func setupCollectionControllers(op *Operator) error {
// The singleton OperatorConfig is the request object we reconcile against.
objRequest := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: op.opts.PublicNamespace,
Name: NameOperatorConfig,
},
}
// Default OperatorConfig filter.
objFilterOperatorConfig := namespacedNamePredicate{
namespace: op.opts.PublicNamespace,
name: NameOperatorConfig,
}
// Collector ConfigMap and Daemonset filter.
objFilterCollector := namespacedNamePredicate{
namespace: op.opts.OperatorNamespace,
name: NameCollector,
}
// Collector secret.
objFilterSecret := namespacedNamePredicate{
namespace: op.opts.OperatorNamespace,
name: CollectionSecretName,
}
// Reconcile the generated Prometheus configuration that is used by all collectors.
err := ctrl.NewControllerManagedBy(op.manager).
Named("collector-config").
// Filter events without changes for all watches.
WithEventFilter(predicate.ResourceVersionChangedPredicate{}).
// OperatorConfig is our root resource that ensures we reconcile
// at least once initially.
For(
&monitoringv1.OperatorConfig{},
builder.WithPredicates(objFilterOperatorConfig),
).
// Any update to a PodMonitoring requires regenerating the config.
Watches(
&monitoringv1.PodMonitoring{},
enqueueConst(objRequest),
builder.WithPredicates(predicate.GenerationChangedPredicate{}),
).
// Any update to a ClusterPodMonitoring requires regenerating the config.
Watches(
&monitoringv1.ClusterPodMonitoring{},
enqueueConst(objRequest),
builder.WithPredicates(predicate.GenerationChangedPredicate{}),
).
// Any update to a ClusterNodeMonitoring requires regenerating the config.
Watches(
&monitoringv1.ClusterNodeMonitoring{},
enqueueConst(objRequest),
builder.WithPredicates(predicate.GenerationChangedPredicate{}),
).
// The configuration we generate for the collectors.
Watches(
&corev1.ConfigMap{},
enqueueConst(objRequest),
builder.WithPredicates(objFilterCollector),
).
// Detect and undo changes to the daemon set.
Watches(
&appsv1.DaemonSet{},
enqueueConst(objRequest),
builder.WithPredicates(
objFilterCollector,
predicate.GenerationChangedPredicate{},
)).
// Detect and undo changes to the secret.
Watches(
&corev1.Secret{},
enqueueConst(objRequest),
builder.WithPredicates(objFilterSecret)).
Complete(newCollectionReconciler(op.manager.GetClient(), op.opts))
if err != nil {
return fmt.Errorf("create collector config controller: %w", err)
}
return nil
}
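
// collectionReconciler reconciles the collectors' secrets, DaemonSet, and generated
// Prometheus configuration.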
type collectionReconciler struct {
client client.Client
opts Options
}
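
// newCollectionReconciler returns a collectionReconciler for the given client and
// operator options.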
func newCollectionReconciler(c client.Client, opts Options) *collectionReconciler {
return &collectionReconciler{
client: c,
opts: opts,
}
}
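
// patchMonitoringStatus merge-patches the status subresource of obj with the given
// conditions and observed generation, i.e. it applies a JSON merge patch of the form
// {"status": {"conditions": [...], "observedGeneration": N}}.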
func patchMonitoringStatus(ctx context.Context, kubeClient client.Client, obj client.Object, status *monitoringv1.MonitoringStatus) error {
// TODO(TheSpiritXIII): In the future, change this to server side apply as opposed to patch.
patchStatus := map[string]interface{}{
"conditions": status.Conditions,
"observedGeneration": status.ObservedGeneration,
}
patchObject := map[string]interface{}{"status": patchStatus}
patchBytes, err := json.Marshal(patchObject)
if err != nil {
return err
}
patch := client.RawPatch(types.MergePatchType, patchBytes)
if err := kubeClient.Status().Patch(ctx, obj, patch); err != nil {
return fmt.Errorf("patch status: %w", err)
}
return nil
}
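
// Reconcile regenerates the collector secrets and Prometheus configuration from the
// OperatorConfig and all monitoring resources, and verifies that the collector
// DaemonSet is present.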
func (r *collectionReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
logger, _ := logr.FromContext(ctx)
logger.Info("reconciling collection")
var config monitoringv1.OperatorConfig
// Fetch OperatorConfig if it exists.
if err := r.client.Get(ctx, req.NamespacedName, &config); apierrors.IsNotFound(err) {
logger.Info("no operatorconfig created yet")
} else if err != nil {
return reconcile.Result{}, fmt.Errorf("get operatorconfig for incoming: %q: %w", req.String(), err)
}
if err := r.ensureCollectorSecrets(ctx, &config.Collection); err != nil {
return reconcile.Result{}, fmt.Errorf("ensure collector secrets: %w", err)
}
// Deploy Prometheus collector as a node agent.
if err := r.ensureCollectorDaemonSet(ctx); err != nil {
return reconcile.Result{}, fmt.Errorf("ensure collector daemon set: %w", err)
}
if err := r.ensureCollectorConfig(ctx, &config.Collection, config.Features.Config.Compression, config.Exports); err != nil {
return reconcile.Result{}, fmt.Errorf("ensure collector config: %w", err)
}
return reconcile.Result{}, nil
}
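
// ensureCollectorSecrets mirrors the credentials referenced by the collection spec
// into the collection Secret in the operator namespace, keyed by a path derived from
// the secret selector.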
func (r *collectionReconciler) ensureCollectorSecrets(ctx context.Context, spec *monitoringv1.CollectionSpec) error {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: CollectionSecretName,
Namespace: r.opts.OperatorNamespace,
Labels: map[string]string{
LabelAppName: NameCollector,
},
Annotations: map[string]string{
AnnotationMetricName: componentName,
},
},
Data: make(map[string][]byte),
}
if spec.Credentials != nil {
p := pathForSelector(r.opts.PublicNamespace, &monitoringv1.SecretOrConfigMap{Secret: spec.Credentials})
b, err := getSecretKeyBytes(ctx, r.client, r.opts.PublicNamespace, spec.Credentials)
if err != nil {
return err
}
secret.Data[p] = b
}
if err := r.client.Update(ctx, secret); apierrors.IsNotFound(err) {
if err := r.client.Create(ctx, secret); err != nil {
return fmt.Errorf("create collector secrets: %w", err)
}
} else if err != nil {
return fmt.Errorf("update collector secrets: %w", err)
}
return nil
}

// ensureCollectorDaemonSet verifies that the collector DaemonSet exists. Its absence
// is tolerated, as some users deliberately run without collectors.
func (r *collectionReconciler) ensureCollectorDaemonSet(ctx context.Context) error {
logger, _ := logr.FromContext(ctx)
var ds appsv1.DaemonSet
err := r.client.Get(ctx, client.ObjectKey{Namespace: r.opts.OperatorNamespace, Name: NameCollector}, &ds)
// Some users deliberately do not want to run the collectors. Only emit a warning, but
// don't cause retries, as this logic gets re-triggered anyway if the DaemonSet is created later.
if apierrors.IsNotFound(err) {
logger.Error(err, "collector DaemonSet does not exist")
return nil
}
return err
}
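
// gzipData returns the gzip-compressed form of data.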
func gzipData(data []byte) ([]byte, error) {
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(data); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return b.Bytes(), nil
}
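
// setConfigMapData stores data under key in the ConfigMap, gzip-compressed in
// BinaryData for CompressionGzip and as plain text in Data otherwise.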
func setConfigMapData(cm *corev1.ConfigMap, c monitoringv1.CompressionType, key string, data string) error {
// Thanos config-reloader detects gzip compression automatically, so no sync with the
// config-reloaders is needed when switching between compression modes.
switch c {
case monitoringv1.CompressionGzip:
compressed, err := gzipData([]byte(data))
if err != nil {
return fmt.Errorf("gzip Prometheus config: %w", err)
}
if cm.BinaryData == nil {
cm.BinaryData = map[string][]byte{}
}
cm.BinaryData[key] = compressed
case "", monitoringv1.CompressionNone:
if cm.Data == nil {
cm.Data = map[string]string{}
}
cm.Data[key] = data
default:
return fmt.Errorf("unknown compression type: %q", c)
}
return nil
}

// ensureCollectorConfig generates the collector config and creates or updates it.
func (r *collectionReconciler) ensureCollectorConfig(ctx context.Context, spec *monitoringv1.CollectionSpec, compression monitoringv1.CompressionType, exports []monitoringv1.ExportSpec) error {
cfg, updates, err := r.makeCollectorConfig(ctx, spec, exports)
if err != nil {
return fmt.Errorf("generate Prometheus config: %w", err)
}
var credentialsFile string
if spec.Credentials != nil {
credentialsFile = path.Join(secretsDir, pathForSelector(r.opts.PublicNamespace, &monitoringv1.SecretOrConfigMap{Secret: spec.Credentials}))
}
cfg.GoogleCloud = &GoogleCloudConfig{
Export: &GoogleCloudExportConfig{
Compression: ptr.To(string(compression)),
CredentialsFile: ptr.To(credentialsFile),
Match: spec.Filter.MatchOneOf,
},
}
cfgEncoded, err := yaml.Marshal(cfg)
if err != nil {
return fmt.Errorf("marshal Prometheus config: %w", err)
}
cm := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.opts.OperatorNamespace,
Name: NameCollector,
},
}
if err := setConfigMapData(cm, compression, configFilename, string(cfgEncoded)); err != nil {
return err
}
if err := r.client.Update(ctx, cm); apierrors.IsNotFound(err) {
if err := r.client.Create(ctx, cm); err != nil {
return fmt.Errorf("create Prometheus config: %w", err)
}
} else if err != nil {
return fmt.Errorf("update Prometheus config: %w", err)
}
// Reconcile any status updates.
var errs []error
for _, update := range updates {
// Copy the status in case we update both spec and status, since updating one reverts the other.
statusUpdate := update.object.DeepCopyObject().(monitoringv1.MonitoringCRD).GetMonitoringStatus()
if update.spec {
// The status will be reverted, but our generation ID will be updated.
if err := r.client.Update(ctx, update.object); err != nil {
errs = append(errs, err)
}
}
if update.status {
// Use the object with the latest generation ID.
if err := patchMonitoringStatus(ctx, r.client, update.object, statusUpdate); err != nil {
errs = append(errs, err)
}
}
}
return errors.Join(errs...)
}
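
// prometheusConfig extends the upstream Prometheus configuration with the additional
// fields understood by our Prometheus fork.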
type prometheusConfig struct {
promconfig.Config `yaml:",inline"`
// Secret management. Matches our fork's configuration.
SecretConfigs []secrets.SecretConfig `yaml:"kubernetes_secrets,omitempty"`
// Google Cloud configuration. Matches our fork's configuration.
GoogleCloud *GoogleCloudConfig `yaml:"google_cloud,omitempty"`
}
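
// update records an object whose spec and/or status needs to be written back after
// config generation.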
type update struct {
object monitoringv1.MonitoringCRD
spec bool
status bool
}
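
// GoogleCloudConfig holds the google_cloud section of the generated configuration,
// matching our fork's configuration format.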
type GoogleCloudConfig struct {
Export *GoogleCloudExportConfig `yaml:"export,omitempty"`
Query *GoogleCloudQueryConfig `yaml:"query,omitempty"`
}
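
// GoogleCloudExportConfig holds the export subsection of the google_cloud
// configuration. Given the yaml tags below, it marshals to a shape like the
// following (values are illustrative, not defaults):
//
//	export:
//	  match: ['{job="example"}']
//	  compression: gzip
//	  credentials: /etc/secrets/<path-for-selector>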
type GoogleCloudExportConfig struct {
Match []string `yaml:"match,omitempty"`
Compression *string `yaml:"compression,omitempty"`
CredentialsFile *string `yaml:"credentials,omitempty"`
}

// makeCollectorConfig generates the collectors' Prometheus configuration from all
// monitoring resources and returns it along with the list of objects whose spec or
// status needs updating, and any error.
func (r *collectionReconciler) makeCollectorConfig(ctx context.Context, spec *monitoringv1.CollectionSpec, exports []monitoringv1.ExportSpec) (*prometheusConfig, []update, error) {
logger, _ := logr.FromContext(ctx)
cfg := &promconfig.Config{
GlobalConfig: promconfig.GlobalConfig{
ExternalLabels: labels.FromMap(spec.ExternalLabels),
},
}
var err error
cfg.ScrapeConfigs, err = spec.ScrapeConfigs()
if err != nil {
return nil, nil, fmt.Errorf("failed to create kubelet scrape config: %w", err)
}
cfg.RemoteWriteConfigs, err = makeRemoteWriteConfig(exports)
if err != nil {
return nil, nil, fmt.Errorf("failed to create export config: %w", err)
}
// Generate a separate scrape job for every endpoint in every PodMonitoring.
var (
podMons monitoringv1.PodMonitoringList
clusterPodMons monitoringv1.ClusterPodMonitoringList
clusterNodeMons monitoringv1.ClusterNodeMonitoringList
)
if err := r.client.List(ctx, &podMons); err != nil {
return nil, nil, fmt.Errorf("failed to list PodMonitorings: %w", err)
}
usedSecrets := monitoringv1.PrometheusSecretConfigs{}
projectID, location, cluster := resolveLabels(r.opts.ProjectID, r.opts.Location, r.opts.Cluster, spec.ExternalLabels)
var updates []update
// Mark status updates in batch with a single timestamp.
for _, pmon := range podMons.Items {
cond := &monitoringv1.MonitoringCondition{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
}
cfgs, err := pmon.ScrapeConfigs(projectID, location, cluster, usedSecrets)
if err != nil {
msg := "generating scrape config failed for PodMonitoring endpoint"
cond = &monitoringv1.MonitoringCondition{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionFalse,
Reason: "ScrapeConfigError",
Message: msg,
}
logger.Error(err, msg, "namespace", pmon.Namespace, "name", pmon.Name)
} else {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, cfgs...)
}
updateStatus := pmon.Status.SetMonitoringCondition(pmon.GetGeneration(), metav1.Now(), cond)
if updateStatus {
updates = append(updates, update{
object: &pmon,
status: updateStatus,
})
}
}
if err := r.client.List(ctx, &clusterPodMons); err != nil {
return nil, nil, fmt.Errorf("failed to list ClusterPodMonitorings: %w", err)
}
// Mark status updates in batch with a single timestamp.
for _, cmon := range clusterPodMons.Items {
cond := &monitoringv1.MonitoringCondition{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
}
cfgs, err := cmon.ScrapeConfigs(projectID, location, cluster, usedSecrets)
if err != nil {
msg := "generating scrape config failed for ClusterPodMonitoring endpoint"
cond = &monitoringv1.MonitoringCondition{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionFalse,
Reason: "ScrapeConfigError",
Message: msg,
}
logger.Error(err, msg, "namespace", cmon.Namespace, "name", cmon.Name)
} else {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, cfgs...)
}
updateStatus := cmon.Status.SetMonitoringCondition(cmon.GetGeneration(), metav1.Now(), cond)
if updateStatus {
updates = append(updates, update{
object: &cmon,
status: updateStatus,
})
}
}
// TODO(bwplotka): Warn about missing RBAC policies.
// https://github.com/GoogleCloudPlatform/prometheus-engine/issues/789
secretConfigs := usedSecrets.SecretConfigs()
if err := r.client.List(ctx, &clusterNodeMons); err != nil {
return nil, nil, fmt.Errorf("failed to list ClusterNodeMonitorings: %w", err)
}
// The following job names are reserved by GMP for ClusterNodeMonitoring in the
// gmp-system namespace. They will not be generated if kubeletScraping is enabled.
var (
reservedCAdvisorJobName = "gmp-kubelet-cadvisor"
reservedKubeletJobName = "gmp-kubelet-metrics"
)
// Mark status updates in batch with a single timestamp.
for _, cnmon := range clusterNodeMons.Items {
if spec.KubeletScraping != nil && (cnmon.Name == reservedKubeletJobName || cnmon.Name == reservedCAdvisorJobName) {
logger.Info("ClusterNodeMonitoring job %s was not applied because OperatorConfig.collector.kubeletScraping is enabled. kubeletScraping already includes the metrics in this job.", "name", cnmon.Name)
continue
}
cond := &monitoringv1.MonitoringCondition{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
}
cfgs, err := cnmon.ScrapeConfigs(projectID, location, cluster)
if err != nil {
msg := "generating scrape config failed for ClusterNodeMonitoring endpoint"
cond = &monitoringv1.MonitoringCondition{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionFalse,
Reason: "ScrapeConfigError",
Message: msg,
}
logger.Error(err, msg, "namespace", cnmon.Namespace, "name", cnmon.Name)
} else {
cfg.ScrapeConfigs = append(cfg.ScrapeConfigs, cfgs...)
}
if cnmon.Status.SetMonitoringCondition(cnmon.GetGeneration(), metav1.Now(), cond) {
updates = append(updates, update{
object: &cnmon,
status: true,
})
}
}
// Sort to ensure reproducible configs.
sort.Slice(cfg.ScrapeConfigs, func(i, j int) bool {
return cfg.ScrapeConfigs[i].JobName < cfg.ScrapeConfigs[j].JobName
})
return &prometheusConfig{
Config: *cfg,
SecretConfigs: secretConfigs,
}, updates, nil
}

// makeRemoteWriteConfig generates the configs for the Prometheus remote_write feature.
func makeRemoteWriteConfig(exports []monitoringv1.ExportSpec) ([]*promconfig.RemoteWriteConfig, error) {
var exportConfigs []*promconfig.RemoteWriteConfig
for _, export := range exports {
u, err := url.Parse(export.URL)
if err != nil {
return nil, fmt.Errorf("failed to parse export URL: %w", err)
}
exportConfigs = append(exportConfigs,
&promconfig.RemoteWriteConfig{
URL: &config.URL{URL: u},
})
}
return exportConfigs, nil
}