pkg/operator/webhook.go (198 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package operator contains the Prometheus Engine operator and its admission webhooks.
package operator
import (
"context"
"encoding/base64"
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
"github.com/go-logr/logr"
arv1 "k8s.io/api/admissionregistration/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/cert"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)
// setupAdmissionWebhooks writes the webhook serving certificate and key into the
// server's cert directory and registers the operator's admission handlers with
// webhookServer: validating handlers for OperatorConfig, Rules, ClusterRules and
// GlobalRules, plus a defaulting handler for OperatorConfig.
//
// When ensureCerts yields a CA bundle, a background goroutine is started that
// keeps writing that bundle into the webhook configurations until ctx is done.
// An error is returned only if the certificates could not be prepared.
func setupAdmissionWebhooks(ctx context.Context, logger logr.Logger, kubeClient client.Client, webhookServer *webhook.DefaultServer, opts *Options, vpaAvailable bool) error {
	// Materialize the provided (or freshly generated) cert files.
	caBundle, err := ensureCerts(opts.OperatorNamespace, webhookServer.Options.CertDir, opts.TLSCert, opts.TLSKey, opts.CACert)
	if err != nil {
		return err
	}

	configName := webhookName(opts.OperatorNamespace)
	if len(caBundle) != 0 {
		// Keep pushing the CA bundle into the expected webhook configurations.
		// If permissions are insufficient, the goroutine keeps retrying and
		// logs an error on each failed attempt.
		go continuouslySetCABundle(ctx, logger, kubeClient, configName, caBundle)
	}

	scheme := kubeClient.Scheme()

	// Validating webhooks.
	operatorConfigValidator := &monitoringv1.OperatorConfigValidator{
		Namespace:    opts.PublicNamespace,
		Name:         NameOperatorConfig,
		VPAAvailable: vpaAvailable,
	}
	operatorConfigValidation := admission.WithCustomValidator(scheme, &monitoringv1.OperatorConfig{}, operatorConfigValidator)
	webhookServer.Register(validatePath(monitoringv1.OperatorConfigResource()), operatorConfigValidation)

	rulesValidation := admission.ValidatingWebhookFor(scheme, &monitoringv1.Rules{})
	webhookServer.Register(validatePath(monitoringv1.RulesResource()), rulesValidation)

	clusterRulesValidation := admission.ValidatingWebhookFor(scheme, &monitoringv1.ClusterRules{})
	webhookServer.Register(validatePath(monitoringv1.ClusterRulesResource()), clusterRulesValidation)

	globalRulesValidation := admission.ValidatingWebhookFor(scheme, &monitoringv1.GlobalRules{})
	webhookServer.Register(validatePath(monitoringv1.GlobalRulesResource()), globalRulesValidation)

	// Defaulting webhooks.
	defaulter := &operatorConfigDefaulter{
		projectID: opts.ProjectID,
		location:  opts.Location,
		cluster:   opts.Cluster,
	}
	operatorConfigDefaulting := admission.WithCustomDefaulter(scheme, &monitoringv1.OperatorConfig{}, defaulter)
	webhookServer.Register(defaultPath(monitoringv1.OperatorConfigResource()), operatorConfigDefaulting)

	return nil
}
// webhookName builds the name of the webhook configuration objects for the
// operator running in the given namespace, in the form
// "<NameOperator>.<namespace>.monitoring.googleapis.com".
func webhookName(namespace string) string {
	const nameFormat = "%s.%s.monitoring.googleapis.com"
	return fmt.Sprintf(nameFormat, NameOperator, namespace)
}
// ensureCerts writes the TLS serving certificate and key as "tls.crt" and
// "tls.key" into dir and returns the CA bundle matching them.
//
// certEncoded, keyEncoded and caCertEncoded are base64 (standard encoding)
// strings. Three input combinations are handled:
//   - cert and key both set: they are decoded and written as-is; the returned
//     CA bundle is the decoded caCertEncoded, or nil if it is empty.
//   - cert, key and CA all empty: a self-signed pair is generated for
//     "<NameOperator>.<operatorNamespace>.svc" and the certificate itself is
//     returned as the CA bundle.
//   - anything else (only one of cert/key set, or only the CA set): an error.
//
// An error is also returned if decoding, generation or writing the files fails.
func ensureCerts(operatorNamespace, dir, certEncoded, keyEncoded, caCertEncoded string) ([]byte, error) {
	var (
		crt, key, caData []byte
		err              error
	)
	switch {
	case keyEncoded != "" && certEncoded != "":
		crt, err = base64.StdEncoding.DecodeString(certEncoded)
		if err != nil {
			return nil, fmt.Errorf("decoding TLS certificate: %w", err)
		}
		key, err = base64.StdEncoding.DecodeString(keyEncoded)
		if err != nil {
			return nil, fmt.Errorf("decoding TLS key: %w", err)
		}
		if caCertEncoded != "" {
			caData, err = base64.StdEncoding.DecodeString(caCertEncoded)
			if err != nil {
				return nil, fmt.Errorf("decoding certificate authority: %w", err)
			}
		}
	case keyEncoded == "" && certEncoded == "" && caCertEncoded == "":
		// Generate a self-signed pair if none was explicitly provided. It will be valid
		// for 1 year.
		// TODO(freinartz): re-generate at runtime and update the ValidatingWebhookConfiguration
		// at runtime whenever the files change.
		fqdn := fmt.Sprintf("%s.%s.svc", NameOperator, operatorNamespace)
		crt, key, err = cert.GenerateSelfSignedCertKey(fqdn, nil, nil)
		if err != nil {
			return nil, fmt.Errorf("generate self-signed TLS key pair: %w", err)
		}
		// Use crt as the ca in the self-sign case.
		caData = crt
	default:
		return nil, errors.New("flags key-base64 and cert-base64 must both be set")
	}

	// Create cert/key files. The certificate is public, but the private key
	// must not be readable by other users, so it gets owner-only permissions.
	// Note that os.WriteFile applies the mode only when it creates the file.
	if err := os.WriteFile(filepath.Join(dir, "tls.crt"), crt, 0644); err != nil {
		return nil, fmt.Errorf("create cert file: %w", err)
	}
	if err := os.WriteFile(filepath.Join(dir, "tls.key"), key, 0600); err != nil {
		return nil, fmt.Errorf("create key file: %w", err)
	}
	return caData, nil
}
// validatePath returns the HTTP path under which the validating webhook for
// the given resource is served: "/validate/<group>/<version>/<resource>".
func validatePath(gvr metav1.GroupVersionResource) string {
	return "/validate/" + gvr.Group + "/" + gvr.Version + "/" + gvr.Resource
}
// defaultPath returns the HTTP path under which the defaulting webhook for
// the given resource is served: "/default/<group>/<version>/<resource>".
func defaultPath(gvr metav1.GroupVersionResource) string {
	return "/default/" + gvr.Group + "/" + gvr.Version + "/" + gvr.Resource
}
// setValidatingWebhookCABundle fetches the ValidatingWebhookConfiguration with
// the given name, sets caBundle on the client config of every webhook in it
// and writes the object back. A missing configuration is not an error: the
// function returns nil without doing anything. Any other Get or Update error
// is returned unchanged.
func setValidatingWebhookCABundle(ctx context.Context, kubeClient client.Client, name string, caBundle []byte) error {
	var config arv1.ValidatingWebhookConfiguration
	if err := kubeClient.Get(ctx, client.ObjectKey{Name: name}, &config); err != nil {
		if apierrors.IsNotFound(err) {
			// Nothing to patch.
			return nil
		}
		return err
	}
	for i := range config.Webhooks {
		hook := &config.Webhooks[i]
		hook.ClientConfig.CABundle = caBundle
	}
	return kubeClient.Update(ctx, &config)
}
// setMutatingWebhookCABundle fetches the MutatingWebhookConfiguration with the
// given name, sets caBundle on the client config of every webhook in it and
// writes the object back. A missing configuration is not an error: the
// function returns nil without doing anything. Any other Get or Update error
// is returned unchanged.
func setMutatingWebhookCABundle(ctx context.Context, kubeClient client.Client, name string, caBundle []byte) error {
	var config arv1.MutatingWebhookConfiguration
	if err := kubeClient.Get(ctx, client.ObjectKey{Name: name}, &config); err != nil {
		if apierrors.IsNotFound(err) {
			// Nothing to patch.
			return nil
		}
		return err
	}
	for i := range config.Webhooks {
		hook := &config.Webhooks[i]
		hook.ClientConfig.CABundle = caBundle
	}
	return kubeClient.Update(ctx, &config)
}
// continuouslySetCABundle periodically writes caBundle into the validating and
// mutating webhook configurations called name. The first attempt happens after
// a 5 second delay and subsequent attempts one minute after the previous one
// finished. Failures are logged and retried on the next round. The function
// blocks until ctx is done, so it is meant to be run in its own goroutine.
func continuouslySetCABundle(ctx context.Context, logger logr.Logger, kubeClient client.Client, name string, caBundle []byte) {
	// Initial delay for the client to initialize before our first calls.
	// Ideally we could explicitly wait for it.
	// A single timer is reused for the initial delay and the retry interval so
	// that both waits can be interrupted by context cancellation.
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
		}

		if err := setValidatingWebhookCABundle(ctx, kubeClient, name, caBundle); err != nil {
			logger.Error(err, "Setting CA bundle for ValidatingWebhookConfiguration failed; retrying in 1m...")
		}
		if err := setMutatingWebhookCABundle(ctx, kubeClient, name, caBundle); err != nil {
			logger.Error(err, "Setting CA bundle for MutatingWebhookConfiguration failed; retrying in 1m...")
		}

		// The timer has fired and its channel was drained above, so Reset is safe.
		timer.Reset(time.Minute)
	}
}
// operatorConfigDefaulter fills in default external labels on OperatorConfig
// objects. Its fields hold the fallback values passed to the operator, used
// when the corresponding external label is not already set.
type operatorConfigDefaulter struct {
	// Fallback project ID, location and cluster label values.
	projectID, location, cluster string
}
// Default applies the operator's defaults to o, which must be a
// *monitoringv1.OperatorConfig. It is the entry point used by the defaulting
// admission webhook. An error is returned, instead of panicking, when o is of
// any other type.
func (d *operatorConfigDefaulter) Default(_ context.Context, o runtime.Object) error {
	oc, ok := o.(*monitoringv1.OperatorConfig)
	if !ok {
		return fmt.Errorf("expected *monitoringv1.OperatorConfig, got %T", o)
	}
	// Whether anything changed is irrelevant here; the object is mutated in place.
	_ = d.update(oc)
	return nil
}
// update defaults the OperatorConfig in place and reports whether anything
// was changed.
//
// For both the collection and the rule-evaluator configuration, the project
// ID, location and cluster external labels are upserted: resolveLabels picks
// the value for each label from the existing external labels and the values
// passed to the operator, and any label that is missing or differs is written.
// If the labels are already as expected, this is effectively a no-op.
func (d *operatorConfigDefaulter) update(oc *monitoringv1.OperatorConfig) bool {
	// upsert returns the label map to store and whether it differs from current.
	upsert := func(current map[string]string) (map[string]string, bool) {
		projectID, location, cluster := resolveLabels(d.projectID, d.location, d.cluster, current)
		want := map[string]string{
			export.KeyProjectID: projectID,
			export.KeyLocation:  location,
			export.KeyCluster:   cluster,
		}
		if current == nil {
			// No labels at all yet: adopt the expected set wholesale.
			return want, true
		}
		changed := false
		for name, value := range want {
			if current[name] == value {
				continue
			}
			current[name] = value
			changed = true
		}
		return current, changed
	}

	var collectionChanged, rulesChanged bool
	oc.Collection.ExternalLabels, collectionChanged = upsert(oc.Collection.ExternalLabels)
	oc.Rules.ExternalLabels, rulesChanged = upsert(oc.Rules.ExternalLabels)
	return collectionChanged || rulesChanged
}