operator/pkg/controllers/customizationwatcher.go (126 lines of code) (raw):
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controllers
import (
"context"
"fmt"
"sync"
customizev1beta1 "github.com/GoogleCloudPlatform/k8s-config-connector/operator/pkg/apis/core/customize/v1beta1"
corekcck8s "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
"github.com/go-logr/logr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/event"
)
var (
// CustomizationCRsToWatch contains all the customization CRs to watch
CustomizationCRsToWatch = []schema.GroupVersionResource{
corekcck8s.ToGVR(customizev1beta1.ControllerResourceGroupVersionKind),
corekcck8s.ToGVR(customizev1beta1.ValidatingWebhookConfigurationCustomizationGroupVersionKind),
corekcck8s.ToGVR(customizev1beta1.MutatingWebhookConfigurationCustomizationGroupVersionKind),
corekcck8s.ToGVR(customizev1beta1.ControllerReconcilerGroupVersionKind),
}
// NamespacedCustomizationCRsToWatch contains all the namspaced customization CRs to watch
NamespacedCustomizationCRsToWatch = []schema.GroupVersionResource{
corekcck8s.ToGVR(customizev1beta1.NamespacedControllerResourceGroupVersionKind),
corekcck8s.ToGVR(customizev1beta1.NamespacedControllerReconcilerGroupVersionKind),
}
)
// CustomizationWatcher setup watches on 'triggerGVRs'. It is used by the CC / CCC operator.
//
// For ConfigConnector operator, CustomizationWatcher setups a cluster-scoped watch on the customization CRs. Any
// changes to the customization CRs raises a watch event on the ConfigConnector object, which then triggers a reconciliation.
//
// For ConfigConnectorContext operator, CustomizationWatcher setups a cluster-scoped watch, despite the fact that the
// CRs being watched are namespaced. This is to keep the number of watches low when there are large number of namespaces
// managed by Config Connector. Any changes to the namespaced customization CRs raises a watch event on the ConfigConnectorContext
// object in the same namespace.
//
// The raised watch events are sent to "events" channel, which is watched by CC / CCC operator.
type CustomizationWatcher struct {
// triggerGVRs contains all the GVRs to watch. An event on triggerGVRs will raise an event on the target.
triggerGVRs []schema.GroupVersionResource
// watchRegistered tracks the GVRs we are currently watching, avoid duplicate watches.
watchRegistered map[string]struct{}
// watchRegisteredMu protects access to watchRegistered map.
watchRegisteredMu sync.RWMutex
// lastRV caches the last reported resource version to filter out duplicated watch events.
lastRV map[string]string
// events is the channel that an event is raised and send to when CustomizationWatcher
// receives a watch event on the triggerGVRs it is watching.
events chan event.GenericEvent
dynamicClient dynamic.Interface
log logr.Logger
}
type CustomizationWatcherOptions struct {
TriggerGVRs []schema.GroupVersionResource
Log logr.Logger
}
func NewWithDynamicClient(dc dynamic.Interface, opts CustomizationWatcherOptions) *CustomizationWatcher {
return &CustomizationWatcher{
dynamicClient: dc,
triggerGVRs: opts.TriggerGVRs,
log: opts.Log.WithName("customization-watcher"),
watchRegistered: make(map[string]struct{}),
lastRV: make(map[string]string),
events: make(chan event.GenericEvent),
}
}
// Events returns a channel with events raised on target.
func (w *CustomizationWatcher) Events() chan event.GenericEvent {
return w.events
}
// EnsureWatchStarted starts watches on triggerGVRs if not already done so.
func (w *CustomizationWatcher) EnsureWatchStarted(ctx context.Context, targetNN types.NamespacedName) error {
for _, gvr := range w.triggerGVRs {
go w.startWatch(ctx, gvr, targetNN)
}
return nil
}
// startWatch starts a watch for changes to "triggerGVR", raises events on target object which is in
// the same namespace as the watch events received on "triggerGVR".
func (w *CustomizationWatcher) startWatch(ctx context.Context, triggerGVR schema.GroupVersionResource, targetNN types.NamespacedName) {
// if watch for the triggerGVR is already started / registered, skip; otherwise register the watch for the triggerGVR.
w.watchRegisteredMu.Lock()
if _, found := w.watchRegistered[triggerGVR.String()]; found {
w.watchRegisteredMu.Unlock()
return
}
w.watchRegistered[triggerGVR.String()] = struct{}{}
w.watchRegisteredMu.Unlock()
// make sure we de-register the watch for the triggerGVR when the watch is stopped.
defer func() {
w.watchRegisteredMu.Lock()
delete(w.watchRegistered, triggerGVR.String())
w.watchRegisteredMu.Unlock()
}()
log := w.log.WithValues("trigger GVR", triggerGVR.String(), "target NamespacedName", targetNN)
opts := metav1.ListOptions{AllowWatchBookmarks: true}
triggerEvents, err := w.dynamicClient.Resource(triggerGVR).Watch(ctx, opts)
if err != nil {
log.Error(err, "failed to start watch")
return
}
log.Info("watch started")
defer func() {
triggerEvents.Stop()
log.Info("watch stopped")
}()
for {
select {
case <-ctx.Done():
log.Info("watch context cancelled")
return
case triggerEvent, ok := <-triggerEvents.ResultChan():
if !ok {
log.Info("watch channel closed")
return
}
switch triggerEvent.Type {
case watch.Bookmark:
continue
case watch.Error:
log.Error(err, "unexpected event type from watch")
return
}
u, ok := triggerEvent.Object.(*unstructured.Unstructured)
if !ok {
log.Error(fmt.Errorf("expect watch event object to be *unstructure.Unstructured"), "unexpected event object from watch", "event object", triggerEvent.Object)
continue
}
lastRVKey := fmt.Sprintf("gvr=%s;namespace=%s;name=%s", triggerGVR.String(), u.GetNamespace(), u.GetName()) // a unique key given GVR, namespace and name
switch triggerEvent.Type {
case watch.Deleted:
delete(w.lastRV, lastRVKey)
case watch.Added, watch.Modified:
rv := u.GetResourceVersion()
if previousRV, found := w.lastRV[lastRVKey]; found && previousRV == rv {
// Filter out duplicated watch events
continue
}
w.lastRV[lastRVKey] = rv
}
genEvent := event.GenericEvent{}
genEvent.Object = &unstructured.Unstructured{}
genEvent.Object.SetNamespace(u.GetNamespace()) // raise event in the same namespace as the trigger watch event
genEvent.Object.SetName(targetNN.Name) // raise event on the target name
w.events <- genEvent
}
}
}