pkg/secrets/watch.go

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package secrets

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/go-kit/log"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

// WatchSPConfig configures access to the Kubernetes API server.
// TODO(TheSpiritXIII): https://github.com/GoogleCloudPlatform/prometheus-engine/issues/867
type WatchSPConfig struct {
	ClientConfig
}

func (c *WatchSPConfig) newProvider(ctx context.Context, opts ProviderOptions) (*watchProvider, error) {
	client, err := c.client()
	if err != nil {
		return nil, err
	}
	return newWatchProvider(ctx, opts.Logger, client), nil
}

type secretWatcher struct {
	// Add, Update and Remove are synchronous. We need to lock everything but `refCount`.
	mu sync.Mutex
	w  watch.Interface
	s  *corev1.Secret

	refCount uint
	done     bool
}

func newWatcher(ctx context.Context, logger log.Logger, client kubernetes.Interface, config *KubernetesSecretConfig) (*secretWatcher, error) {
	watcher := &secretWatcher{
		refCount: 1,
		done:     false,
	}
	err := watcher.start(ctx, client, config)
	if err != nil {
		_ = logger.Log("msg", "secret watcher failed to start", "err", err, "namespace", config.Namespace, "name", config.Name)
	}
	started := err == nil
	go func() {
		if !started {
			if ok := watcher.tryRestart(ctx, logger, client, config); !ok {
				return
			}
		}
		for {
			select {
			case e, ok := <-watcher.w.ResultChan():
				if ok {
					watcher.update(logger, e)
					continue
				}
				if ok := watcher.tryRestart(ctx, logger, client, config); !ok {
					return
				}
			case <-ctx.Done():
				// The application shut down, we don't care about cleaning up.
				watcher.close()
				return
			}
		}
	}()
	return watcher, nil
}

func (w *secretWatcher) update(logger log.Logger, e watch.Event) {
	w.mu.Lock()
	defer w.mu.Unlock()
	switch e.Type {
	case watch.Modified, watch.Added:
		secret := e.Object.(*corev1.Secret)
		w.s = secret
	case watch.Deleted:
		w.s = nil
	case watch.Bookmark:
		// Disabled explicitly when creating the watch interface.
	case watch.Error:
		//nolint:errcheck
		logger.Log("msg", "watch error event", "namespace", w.s.Namespace, "name", w.s.Name)
	}
}

func (w *secretWatcher) secret(config *KubernetesSecretConfig) Secret {
	fn := SecretFn(func(_ context.Context) (string, error) {
		w.mu.Lock()
		defer w.mu.Unlock()
		if w.s == nil {
			return "", errNotFound(config.Namespace, config.Name)
		}
		return getValue(w.s, config.Key)
	})
	return &fn
}

func errNotFound(namespace, name string) error {
	return fmt.Errorf("secret %s/%s not found or forbidden", namespace, name)
}
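// getValue is defined elsewhere in this package. As a hedged, illustrative
// sketch only (an assumption, not the package's actual implementation), a
// lookup of this kind typically reads the requested key out of the Secret's
// Data map, roughly like the hypothetical helper below:
func getValueSketch(s *corev1.Secret, key string) (string, error) {
	v, ok := s.Data[key]
	if !ok {
		return "", fmt.Errorf("key %q not found in secret %s/%s", key, s.Namespace, s.Name)
	}
	return string(v), nil
}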
// start creates the secret watch and fetches the initial secret value.
func (w *secretWatcher) start(ctx context.Context, client kubernetes.Interface, config *KubernetesSecretConfig) error {
	var err error
	w.w, err = client.CoreV1().Secrets(config.Namespace).Watch(ctx, metav1.ListOptions{
		FieldSelector:       fields.OneTermEqualSelector(metav1.ObjectNameField, config.Name).String(),
		AllowWatchBookmarks: false,
	})
	if err != nil {
		return fmt.Errorf("watch: %w", err)
	}
	// We could wait for the first watch event, but it doesn't notify us if the resource doesn't exist.
	w.s, err = client.CoreV1().Secrets(config.Namespace).Get(ctx, config.Name, metav1.GetOptions{})
	if err != nil && !apierrors.IsNotFound(err) && !apierrors.IsForbidden(err) {
		defer w.w.Stop()
		return fmt.Errorf("fetch: %w", err)
	}
	return nil
}

// tryRestart restarts the secret watch indefinitely until it succeeds or the context is
// cancelled and returns true if the watcher is still running.
func (w *secretWatcher) tryRestart(ctx context.Context, logger log.Logger, client kubernetes.Interface, config *KubernetesSecretConfig) bool {
	// If the application is shutting down, we don't care about cleanup.
	if ctx.Err() != nil {
		w.mu.Lock()
		defer w.mu.Unlock()
		w.s = nil
		return false
	}
	// If closed unintentionally (e.g. network issues), try to restart it.
	for {
		ok, err := w.restart(ctx, client, config)
		if !ok {
			return false
		}
		// If an error occurred trying to watch, keep retrying.
		if err == nil {
			break
		}
		_ = logger.Log("msg", "unable to restart secret watcher", "err", err, "namespace", config.Namespace, "name", config.Name)
	}
	return true
}

// restart attempts to restart the secret watch. Returns true if the watcher is still
// running, or false if the context is cancelled.
func (w *secretWatcher) restart(ctx context.Context, client kubernetes.Interface, config *KubernetesSecretConfig) (bool, error) {
	// Check in case the channel cancelled intentionally.
	if w.done {
		w.mu.Lock()
		defer w.mu.Unlock()
		w.s = nil
		return false, nil
	}
	jitter()
	// Lock the watcher so it doesn't cancel before we restart.
	w.mu.Lock()
	defer w.mu.Unlock()
	// Check again in case the watcher cancelled while we were waiting for the mutex.
	if w.done {
		w.s = nil
		return false, nil
	}
	if err := w.start(ctx, client, config); err != nil {
		return false, err
	}
	return true, nil
}

func jitter() {
	// Pseudo-arbitrarily jitter by the length of the most common scrape interval.
	// In the future, we may want an increasing jitter.
	jitter := time.Second * time.Duration(1+rand.Intn(30))
	time.Sleep(1*time.Second + jitter)
}

func (w *secretWatcher) close() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.w.Stop()
	w.s = nil
}

type watchProvider struct {
	ctx                context.Context
	client             kubernetes.Interface
	secretKeyToWatcher map[string]*secretWatcher
	logger             log.Logger
}

func newWatchProvider(ctx context.Context, logger log.Logger, client kubernetes.Interface) *watchProvider {
	return &watchProvider{
		ctx:                ctx,
		client:             client,
		secretKeyToWatcher: map[string]*secretWatcher{},
		logger:             logger,
	}
}

// Add adds a new secret to the provider, starting a new watch if the secret is not already watched.
func (p *watchProvider) Add(config *KubernetesSecretConfig) (Secret, error) {
	objKey := config.objectKey().String()
	val, ok := p.secretKeyToWatcher[objKey]
	if ok {
		val.refCount++
		return val.secret(config), nil
	}
	var err error
	val, err = newWatcher(p.ctx, p.logger, p.client, config)
	if err != nil {
		return nil, err
	}
	p.secretKeyToWatcher[objKey] = val
	return val.secret(config), nil
}
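// exampleNameSelector is an illustrative sketch, not part of the original file.
// It shows the field selector that start builds above to scope the watch to a
// single Secret; the object name is hypothetical. For a Secret named
// "db-credentials", the selector renders as "metadata.name=db-credentials", so
// the API server only streams events for that one object.
func exampleNameSelector() {
	sel := fields.OneTermEqualSelector(metav1.ObjectNameField, "db-credentials")
	fmt.Println(sel.String()) // prints: metadata.name=db-credentials
}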
// Update updates the secret, restarting the watch if the referenced Secret object changes.
func (p *watchProvider) Update(configBefore, configAfter *KubernetesSecretConfig) (Secret, error) {
	objKeyBefore := configBefore.objectKey()
	objKeyAfter := configAfter.objectKey()
	if objKeyBefore == objKeyAfter {
		// If we're using the same secret with a different key, just remap the current watch.
		val := p.secretKeyToWatcher[objKeyAfter.String()]
		if val == nil {
			// Highly unlikely occurrence.
			return nil, errNotFound(configAfter.Namespace, configAfter.Name)
		}
		return val.secret(configAfter), nil
	}
	p.Remove(configBefore)
	return p.Add(configAfter)
}

// Remove removes the secret, stopping the watch if no other keys for the same secret are watched.
func (p *watchProvider) Remove(config *KubernetesSecretConfig) {
	objKey := config.objectKey().String()
	val := p.secretKeyToWatcher[objKey]
	if val == nil {
		return
	}
	val.refCount--
	if val.refCount > 0 {
		return
	}
	delete(p.secretKeyToWatcher, objKey)

	val.mu.Lock()
	defer val.mu.Unlock()
	val.done = true
	val.w.Stop()
}
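// exampleProviderUsage is an illustrative sketch, not part of the original file.
// It shows how a caller might drive the provider, assuming Namespace, Name and
// Key are direct fields of KubernetesSecretConfig (as the accesses above
// suggest); the secret name and key are hypothetical and error handling is
// elided.
func exampleProviderUsage(ctx context.Context, logger log.Logger, client kubernetes.Interface) {
	provider := newWatchProvider(ctx, logger, client)
	cfg := &KubernetesSecretConfig{Namespace: "default", Name: "db-credentials", Key: "password"}

	// Add starts a watch on default/db-credentials (or reuses an existing one)
	// and returns a handle that resolves the current value on demand.
	secret, err := provider.Add(cfg)
	_ = secret
	_ = err

	// Remove drops the reference; the last removal for this object stops the watch.
	provider.Remove(cfg)
}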