internal/controller/pod_controller.go (178 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package controller
import (
"context"
"fmt"
"net/http"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/json"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
cloudsqlapi "github.com/GoogleCloudPlatform/cloud-sql-proxy-operator/internal/api/v1"
"github.com/GoogleCloudPlatform/cloud-sql-proxy-operator/internal/workload"
)
// PodAdmissionWebhook is a controller-runtime mutating admission webhook for
// Pods. It adds proxy sidecar containers to each Pod that matches one or more
// AuthProxyWorkload resources, either directly or through one of the Pod's
// owners (e.g. Pod -> ReplicaSet -> Deployment).
type PodAdmissionWebhook struct {
	// Client is used to list AuthProxyWorkload resources and to fetch the
	// Pod's owners.
	Client client.Client
	// decoder decodes the Pod from the incoming admission request.
	decoder admission.Decoder
	// updater matches AuthProxyWorkloads to the Pod and configures the
	// Pod's proxy containers.
	updater *workload.Updater
}
// Handle is the mutating admission webhook implementation for Pods. It decodes
// the Pod from the admission request, adds the proxy sidecar containers
// required by every matching AuthProxyWorkload, and responds with a JSON patch
// from the original Pod to the updated one.
//
// When no AuthProxyWorkload matches, the Pod is allowed unchanged. A request
// that cannot be decoded as a Pod is rejected with 400 Bad Request; any other
// failure is rejected with 500 Internal Server Error.
func (a *PodAdmissionWebhook) Handle(ctx context.Context, req admission.Request) admission.Response {
	l := logf.FromContext(ctx)

	p := corev1.Pod{}
	if err := a.decoder.Decode(req, &p); err != nil {
		// An undecodable body is a malformed request, not a server failure.
		l.Error(err, "/mutate-pod request can't be processed",
			"kind", req.Kind.Kind, "ns", req.Namespace, "name", req.Name)
		return admission.Errored(http.StatusBadRequest, err)
	}

	updatedPod, err := a.handleCreatePodRequest(ctx, p)
	if err != nil {
		return admission.Errored(http.StatusInternalServerError, err)
	}
	if updatedPod == nil {
		return admission.Allowed("no changes to pod")
	}

	// Marshal the updated Pod so the response can carry a patch computed
	// against the raw object from the request.
	marshaledRes, err := json.Marshal(updatedPod)
	if err != nil {
		l.Error(err, "Unable to marshal workload result in webhook",
			"kind", req.Kind.Kind, "ns", req.Namespace, "name", req.Name)
		return admission.Errored(http.StatusInternalServerError,
			fmt.Errorf("unable to marshal workload result: %w", err))
	}

	l.Info("updated proxy on pod", "Operation", req.Operation, "Namespace", req.Namespace, "Name", req.Name)
	return admission.PatchResponseFromRaw(req.Object.Raw, marshaledRes)
}
// handleCreatePodRequest finds the AuthProxyWorkload resources that match the
// pod (directly or through its owners) and configures the pod's proxy
// containers for them.
//
// It returns the updated pod when at least one AuthProxyWorkload matched, and
// (nil, nil) when nothing matched and the pod needs no changes. Errors from
// listing resources or configuring the pod are returned wrapped with %w.
func (a *PodAdmissionWebhook) handleCreatePodRequest(ctx context.Context, p corev1.Pod) (*corev1.Pod, error) {
	l := logf.FromContext(ctx)
	wl := &workload.PodWorkload{Pod: &p}

	proxies, err := findMatchingProxies(ctx, a.Client, a.updater, wl)
	if err != nil {
		return nil, err
	}
	if len(proxies) == 0 {
		return nil, nil
	}

	// Configure the pod, adding containers for each of the proxies.
	if err := a.updater.ConfigureWorkload(wl, proxies); err != nil {
		l.Error(err, "Unable to reconcile workload result in webhook",
			"kind", wl.Pod.Kind, "ns", wl.Pod.Namespace, "name", wl.Pod.Name)
		return nil, fmt.Errorf("there is an AuthProxyWorkloadConfiguration error reconciling this workload %w", err)
	}
	return wl.Pod, nil // updated pod
}
// findMatchingProxies lists the AuthProxyWorkload resources in the pod's
// namespace and returns those that match the pod or any of its owners (for
// example its ReplicaSet and that ReplicaSet's Deployment).
//
// It returns (nil, nil) when no AuthProxyWorkload matches. Errors from the API
// server are returned wrapped with %w so callers can inspect them.
func findMatchingProxies(ctx context.Context, c client.Client, u *workload.Updater, wl *workload.PodWorkload) ([]*cloudsqlapi.AuthProxyWorkload, error) {
	l := logf.FromContext(ctx)

	// List only the AuthProxyWorkloads in the pod's namespace. To avoid
	// privilege escalation, an AuthProxyWorkload may only affect pods in its
	// own namespace.
	instList := &cloudsqlapi.AuthProxyWorkloadList{}
	if err := c.List(ctx, instList, client.InNamespace(wl.Object().GetNamespace())); err != nil {
		l.Error(err, "Unable to list AuthProxyWorkload resources",
			"kind", wl.Pod.Kind, "ns", wl.Pod.Namespace, "name", wl.Pod.Name)
		return nil, fmt.Errorf("unable to list AuthProxyWorkloads, %w", err)
	}

	// Collect the pod's owners transitively, so a workload selector that
	// targets e.g. a Deployment also matches the pods created beneath it.
	owners, err := listOwners(ctx, c, wl.Object())
	if err != nil {
		return nil, fmt.Errorf("there is an AuthProxyWorkloadConfiguration error reconciling this workload %w", err)
	}

	proxies := u.FindMatchingAuthProxyWorkloads(instList, wl, owners)
	if len(proxies) == 0 {
		// Normalize "no matches" to a nil slice.
		return nil, nil
	}
	return proxies, nil
}
// listOwners returns the owners of object and, recursively, the owners of
// those owners (for example Pod -> ReplicaSet -> Deployment). Owners are
// looked up in the object's namespace.
//
// Owner references whose Kind the operator does not recognize are skipped, as
// are owners that no longer exist. A skipped owner contributes nothing to the
// result. Any other error fetching an owner is returned.
//
// Warning: this is a recursive function. It assumes the owner-reference graph
// contains no cycles.
func listOwners(ctx context.Context, c client.Client, object client.Object) ([]workload.Workload, error) {
	l := logf.FromContext(ctx)
	var owners []workload.Workload
	for _, r := range object.GetOwnerReferences() {
		wl, err := workload.WorkloadForKind(r.Kind)
		if err != nil {
			// The operator doesn't recognize the owner's Kind; ignore that owner.
			continue
		}

		owner := wl.Object()
		key := client.ObjectKey{Namespace: object.GetNamespace(), Name: r.Name}
		if err := c.Get(ctx, key, owner); err != nil {
			// Owners are sometimes deleted before their dependents. Skip a
			// missing owner entirely rather than returning an empty placeholder.
			// ReasonForError also recognizes wrapped API status errors.
			if apierrors.ReasonForError(err) == metav1.StatusReasonNotFound {
				continue
			}
			l.Error(err, "could not get owner", "owner", r.String())
			return nil, err
		}
		// Only record the owner once it has been fetched successfully.
		owners = append(owners, wl)

		// Recurse for the owners of the owner, and append those.
		ownerOwners, err := listOwners(ctx, c, owner)
		if err != nil {
			return nil, err
		}
		owners = append(owners, ownerOwners...)
	}
	return owners, nil
}
// podDeleteController reconciles Pods. It deletes Pods that match one or
// more AuthProxyWorkload resources but fail updater.CheckWorkloadContainers
// (see handlePodChanged).
type podDeleteController struct {
	// Client is used to get and delete Pods and to list AuthProxyWorkloads
	// and Pod owners.
	client.Client
	// Scheme is the manager's scheme, set when the controller is constructed.
	// NOTE(review): not read anywhere in this file.
	Scheme *runtime.Scheme
	// updater matches AuthProxyWorkloads to Pods and checks Pod containers.
	updater *workload.Updater
}
// newPodDeleteController constructs a podDeleteController that uses the
// manager's client and scheme, and registers it with the manager.
//
// If registration fails, it returns a nil controller and the error, so
// callers never receive a controller that is not wired into the manager.
func newPodDeleteController(mgr ctrl.Manager, u *workload.Updater) (*podDeleteController, error) {
	r := &podDeleteController{
		Client:  mgr.GetClient(),
		Scheme:  mgr.GetScheme(),
		updater: u,
	}
	if err := r.setupWithManager(mgr); err != nil {
		return nil, err
	}
	return r, nil
}
// setupWithManager registers this pod-delete controller with the
// controller-runtime manager so that it reconciles Pod events.
// Controller name validation is disabled via controller.Options.
func (r *podDeleteController) setupWithManager(mgr ctrl.Manager) error {
	skipNameValidation := true
	opts := controller.Options{SkipNameValidation: &skipNameValidation}

	builder := ctrl.NewControllerManagedBy(mgr)
	builder = builder.For(&corev1.Pod{})
	builder = builder.WithOptions(opts)
	return builder.Complete(r)
}
// Reconcile handles a change event for a Pod. It fetches the Pod, ignores
// Pods that no longer exist or are already being deleted, and otherwise
// calls handlePodChanged.
//
// Any error from handlePodChanged is returned as-is, so controller-runtime
// requeues the request with backoff.
func (r *podDeleteController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	pod := &corev1.Pod{}
	if err := r.Client.Get(ctx, req.NamespacedName, pod); err != nil {
		// A pod that no longer exists needs no further action.
		return reconcile.Result{}, client.IgnoreNotFound(err)
	}

	// The pod is already being deleted; ignore this change event.
	if !pod.DeletionTimestamp.IsZero() {
		return reconcile.Result{}, nil
	}

	if err := r.handlePodChanged(ctx, pod); err != nil {
		// controller-runtime ignores the Result when err is non-nil and
		// already requeues with exponential backoff, so Requeue is not set.
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}
// handlePodChanged deletes the pod when both of the following hold:
//  1. The pod matches one or more AuthProxyWorkload resources.
//  2. updater.CheckWorkloadContainers returns an error for those resources.
//     Per the original design, that check flags a pod that is in an error
//     or waiting state and is missing proxy sidecar containers.
//
// The delete is conditioned on the pod's UID. A new pod can reuse the same
// name (as StatefulSet pods do), and the pod passed in may be a stale copy.
// The UID check ensures that only this exact pod is removed, never a
// same-named replacement.
func (r *podDeleteController) handlePodChanged(ctx context.Context, pod *corev1.Pod) error {
	wl := &workload.PodWorkload{Pod: pod}

	// Find all proxies that match this pod.
	proxies, err := findMatchingProxies(ctx, r.Client, r.updater, wl)
	if err != nil {
		return fmt.Errorf("unable to find proxies when pod changed, %w", err)
	}
	// There are no proxies, ignore this event.
	if len(proxies) == 0 {
		return nil
	}

	// A nil result means the pod's containers are acceptable; nothing to do.
	wlConfigErr := r.updater.CheckWorkloadContainers(wl, proxies)
	if wlConfigErr == nil {
		return nil
	}

	l := logf.FromContext(ctx)
	l.Info("Pod configured incorrectly. Deleting.",
		"Namespace", pod.Namespace, "Name", pod.Name, "Status", pod.Status,
		"Reason", wlConfigErr.Error())

	uid := pod.UID
	err = r.Client.Delete(ctx, pod, client.Preconditions{UID: &uid})
	switch {
	case err == nil, apierrors.IsNotFound(err):
		// Deleted now, or already gone.
		return nil
	case apierrors.IsConflict(err):
		// The UID precondition failed: a different pod now holds this name.
		// That pod produces its own Pod events and is reconciled separately.
		return nil
	default:
		return fmt.Errorf("unable to delete pod %v/%v, %w", pod.Namespace, pod.Name, err)
	}
}