internal/controller/authproxyworkload_controller.go (341 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
cloudsqlapi "github.com/GoogleCloudPlatform/cloud-sql-proxy-operator/internal/api/v1"
"github.com/GoogleCloudPlatform/cloud-sql-proxy-operator/internal/workload"
"github.com/go-logr/logr"
)
// finalizerName is the finalizer this controller adds to every
// AuthProxyWorkload. Its presence means deletion of the resource waits until
// doDelete has updated the related workloads and removed the finalizer.
const finalizerName = cloudsqlapi.AnnotationPrefix + "/AuthProxyWorkload-finalizer"

// Reconcile results shared by the state machine below.
var (
	// requeueNow asks the controller runtime to requeue the request without a
	// fixed delay.
	requeueNow = ctrl.Result{Requeue: true}

	// requeueWithDelay asks the controller runtime to retry the request after
	// 30 seconds.
	requeueWithDelay = ctrl.Result{RequeueAfter: 30 * time.Second, Requeue: true}
)
// recentlyDeletedCache remembers which AuthProxyWorkload resources this
// reconciler has seen marked for deletion, so that a later failure to load
// one of them can be ignored. All access is guarded by lock, so it is safe
// for concurrent use; the zero value is ready to use.
type recentlyDeletedCache struct {
	lock   sync.RWMutex
	values map[types.NamespacedName]bool
}

// set records whether the resource named k has been deleted.
//
// Only deleted resources are kept in the map. Recording deleted=false removes
// the entry instead of storing false, so the map does not grow with every
// live resource ever reconciled. get reports false for absent keys, so
// callers observe exactly the same answers as before.
func (c *recentlyDeletedCache) set(k types.NamespacedName, deleted bool) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if !deleted {
		// delete on a nil map is a no-op, so no initialization is needed here.
		delete(c.values, k)
		return
	}
	if c.values == nil {
		c.values = map[types.NamespacedName]bool{}
	}
	c.values[k] = true
}

// get reports whether the resource named k was recorded as deleted. Unknown
// keys (and a nil map) yield false.
func (c *recentlyDeletedCache) get(k types.NamespacedName) bool {
	c.lock.RLock()
	defer c.lock.RUnlock()
	return c.values[k]
}
// AuthProxyWorkloadReconciler reconciles an AuthProxyWorkload object by
// annotating the workloads it selects so their pods pick up the proxy
// configuration.
type AuthProxyWorkloadReconciler struct {
	// Client is the Kubernetes client used for all reads and writes.
	client.Client
	// Scheme is the manager's runtime scheme.
	Scheme *runtime.Scheme
	// recentlyDeleted tracks resources whose deletion this reconciler has
	// already processed, so a subsequent failed Get is not treated as an error.
	recentlyDeleted *recentlyDeletedCache
	// updater supplies the pod template annotation key/value for a resource.
	updater *workload.Updater
}
// NewAuthProxyWorkloadReconciler constructs an AuthProxyWorkloadReconciler that
// uses mgr's client and scheme and the workload updater u, then registers it
// with mgr via SetupWithManager. The reconciler is returned even when
// registration fails, together with the registration error.
func NewAuthProxyWorkloadReconciler(mgr ctrl.Manager, u *workload.Updater) (*AuthProxyWorkloadReconciler, error) {
	r := &AuthProxyWorkloadReconciler{
		Client:          mgr.GetClient(),
		Scheme:          mgr.GetScheme(),
		recentlyDeleted: &recentlyDeletedCache{},
		updater:         u,
	}
	err := r.SetupWithManager(mgr)
	return r, err
}
// SetupWithManager registers this reconciler with the controller-runtime
// manager so that it is invoked for AuthProxyWorkload resources.
//
// Controller name validation is switched off (SkipNameValidation), presumably
// so the controller can be registered more than once in one process, e.g. in
// tests.
func (r *AuthProxyWorkloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
	skipNameValidation := true
	opts := controller.Options{SkipNameValidation: &skipNameValidation}

	return ctrl.NewControllerManagedBy(mgr).
		For(&cloudsqlapi.AuthProxyWorkload{}).
		WithOptions(opts).
		Complete(r)
}
//+kubebuilder:rbac:groups=apps,resources=deployments;statefulsets;daemonsets;replicasets,verbs=update;patch
//+kubebuilder:rbac:groups=apps,resources=*,verbs=get;list;watch
//+kubebuilder:rbac:groups=batch,resources=*,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=*,verbs=get;list;watch
//+kubebuilder:rbac:groups=cloudsql.cloud.google.com,resources=authproxyworkloads,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=cloudsql.cloud.google.com,resources=authproxyworkloads/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=cloudsql.cloud.google.com,resources=authproxyworkloads/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch;create;update;patch;delete
// Reconcile updates the state of the cluster so that AuthProxyWorkload instances
// have their configuration reflected correctly on workload PodSpec configuration.
// This reconcile loop runs when an AuthProxyWorkload is added, modified or deleted.
// It updates annotations on matching workloads, indicating which workloads
// need to be updated.
//
// As this controller's Reconcile() function patches the annotations on workloads,
// the PodAdmissionWebhook.Handle() method is called by k8s api, which is
// where the PodSpec is modified to match the AuthProxyWorkload configuration.
//
// This function can only make one update to the AuthProxyWorkload per loop, so it
// is written like a state machine. It will quickly do a single update, often to
// the status, and then return. Sometimes it instructs the controller runtime to quickly
// requeue another call to Reconcile, so that it can further process the
// AuthProxyWorkload. It often takes several calls to Reconcile() to finish the
// reconciliation of a single change to an AuthProxyWorkload.
//
// When the AuthProxyWorkload cannot be loaded because it no longer exists
// (this reconciler already processed its deletion, or the API reports
// NotFound), Reconcile returns without error and does not requeue. Any other
// load error is returned so the request is retried.
//
// For more details, check Reconcile and its Result here:
// https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile
func (r *AuthProxyWorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	l := log.FromContext(ctx)
	resource := &cloudsqlapi.AuthProxyWorkload{}

	l.Info("Reconcile loop started AuthProxyWorkload", "name", req.NamespacedName)
	if err := r.Get(ctx, req.NamespacedName, resource); err != nil {
		// The resource can't be loaded.
		// If it was recently deleted, then ignore the error and don't requeue.
		if r.recentlyDeleted.get(req.NamespacedName) {
			return ctrl.Result{}, nil
		}
		// The resource is gone, e.g. it was deleted before this controller
		// added its finalizer. There is nothing left to reconcile. Returning
		// an error here would make the request retry indefinitely.
		if errors.IsNotFound(err) {
			l.Info("AuthProxyWorkload not found, nothing to reconcile", "name", req.NamespacedName)
			return ctrl.Result{}, nil
		}
		// Otherwise, report the error and retry.
		l.Error(err, "unable to fetch resource")
		return requeueWithDelay, err
	}

	// DeletionTimestamp is set by k8s when a resource has been deleted but
	// finalizers are still present. A non-zero value means the resource is
	// waiting for finalizers (ours included) to complete.
	if !resource.ObjectMeta.DeletionTimestamp.IsZero() {
		l.Info("Reconcile delete for AuthProxyWorkload",
			"name", resource.GetName(),
			"namespace", resource.GetNamespace(),
			"gen", resource.GetGeneration())
		r.recentlyDeleted.set(req.NamespacedName, true)
		// the object has been deleted
		return r.doDelete(ctx, resource)
	}

	l.Info("Reconcile add/update for AuthProxyWorkload",
		"name", resource.GetName(),
		"namespace", resource.GetNamespace(),
		"gen", resource.GetGeneration())
	r.recentlyDeleted.set(req.NamespacedName, false)
	return r.doCreateUpdate(ctx, l, resource)
}
// doDelete handles an AuthProxyWorkload that has been marked for deletion.
// It refreshes the status entries and pod template annotations of every
// matching workload, then removes this controller's finalizer (if present)
// so Kubernetes can finish deleting the resource.
//
// Failures while updating workloads requeue the request; a failure removing
// the finalizer is returned as an error.
func (r *AuthProxyWorkloadReconciler) doDelete(ctx context.Context, resource *cloudsqlapi.AuthProxyWorkload) (ctrl.Result, error) {
	workloads, err := r.updateWorkloadStatus(ctx, resource)
	if err != nil {
		return requeueNow, err
	}
	if _, err = r.updateWorkloadAnnotations(ctx, resource, workloads); err != nil {
		return requeueNow, err
	}

	// Our finalizer is already gone; nothing more to do.
	if !controllerutil.ContainsFinalizer(resource, finalizerName) {
		return ctrl.Result{}, nil
	}

	controllerutil.RemoveFinalizer(resource, finalizerName)
	if err = r.Update(ctx, resource); err != nil {
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
// doCreateUpdate reconciles an AuthProxyWorkload resource that has been created
// or updated, making sure that related workloads get updated.
//
// It is a state machine. The current state is determined by:
//   - whether this controller's finalizer is present,
//   - whether the workloads related to this resource could be listed,
//   - how many related workloads there are,
//   - how many of them needed their annotations updated.
//
// The outcome is recorded in the resource's UpToDate condition.
//
// States:
// | state | finalizer| fetch err | len(wl) | outOfDateCount | Name                                  |
// |---------|----------|-----------|---------|----------------|---------------------------------------|
// | 0     | *        | *         | *       |                | start                                 |
// | 1.1   | absent   | *         | *       |                | needs finalizer                       |
// | 1.2   | present  | error     | *       |                | can't list workloads                  |
// | 2.1   | present  | nil       | == 0    |                | no workloads to reconcile             |
// | 3.1   | present  | nil       | > 0     | > 0 , err      | workload update needed, and failed    |
// | 3.2   | present  | nil       | > 0     | > 0            | workload update needed, and succeeded |
// | 3.3   | present  | nil       | > 0     | == 0           | workloads reconciled                  |
//
//	start ----x
//	  |---> 1.1 --> (requeue, goto start)
//	  |---> 1.2 --> (requeue, goto start)
//	  |---> 2.1 --> (end)
//	  |
//	  |---> 3.1 ---> (requeue, goto start)
//	  |---> 3.2 ---> (requeue, goto start)
//	  |---> 3.3 ---> (end)
func (r *AuthProxyWorkloadReconciler) doCreateUpdate(ctx context.Context, l logr.Logger, resource *cloudsqlapi.AuthProxyWorkload) (ctrl.Result, error) {
	// State 0: entered when the resource is created or modified in the k8s API,
	// or when a previous pass requeued it.

	// State 1.1: a brand new resource without our finalizer. Add the finalizer
	// and requeue for another pass.
	if !controllerutil.ContainsFinalizer(resource, finalizerName) {
		return r.applyFinalizer(ctx, l, resource)
	}

	// Snapshot the resource before updateWorkloadStatus mutates its status, so
	// the merge patch sent by reconcileResult contains those status changes.
	orig := resource.DeepCopy()

	allWorkloads, err := r.updateWorkloadStatus(ctx, resource)
	if err != nil {
		// State 1.2: related workloads can't be read; retry after a delay.
		return requeueWithDelay, err
	}

	if len(allWorkloads) == 0 {
		// State 2.1: nothing to roll out. Mark UpToDate true, no requeue.
		return r.reconcileResult(ctx, l, resource, orig, cloudsqlapi.ReasonNoWorkloadsFound, "No workload updates needed", true)
	}

	// States 3.*: matching workloads exist; some may need annotation updates.
	outOfDateCount, err := r.updateWorkloadAnnotations(ctx, resource, allWorkloads)
	switch {
	case err != nil:
		// State 3.1: an annotation update failed.
		return requeueNow, err
	case outOfDateCount > 0:
		// State 3.2: annotations were updated; requeue to confirm.
		msg := fmt.Sprintf("Reconciled %d matching workloads. %d workloads need updates", len(allWorkloads), outOfDateCount)
		return r.reconcileResult(ctx, l, resource, orig, cloudsqlapi.ReasonWorkloadNeedsUpdate, msg, false)
	default:
		// State 3.3: every workload annotation is already current.
		msg := fmt.Sprintf("Reconciled %d matching workloads complete", len(allWorkloads))
		return r.reconcileResult(ctx, l, resource, orig, cloudsqlapi.ReasonFinishedReconcile, msg, true)
	}
}
// needsAnnotationUpdate reports whether wl's pod template lacks the annotation
// value that resource's updater expects (i.e. it is missing or carries a value
// from a different generation of the resource).
//
// Workloads whose pod template is not mutable, and resources whose rollout
// strategy is "None", never need an update.
func (r *AuthProxyWorkloadReconciler) needsAnnotationUpdate(wl workload.Workload, resource *cloudsqlapi.AuthProxyWorkload) bool {
	if _, mutable := wl.(workload.WithMutablePodTemplate); !mutable || isRolloutStrategyNone(resource) {
		return false
	}

	key, want := r.updater.PodAnnotation(resource)
	annotations := wl.PodTemplateAnnotations()
	if annotations == nil {
		return true
	}
	return annotations[key] != want
}
// updateAnnotation sets resource's pod annotation (as produced by the
// updater) on wl's pod template, creating the annotation map if needed.
// Workloads without a mutable pod template and resources whose rollout
// strategy is "None" are left untouched.
func (r *AuthProxyWorkloadReconciler) updateAnnotation(wl workload.Workload, resource *cloudsqlapi.AuthProxyWorkload) {
	mutable, ok := wl.(workload.WithMutablePodTemplate)
	if !ok || isRolloutStrategyNone(resource) {
		return
	}

	key, value := r.updater.PodAnnotation(resource)
	annotations := wl.PodTemplateAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	annotations[key] = value
	mutable.SetPodTemplateAnnotations(annotations)
}
// isRolloutStrategyNone reports whether the user configured the "None"
// rollout strategy on the resource's AuthProxyContainer. A resource with no
// AuthProxyContainer is not treated as "None".
func isRolloutStrategyNone(resource *cloudsqlapi.AuthProxyWorkload) bool {
	container := resource.Spec.AuthProxyContainer
	if container == nil {
		return false
	}
	return container.RolloutStrategy == cloudsqlapi.NoneStrategy
}
// reconcileResult records the outcome of a reconcile pass in the resource's
// UpToDate condition and patches the status (as a merge patch computed from
// orig).
//
// When upToDate is true the condition status is True and the returned Result
// does not requeue. Otherwise the condition status is False and the request is
// requeued (requeueNow). reason and message are copied into the condition.
func (r *AuthProxyWorkloadReconciler) reconcileResult(ctx context.Context, l logr.Logger, resource, orig *cloudsqlapi.AuthProxyWorkload, reason, message string, upToDate bool) (ctrl.Result, error) {
	status := metav1.ConditionFalse
	result := requeueNow
	if upToDate {
		status = metav1.ConditionTrue
		result = ctrl.Result{}
	}

	resource.Status.Conditions = replaceCondition(resource.Status.Conditions, &metav1.Condition{
		Type:               cloudsqlapi.ConditionUpToDate,
		Status:             status,
		ObservedGeneration: resource.GetGeneration(),
		Reason:             reason,
		Message:            message,
	})

	if err := r.patchAuthProxyWorkloadStatus(ctx, resource, orig); err != nil {
		l.Error(err, "Unable to patch AuthProxyWorkload status", "AuthProxyWorkload", resource.GetNamespace()+"/"+resource.GetName())
		return result, err
	}
	return result, nil
}
// applyFinalizer adds this controller's finalizer to resource and saves it
// with Update. The finalizer makes deletion wait until doDelete has processed
// the related workloads.
//
// It runs whenever doCreateUpdate finds the finalizer missing: normally once,
// right after the resource is created, and again if a previous Update failed.
// It returns requeueNow together with the Update error, if any.
func (r *AuthProxyWorkloadReconciler) applyFinalizer(
	ctx context.Context, l logr.Logger, resource *cloudsqlapi.AuthProxyWorkload) (ctrl.Result, error) {
	controllerutil.AddFinalizer(resource, finalizerName)
	if err := r.Update(ctx, resource); err != nil {
		l.Info("Error adding finalizer. Will requeue for reconcile.", "err", err)
		return requeueNow, err
	}
	// err is always nil here, so it is not logged.
	l.Info("Added finalizer. Will requeue quickly for reconcile")
	return requeueNow, nil
}
// patchAuthProxyWorkloadStatus sends a merge patch of resource's status
// subresource, computed as the difference from orig. On success it reloads
// orig from the API in place (presumably so any later diff starts from the
// persisted state — confirm with callers).
func (r *AuthProxyWorkloadReconciler) patchAuthProxyWorkloadStatus(
	ctx context.Context, resource *cloudsqlapi.AuthProxyWorkload, orig *cloudsqlapi.AuthProxyWorkload) error {
	if err := r.Client.Status().Patch(ctx, resource, client.MergeFrom(orig)); err != nil {
		return err
	}

	key := types.NamespacedName{Namespace: resource.GetNamespace(), Name: resource.GetName()}
	return r.Get(ctx, key, orig)
}
// updateWorkloadStatus lists the workloads selected by resource.Spec.Workload
// in the resource's namespace. For each one it records a WorkloadStatus entry
// in resource.Status.WorkloadStatus with a WorkloadUpToDate condition set to
// True at the resource's current generation.
//
// Only the in-memory resource is modified; persisting the status is left to
// the caller. It returns the matching workloads, or an error if they could not
// be listed.
//
// NOTE(review): the WorkloadUpToDate condition is set to True for every
// matching workload, even ones whose annotations are about to be updated.
// Confirm this is intended.
func (r *AuthProxyWorkloadReconciler) updateWorkloadStatus(ctx context.Context, resource *cloudsqlapi.AuthProxyWorkload) ([]workload.Workload, error) {
	matching, err := r.listWorkloads(ctx, resource.Spec.Workload, resource.GetNamespace())
	if err != nil {
		return nil, err
	}

	for _, wl := range matching {
		s := newStatus(wl)
		s.Conditions = replaceCondition(s.Conditions, &metav1.Condition{
			Type:               cloudsqlapi.ConditionWorkloadUpToDate,
			Status:             metav1.ConditionTrue,
			ObservedGeneration: resource.GetGeneration(),
			Reason:             cloudsqlapi.ReasonUpToDate,
			Message:            "No update needed for this workload",
		})
		resource.Status.WorkloadStatus = replaceStatus(resource.Status.WorkloadStatus, s)
	}
	return matching, nil
}
// replaceStatus overwrites, in place, every entry of statuses that refers to
// the same workload as updatedStatus (same Name, Namespace, Kind and
// Version). If no entry matches, updatedStatus is appended. The possibly grown
// slice is returned.
func replaceStatus(statuses []*cloudsqlapi.WorkloadStatus, updatedStatus *cloudsqlapi.WorkloadStatus) []*cloudsqlapi.WorkloadStatus {
	found := false
	for i, existing := range statuses {
		sameWorkload := existing.Name == updatedStatus.Name &&
			existing.Namespace == updatedStatus.Namespace &&
			existing.Kind == updatedStatus.Kind &&
			existing.Version == updatedStatus.Version
		if !sameWorkload {
			continue
		}
		statuses[i] = updatedStatus
		found = true
	}

	if found {
		return statuses
	}
	return append(statuses, updatedStatus)
}
// findCondition returns the first condition in conds whose Type equals name,
// or nil when there is no such condition.
func findCondition(conds []*metav1.Condition, name string) *metav1.Condition {
	for _, c := range conds {
		if c.Type == name {
			return c
		}
	}
	return nil
}
// replaceCondition stores newC in conds. It replaces the first existing
// condition with the same Type, or appends newC when there is none, and
// returns the (possibly reallocated) slice.
//
// newC.LastTransitionTime is always overwritten. It keeps the previous
// condition's timestamp when the Status is unchanged and that timestamp is
// set; otherwise it is set to the current time. newC itself is modified and
// stored by pointer.
func replaceCondition(conds []*metav1.Condition, newC *metav1.Condition) []*metav1.Condition {
	for i, c := range conds {
		if c.Type != newC.Type {
			continue
		}
		// Only move the transition time forward when the status really changes.
		if c.Status == newC.Status && !c.LastTransitionTime.IsZero() {
			newC.LastTransitionTime = c.LastTransitionTime
		} else {
			newC.LastTransitionTime = metav1.NewTime(time.Now())
		}
		conds[i] = newC
		return conds
	}

	newC.LastTransitionTime = metav1.NewTime(time.Now())
	return append(conds, newC)
}
// newStatus returns a WorkloadStatus for wl with only its identifying fields
// (Kind, Version, Namespace, Name) filled in. Version is the group/version
// identifier of the workload's GroupVersionKind.
func newStatus(wl workload.Workload) *cloudsqlapi.WorkloadStatus {
	obj := wl.Object()
	gvk := obj.GetObjectKind().GroupVersionKind()
	return &cloudsqlapi.WorkloadStatus{
		Kind:      gvk.Kind,
		Version:   gvk.GroupVersion().Identifier(),
		Namespace: obj.GetNamespace(),
		Name:      obj.GetName(),
	}
}
// listWorkloads returns the workloads in namespace ns that match
// workloadSelector. A selector with a Name loads that single workload;
// otherwise the selector's labels are used.
func (r *AuthProxyWorkloadReconciler) listWorkloads(ctx context.Context, workloadSelector cloudsqlapi.WorkloadSelectorSpec, ns string) ([]workload.Workload, error) {
	if workloadSelector.Name == "" {
		return r.loadByLabelSelector(ctx, workloadSelector, ns)
	}
	return r.loadByName(ctx, workloadSelector, ns)
}
// loadByName loads the single workload of kind workloadSelector.Kind named
// workloadSelector.Name in namespace ns.
//
// A missing workload is not an error: an empty list and nil error are
// returned. Other errors wrap their cause with %w, so callers can still
// inspect it (for example with the standard library's errors.As).
func (r *AuthProxyWorkloadReconciler) loadByName(ctx context.Context, workloadSelector cloudsqlapi.WorkloadSelectorSpec, ns string) ([]workload.Workload, error) {
	key := client.ObjectKey{Namespace: ns, Name: workloadSelector.Name}
	wl, err := workload.WorkloadForKind(workloadSelector.Kind)
	if err != nil {
		return nil, fmt.Errorf("unable to load by name %s/%s: %w", key.Namespace, key.Name, err)
	}

	if err := r.Get(ctx, key, wl.Object()); err != nil {
		if errors.IsNotFound(err) {
			return nil, nil // empty list when no named workload is found. It is not an error.
		}
		return nil, fmt.Errorf("unable to load resource by name %s/%s: %w", key.Namespace, key.Name, err)
	}
	return []workload.Workload{wl}, nil
}
// loadByLabelSelector lists the workloads of kind workloadSelector.Kind in
// namespace ns whose labels match the selector's label selector.
func (r *AuthProxyWorkloadReconciler) loadByLabelSelector(ctx context.Context, workloadSelector cloudsqlapi.WorkloadSelectorSpec, ns string) ([]workload.Workload, error) {
	l := log.FromContext(ctx)
	sel, err := workloadSelector.LabelsSelector()
	if err != nil {
		return nil, err
	}

	// Kind may be qualified (e.g. "Kind.version.group"); only the bare kind
	// name selects which list type to use.
	_, gk := schema.ParseKindArg(workloadSelector.Kind)
	wl, err := workload.WorkloadListForKind(gk.Kind)
	if err != nil {
		return nil, err
	}

	err = r.List(ctx, wl.List(), client.InNamespace(ns), client.MatchingLabelsSelector{Selector: sel})
	if err != nil {
		l.Error(err, "Unable to list workloads for workloadSelector", "selector", sel)
		return nil, err
	}
	return wl.Workloads(), nil
}
// updateWorkloadAnnotations makes sure every workload in workloads carries
// resource's pod template annotation, patching those for which
// needsAnnotationUpdate reports true. It returns how many workloads needed an
// update.
//
// It stops at the first workload that fails to update and returns 0 with an
// error that wraps the cause.
func (r *AuthProxyWorkloadReconciler) updateWorkloadAnnotations(ctx context.Context, resource *cloudsqlapi.AuthProxyWorkload, workloads []workload.Workload) (int, error) {
	var outOfDate int
	for _, wl := range workloads {
		if !r.needsAnnotationUpdate(wl, resource) {
			continue
		}
		outOfDate++

		// NOTE(review): CreateOrPatch creates the object if it no longer
		// exists. This code only ever intends to patch existing workloads.
		_, err := controllerutil.CreateOrPatch(ctx, r.Client, wl.Object(), func() error {
			r.updateAnnotation(wl, resource)
			return nil
		})
		if err != nil {
			obj := wl.Object()
			return 0, fmt.Errorf("reconciled %d matching workloads: unable to update pod template annotations on workload %s/%s: %w",
				len(workloads), obj.GetNamespace(), obj.GetName(), err)
		}
	}
	return outOfDate, nil
}