internal/workload/podspec_updates.go (751 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package workload import ( "fmt" "path" "sort" "strings" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" cloudsqlapi "github.com/GoogleCloudPlatform/cloud-sql-proxy-operator/internal/api/v1" ) // Constants for well known error codes and defaults. These are exposed on the // package and documented here so that they appear in the godoc. These also // need to be documented in the CRD const ( // DefaultProxyImage is the latest version of the proxy as of the release // of this operator. This is managed as a dependency. We update this constant // when the Cloud SQL Auth Proxy releases a new version. DefaultProxyImage = "gcr.io/cloud-sql-connectors/cloud-sql-proxy:2.16.0" // DefaultFirstPort is the first port number chose for an instance listener by the // proxy. DefaultFirstPort int32 = 5000 // DefaultHealthCheckPort is the used by the proxy to expose prometheus // and kubernetes health checks. DefaultHealthCheckPort int32 = 9801 // DefaultAdminPort is the used by the proxy to expose the quitquitquit // and debug api endpoints DefaultAdminPort int32 = 9091 ) var l = logf.Log.WithName("internal.workload") // PodAnnotation returns the annotation (key, value) that should be added to // pods that are configured with this AuthProxyWorkload resource. This takes // into account whether the AuthProxyWorkload exists or was recently deleted. // The defaultProxyImage is part of the annotation value. func PodAnnotation(r *cloudsqlapi.AuthProxyWorkload, defaultProxyImage string) (string, string) { img := defaultProxyImage if r.Spec.AuthProxyContainer != nil && r.Spec.AuthProxyContainer.Image != "" { img = "" } k := fmt.Sprintf("%s/%s", cloudsqlapi.AnnotationPrefix, r.Name) v := fmt.Sprintf("%d,%s", r.Generation, img) // if r was deleted, use a different value if !r.GetDeletionTimestamp().IsZero() { v = fmt.Sprintf("%d-deleted-%s,%s", r.Generation, r.GetDeletionTimestamp().Format(time.RFC3339), img) } return k, v } // PodAnnotation returns the annotation (key, value) that should be added to // pods that are configured with this AuthProxyWorkload resource. This takes // into account whether the AuthProxyWorkload exists or was recently deleted. func (u *Updater) PodAnnotation(r *cloudsqlapi.AuthProxyWorkload) (string, string) { return PodAnnotation(r, u.defaultProxyImage) } // Updater holds global state used while reconciling workloads. type Updater struct { // userAgent is the userAgent of the operator userAgent string // defaultProxyImage is the current default proxy image for the operator defaultProxyImage string // useSidecar specifies whether to use the Kubernetes SidecarContainers feature useSidecar bool } // NewUpdater creates a new instance of Updater with a supplier // that loads the default proxy image from the public docker registry func NewUpdater(userAgent, defaultProxyImage string, useSidecar bool) *Updater { return &Updater{ userAgent: userAgent, defaultProxyImage: defaultProxyImage, useSidecar: useSidecar, } } // ConfigError is an error with extra details about why an AuthProxyWorkload // cannot be configured. type ConfigError struct { workloadKind schema.GroupVersionKind workloadName string workloadNamespace string details []ConfigErrorDetail } func (e *ConfigError) DetailedErrors() []ConfigErrorDetail { return e.details } func (e *ConfigError) Error() string { return fmt.Sprintf("found %d configuration errors on workload %s %s/%s: %v", len(e.details), e.workloadKind.String(), e.workloadNamespace, e.workloadName, e.details) } func (e *ConfigError) add(errorCode, description string, p *cloudsqlapi.AuthProxyWorkload) { e.details = append(e.details, ConfigErrorDetail{ WorkloadKind: e.workloadKind, WorkloadName: e.workloadName, WorkloadNamespace: e.workloadNamespace, AuthProxyNamespace: p.GetNamespace(), AuthProxyName: p.GetName(), ErrorCode: errorCode, Description: description, }) } // ConfigErrorDetail is an error that contains details about specific kinds of errors that caused // a AuthProxyWorkload to fail when being configured on a workload. type ConfigErrorDetail struct { ErrorCode string Description string AuthProxyName string AuthProxyNamespace string WorkloadKind schema.GroupVersionKind WorkloadName string WorkloadNamespace string } func (e *ConfigErrorDetail) Error() string { return fmt.Sprintf("error %s %s while applying AuthProxyWorkload %s/%s to workload %s %s/%s", e.ErrorCode, e.Description, e.AuthProxyNamespace, e.AuthProxyName, e.WorkloadKind.String(), e.WorkloadNamespace, e.WorkloadName) } // defaultContainerResources used when the AuthProxyWorkload resource is not specified. var defaultContainerResources = corev1.ResourceRequirements{ Requests: corev1.ResourceList{ "cpu": resource.MustParse("1.0"), "memory": resource.MustParse("2Gi"), }, } // ConfigurePodProxies finds all AuthProxyWorkload resources matching this workload and then // updates the workload's containers. This does not save the updated workload. func (u *Updater) FindMatchingAuthProxyWorkloads(pl *cloudsqlapi.AuthProxyWorkloadList, wl *PodWorkload, owners []Workload) []*cloudsqlapi.AuthProxyWorkload { // starting with this pod, traverse the pod and its owners, and // fill wls with a list of workload resources that match an AuthProxyWorkload // in the pl. wls := u.filterMatchingInstances(pl, wl.Object()) for _, owner := range owners { wls = append(wls, u.filterMatchingInstances(pl, owner.Object())...) } // remove duplicates from wls by Name m := map[string]*cloudsqlapi.AuthProxyWorkload{} for _, w := range wls { m[w.GetNamespace()+"/"+w.GetName()] = w } wls = make([]*cloudsqlapi.AuthProxyWorkload, 0, len(m)) for _, w := range m { wls = append(wls, w) } // if this was updated return matching DBInstances return wls } // filterMatchingInstances returns a list of AuthProxyWorkload whose selectors match // the workload. func (u *Updater) filterMatchingInstances(pl *cloudsqlapi.AuthProxyWorkloadList, wl client.Object) []*cloudsqlapi.AuthProxyWorkload { matchingAuthProxyWorkloads := make([]*cloudsqlapi.AuthProxyWorkload, 0, len(pl.Items)) for i := range pl.Items { p := &pl.Items[i] if workloadMatches(wl, p.Spec.Workload, p.Namespace) { // if this is pending deletion, exclude it. if !p.ObjectMeta.DeletionTimestamp.IsZero() { continue } matchingAuthProxyWorkloads = append(matchingAuthProxyWorkloads, p) } } return matchingAuthProxyWorkloads } // CheckWorkloadContainers determines if a pod is configured incorrectly and // therefore needs to be deleted. Pods must be (1) missing one or more proxy // sidecar containers and (2) have a terminated container. func (u *Updater) CheckWorkloadContainers(wl *PodWorkload, matches []*cloudsqlapi.AuthProxyWorkload) error { // Find the names of all AuthProxyWorkload resources that should have a // container on this pod, but there is no container. var missing []string for _, p := range matches { wantName := ContainerName(p) var found bool podSpec := wl.PodSpec() allContainers := append(podSpec.Containers, podSpec.InitContainers...) for _, c := range allContainers { if c.Name == wantName { found = true } } if !found { missing = append(missing, p.Name) break } } // If no containers are missing, then there is no error, return nil. if len(missing) == 0 { return nil } missingSidecars := strings.Join(missing, ", ") // Some proxy containers are missing. Are the remaining pod containers failing? for _, cs := range wl.Pod.Status.ContainerStatuses { if cs.State.Terminated != nil && cs.State.Terminated.Reason == "Error" { return fmt.Errorf("pod is in an error state and missing sidecar containers %v", missingSidecars) } if cs.State.Waiting != nil && cs.State.Waiting.Reason == "CrashLoopBackOff" { return fmt.Errorf("pod is in a CrashLoopBackOff state and missing sidecar containers %v", missingSidecars) } } // Pod's other containers are not in an error state. Operator should not // interrupt running containers. return nil } // ConfigureWorkload applies the proxy containers from all of the // instances listed in matchingAuthProxyWorkloads to the workload func (u *Updater) ConfigureWorkload(wl *PodWorkload, matches []*cloudsqlapi.AuthProxyWorkload) error { state := updateState{ updater: u, nextDBPort: DefaultFirstPort, err: ConfigError{ workloadKind: wl.Object().GetObjectKind().GroupVersionKind(), workloadName: wl.Object().GetName(), workloadNamespace: wl.Object().GetNamespace(), }, } return state.update(wl, matches) } type managedEnvVar struct { Instance proxyInstanceID `json:"proxyInstanceID"` ContainerName string `json:"containerName"` OperatorManagedValue corev1.EnvVar `json:"operatorManagedValue"` } type managedPort struct { Instance proxyInstanceID `json:"proxyInstanceID"` Port int32 `json:"port,omitempty"` } type managedVolume struct { Volume corev1.Volume `json:"volume"` VolumeMount corev1.VolumeMount `json:"volumeMount"` Instance proxyInstanceID `json:"proxyInstanceID"` } // proxyInstanceID is an identifier for a proxy and/or specific proxy database // instance that created the EnvVar or Port. When this is empty, means that the // EnvVar or Port was created by the user, and is not associated with a proxy type proxyInstanceID struct { AuthProxyWorkload types.NamespacedName `json:"authProxyWorkload"` ConnectionString string `json:"connectionString"` } // updateState holds internal state while a particular workload being configured // with one or more DBInstances. type updateState struct { err ConfigError mods workloadMods nextDBPort int32 updater *Updater } // workloadMods holds all modifications to this workload done by the operator so // so that it can be undone later. type workloadMods struct { DBInstances []*proxyInstanceID `json:"dbInstances"` EnvVars []*managedEnvVar `json:"envVars"` VolumeMounts []*managedVolume `json:"volumeMounts"` Ports []*managedPort `json:"ports"` AdminPorts []int32 `json:"adminPorts"` } func (s *updateState) addWorkloadPort(p int32) { // This port is associated with the workload, not the proxy. // so this uses an empty proxyInstanceID{} s.addPort(p, proxyInstanceID{}) } func (s *updateState) addProxyPort(port int32, p *cloudsqlapi.AuthProxyWorkload) { // This port is associated with the workload, not the proxy. // so this uses an empty proxyInstanceID{} s.addPort(port, proxyInstanceID{ AuthProxyWorkload: types.NamespacedName{ Namespace: p.Namespace, Name: p.Name, }, }) } // isPortInUse checks if the port is in use. func (s *updateState) isPortInUse(p int32) bool { for i := 0; i < len(s.mods.Ports); i++ { if p == s.mods.Ports[i].Port { return true } } return false } // useNextDbPort consumes the next available db port, marking that port as "in-use." func (s *updateState) useInstancePort(p *cloudsqlapi.AuthProxyWorkload, is *cloudsqlapi.InstanceSpec) int32 { n := types.NamespacedName{ Namespace: p.Namespace, Name: p.Name, } // Does a managedPort already exist for this workload+instance? var proxyPort *managedPort for _, mp := range s.mods.Ports { if mp.Instance.AuthProxyWorkload == n && mp.Instance.ConnectionString == is.ConnectionString { proxyPort = mp break } } // Update the managedPort for this workload+instance if proxyPort != nil { if is.Port != nil && proxyPort.Port != *is.Port { if s.isPortInUse(*is.Port) { s.addError(cloudsqlapi.ErrorCodePortConflict, fmt.Sprintf("proxy port %d for instance %s is already in use", *is.Port, is.ConnectionString), p) } proxyPort.Port = *is.Port } return proxyPort.Port } // Since this is a new workload+instance, figure out the port number var port int32 if is.Port != nil { port = *is.Port } else { for s.isPortInUse(s.nextDBPort) { s.nextDBPort++ } port = s.nextDBPort } if s.isPortInUse(port) { s.addError(cloudsqlapi.ErrorCodePortConflict, fmt.Sprintf("proxy port %d for instance %s is already in use", port, is.ConnectionString), p) } s.addPort(port, proxyInstanceID{ AuthProxyWorkload: types.NamespacedName{ Name: p.Name, Namespace: p.Namespace, }, ConnectionString: is.ConnectionString, }) return port } func (s *updateState) addAdminPort(p int32) { s.mods.AdminPorts = append(s.mods.AdminPorts, p) } func (s *updateState) addQuitEnvVar() { urls := make([]string, len(s.mods.AdminPorts)) for i := 0; i < len(s.mods.AdminPorts); i++ { urls[i] = fmt.Sprintf("http://localhost:%d/quitquitquit", s.mods.AdminPorts[i]) } v := strings.Join(urls, " ") s.addEnvVar(nil, managedEnvVar{ OperatorManagedValue: corev1.EnvVar{ Name: "CSQL_PROXY_QUIT_URLS", Value: v, }}) } func (s *updateState) addPort(p int32, instance proxyInstanceID) { var mp *managedPort for i := 0; i < len(s.mods.Ports); i++ { if s.mods.Ports[i].Port == p { mp = s.mods.Ports[i] } } if mp == nil { mp = &managedPort{ Instance: instance, Port: p, } s.mods.Ports = append(s.mods.Ports, mp) } } func (s *updateState) addProxyContainerEnvVar(p *cloudsqlapi.AuthProxyWorkload, k, v string) { s.addEnvVar(p, managedEnvVar{ Instance: proxyInstanceID{ AuthProxyWorkload: types.NamespacedName{ Namespace: p.Namespace, Name: p.Name, }, }, ContainerName: ContainerName(p), OperatorManagedValue: corev1.EnvVar{Name: k, Value: v}, }) } // addWorkloadEnvVar adds or replaces the envVar based on its Name, returning the old and new values func (s *updateState) addWorkloadEnvVar(p *cloudsqlapi.AuthProxyWorkload, is *cloudsqlapi.InstanceSpec, ev corev1.EnvVar) { s.addEnvVar(p, managedEnvVar{ Instance: proxyInstanceID{ AuthProxyWorkload: types.NamespacedName{ Namespace: p.Namespace, Name: p.Name, }, ConnectionString: is.ConnectionString, }, OperatorManagedValue: ev, }) } func (s *updateState) addEnvVar(p *cloudsqlapi.AuthProxyWorkload, v managedEnvVar) { for i := 0; i < len(s.mods.EnvVars); i++ { oldEnv := s.mods.EnvVars[i] // if the values don't match and either one is global, or its set twice if isEnvVarConflict(oldEnv, v) { s.addError(cloudsqlapi.ErrorCodeEnvConflict, fmt.Sprintf("environment variable named %s is set more than once", oldEnv.OperatorManagedValue.Name), p) return } } s.mods.EnvVars = append(s.mods.EnvVars, &v) } func isEnvVarConflict(oldEnv *managedEnvVar, v managedEnvVar) bool { // it's a different name, no conflict if oldEnv.OperatorManagedValue.Name != v.OperatorManagedValue.Name { return false } // if the envvar is intended for a different container if oldEnv.ContainerName != v.ContainerName && oldEnv.ContainerName != "" && v.ContainerName != "" { return false } // different value, therefore conflict return oldEnv.OperatorManagedValue.Value != v.OperatorManagedValue.Value } func (s *updateState) initState(pl []*cloudsqlapi.AuthProxyWorkload) { // Reset the mods.DBInstances to the list of pl being // applied right now. s.mods.DBInstances = make([]*proxyInstanceID, 0, len(pl)) for _, wl := range pl { for _, instance := range wl.Spec.Instances { s.mods.DBInstances = append(s.mods.DBInstances, &proxyInstanceID{ AuthProxyWorkload: types.NamespacedName{ Namespace: wl.Namespace, Name: wl.Name, }, ConnectionString: instance.ConnectionString, }) } } } // update Reconciles the state of a workload, applying the matching DBInstances // and removing any out-of-date configuration related to deleted DBInstances func (s *updateState) update(wl *PodWorkload, matches []*cloudsqlapi.AuthProxyWorkload) error { s.initState(matches) podSpec := wl.PodSpec() allContainers := append(podSpec.Containers, podSpec.InitContainers...) for _, container := range allContainers { if !strings.HasPrefix(container.Name, ContainerPrefix) { for _, port := range container.Ports { s.addWorkloadPort(port.ContainerPort) } } } // Copy the existing pod annotation map ann := map[string]string{} for k, v := range wl.PodTemplateAnnotations() { ann[k] = v } // add all new containers and update existing containers for i := range matches { inst := matches[i] newContainer := corev1.Container{} s.updateContainer(inst, &newContainer) if s.updater.useSidecar { podSpec.InitContainers = append([]corev1.Container{newContainer}, podSpec.InitContainers...) } else { podSpec.Containers = append(podSpec.Containers, newContainer) } // Add pod annotation for each instance k, v := s.updater.PodAnnotation(inst) ann[k] = v } // Add the envvar containing the proxy quit urls to the workloads s.addQuitEnvVar() if len(ann) != 0 { wl.SetPodTemplateAnnotations(ann) } s.processContainers(&podSpec.Containers) s.processContainers(&podSpec.InitContainers) s.applyVolumes(&podSpec) // only return ConfigError if there were reported // errors during processing. if len(s.err.details) > 0 { return &s.err } wl.SetPodSpec(podSpec) return nil } // processContainers applies container envs and volumes to all containers func (s *updateState) processContainers(containers *[]corev1.Container) { for i := range *containers { c := &(*containers)[i] s.updateContainerEnv(c) s.applyContainerVolumes(c) } } // updateContainer Creates or updates the proxy container in the workload's PodSpec func (s *updateState) updateContainer(p *cloudsqlapi.AuthProxyWorkload, c *corev1.Container) { // if the c was fully overridden, just use that c. if p.Spec.AuthProxyContainer != nil && p.Spec.AuthProxyContainer.Container != nil { p.Spec.AuthProxyContainer.Container.DeepCopyInto(c) c.Name = ContainerName(p) return } // always enable http port healthchecks on 0.0.0.0 and structured logs s.addHealthCheck(p, c) s.applyTelemetrySpec(p) // enable the proxy's admin service s.addAdminServer(p) // configure container authentication s.addAuthentication(p) // add the user agent s.addProxyContainerEnvVar(p, "CSQL_PROXY_USER_AGENT", s.updater.userAgent) // configure structured logs s.addProxyContainerEnvVar(p, "CSQL_PROXY_STRUCTURED_LOGS", "true") // configure quiet logs if p.Spec.AuthProxyContainer != nil && p.Spec.AuthProxyContainer.Quiet { s.addProxyContainerEnvVar(p, "CSQL_PROXY_QUIET", "true") } // configure lazy refresh if p.Spec.AuthProxyContainer != nil && p.Spec.AuthProxyContainer.RefreshStrategy == cloudsqlapi.RefreshStrategyLazy { s.addProxyContainerEnvVar(p, "CSQL_PROXY_LAZY_REFRESH", "true") } c.Name = ContainerName(p) c.ImagePullPolicy = corev1.PullIfNotPresent if s.updater.useSidecar { policy := corev1.ContainerRestartPolicyAlways c.RestartPolicy = &policy } s.applyContainerSpec(p, c) // Build the c var cliArgs []string // Instances for i := range p.Spec.Instances { inst := &p.Spec.Instances[i] params := map[string]string{} // if it is a TCP socket if inst.UnixSocketPath == "" { port := s.useInstancePort(p, inst) params["port"] = fmt.Sprint(port) if inst.HostEnvName != "" { s.addWorkloadEnvVar(p, inst, corev1.EnvVar{ Name: inst.HostEnvName, Value: "127.0.0.1", }) } if inst.PortEnvName != "" { s.addWorkloadEnvVar(p, inst, corev1.EnvVar{ Name: inst.PortEnvName, Value: fmt.Sprint(port), }) } } else { // else if it is a unix socket params["unix-socket-path"] = inst.UnixSocketPath mountName := VolumeName(p, inst, "unix") s.addVolumeMount(p, inst, corev1.VolumeMount{ Name: mountName, ReadOnly: false, MountPath: path.Dir(inst.UnixSocketPath), }, corev1.Volume{ Name: mountName, VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, }) if inst.UnixSocketPathEnvName != "" { s.addWorkloadEnvVar(p, inst, corev1.EnvVar{ Name: inst.UnixSocketPathEnvName, Value: inst.UnixSocketPath, }) } } if inst.AutoIAMAuthN != nil { if *inst.AutoIAMAuthN { params["auto-iam-authn"] = "true" } else { params["auto-iam-authn"] = "false" } } if inst.PrivateIP != nil { if *inst.PrivateIP { params["private-ip"] = "true" } else { params["private-ip"] = "false" } } if inst.PSC != nil { if *inst.PSC { params["psc"] = "true" } else { params["psc"] = "false" } } var instArgs []string for k, v := range params { instArgs = append(instArgs, fmt.Sprintf("%s=%s", k, v)) } // sort the param args to make testing easier. params will always be // in a stable order sort.Strings(instArgs) if len(instArgs) > 0 { cliArgs = append(cliArgs, fmt.Sprintf("%s?%s", inst.ConnectionString, strings.Join(instArgs, "&"))) } else { cliArgs = append(cliArgs, inst.ConnectionString) } } c.Args = cliArgs } // applyContainerSpec applies settings from cloudsqlapi.AuthProxyContainerSpec // to the container func (s *updateState) applyContainerSpec(p *cloudsqlapi.AuthProxyWorkload, c *corev1.Container) { t := true var f bool c.Image = s.defaultProxyImage() c.Resources = defaultContainerResources c.SecurityContext = &corev1.SecurityContext{ // The default Cloud SQL Auth Proxy image runs as the // "nonroot" user and group (uid: 65532) by default. RunAsNonRoot: &t, // Use a read-only filesystem ReadOnlyRootFilesystem: &t, // Do not allow privilege escalation AllowPrivilegeEscalation: &f, } if p.Spec.AuthProxyContainer == nil { return } if p.Spec.AuthProxyContainer.Image != "" { c.Image = p.Spec.AuthProxyContainer.Image } if p.Spec.AuthProxyContainer.Resources != nil { c.Resources = *p.Spec.AuthProxyContainer.Resources.DeepCopy() } if p.Spec.AuthProxyContainer.SQLAdminAPIEndpoint != "" { s.addProxyContainerEnvVar(p, "CSQL_PROXY_SQLADMIN_API_ENDPOINT", p.Spec.AuthProxyContainer.SQLAdminAPIEndpoint) } if p.Spec.AuthProxyContainer.MaxConnections != nil && *p.Spec.AuthProxyContainer.MaxConnections != 0 { s.addProxyContainerEnvVar(p, "CSQL_PROXY_MAX_CONNECTIONS", fmt.Sprintf("%d", *p.Spec.AuthProxyContainer.MaxConnections)) } if p.Spec.AuthProxyContainer.MaxSigtermDelay != nil && *p.Spec.AuthProxyContainer.MaxSigtermDelay != 0 { s.addProxyContainerEnvVar(p, "CSQL_PROXY_MAX_SIGTERM_DELAY", fmt.Sprintf("%ds", *p.Spec.AuthProxyContainer.MaxSigtermDelay)) } if p.Spec.AuthProxyContainer.MinSigtermDelay != nil && *p.Spec.AuthProxyContainer.MinSigtermDelay != 0 { s.addProxyContainerEnvVar(p, "CSQL_PROXY_MIN_SIGTERM_DELAY", fmt.Sprintf("%ds", *p.Spec.AuthProxyContainer.MinSigtermDelay)) } return } // applyTelemetrySpec applies settings from cloudsqlapi.TelemetrySpec // to the container func (s *updateState) applyTelemetrySpec(p *cloudsqlapi.AuthProxyWorkload) { if p.Spec.AuthProxyContainer == nil || p.Spec.AuthProxyContainer.Telemetry == nil { return } tel := p.Spec.AuthProxyContainer.Telemetry if tel.TelemetrySampleRate != nil { s.addProxyContainerEnvVar(p, "CSQL_PROXY_TELEMETRY_SAMPLE_RATE", fmt.Sprintf("%d", *tel.TelemetrySampleRate)) } if tel.DisableTraces != nil && *tel.DisableTraces { s.addProxyContainerEnvVar(p, "CSQL_PROXY_DISABLE_TRACES", "true") } if tel.DisableMetrics != nil && *tel.DisableMetrics { s.addProxyContainerEnvVar(p, "CSQL_PROXY_DISABLE_METRICS", "true") } if tel.PrometheusNamespace != nil || (tel.Prometheus != nil && *tel.Prometheus) { s.addProxyContainerEnvVar(p, "CSQL_PROXY_PROMETHEUS", "true") } if tel.PrometheusNamespace != nil { s.addProxyContainerEnvVar(p, "CSQL_PROXY_PROMETHEUS_NAMESPACE", *tel.PrometheusNamespace) } if tel.TelemetryProject != nil { s.addProxyContainerEnvVar(p, "CSQL_PROXY_TELEMETRY_PROJECT", *tel.TelemetryProject) } if tel.TelemetryPrefix != nil { s.addProxyContainerEnvVar(p, "CSQL_PROXY_TELEMETRY_PREFIX", *tel.TelemetryPrefix) } if tel.QuotaProject != nil { s.addProxyContainerEnvVar(p, "CSQL_PROXY_QUOTA_PROJECT", *tel.QuotaProject) } return } // updateContainerEnv applies global container state to all containers func (s *updateState) updateContainerEnv(c *corev1.Container) { for i := 0; i < len(s.mods.EnvVars); i++ { var found bool v := s.mods.EnvVars[i] operatorEnv := v.OperatorManagedValue // If this EnvVar is not for this container and not for all containers // don't add it to this container. if v.ContainerName != c.Name && v.ContainerName != "" { continue } for j := 0; j < len(c.Env); j++ { if operatorEnv.Name == c.Env[j].Name { found = true c.Env[j] = operatorEnv } } if !found { c.Env = append(c.Env, operatorEnv) } } } // addHealthCheck adds the health check declaration to this workload. func (s *updateState) addHealthCheck(p *cloudsqlapi.AuthProxyWorkload, c *corev1.Container) int32 { var portPtr *int32 var adminPortPtr *int32 cs := p.Spec.AuthProxyContainer // if the TelemetrySpec.exists, get Port and Port values if cs != nil && cs.Telemetry != nil { if cs.Telemetry.HTTPPort != nil { portPtr = cs.Telemetry.HTTPPort } } port := s.usePort(portPtr, DefaultHealthCheckPort, p) c.StartupProbe = &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{ Port: intstr.IntOrString{IntVal: port}, Path: "/startup", }}, PeriodSeconds: 1, FailureThreshold: 60, TimeoutSeconds: 10, } c.LivenessProbe = &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{HTTPGet: &corev1.HTTPGetAction{ Port: intstr.IntOrString{IntVal: port}, Path: "/liveness", }}, PeriodSeconds: 10, FailureThreshold: 3, TimeoutSeconds: 10, } // Add a port that is associated with the proxy, but not a specific db instance s.addProxyPort(port, p) s.addProxyContainerEnvVar(p, "CSQL_PROXY_HTTP_PORT", fmt.Sprintf("%d", port)) s.addProxyContainerEnvVar(p, "CSQL_PROXY_HTTP_ADDRESS", "0.0.0.0") s.addProxyContainerEnvVar(p, "CSQL_PROXY_HEALTH_CHECK", "true") // For graceful exits as a sidecar, the proxy should exit with exit code 0 // when it receives a SIGTERM. s.addProxyContainerEnvVar(p, "CSQL_PROXY_EXIT_ZERO_ON_SIGTERM", "true") // Add a containerPort declaration for the healthcheck & telemetry port c.Ports = append(c.Ports, corev1.ContainerPort{ ContainerPort: port, Protocol: corev1.ProtocolTCP, }) // Also the operator will enable the /quitquitquit endpoint for graceful exit. // If the AdminServer.Port is set, use it, otherwise use the default // admin port. if cs != nil && cs.AdminServer != nil && cs.AdminServer.Port != 0 { adminPortPtr = &cs.AdminServer.Port } adminPort := s.usePort(adminPortPtr, DefaultAdminPort, p) s.addAdminPort(adminPort) s.addProxyContainerEnvVar(p, "CSQL_PROXY_QUITQUITQUIT", "true") s.addProxyContainerEnvVar(p, "CSQL_PROXY_ADMIN_PORT", fmt.Sprintf("%d", adminPort)) // Configure the pre-stop hook for /quitquitquit c.Lifecycle = &corev1.Lifecycle{ PreStop: &corev1.LifecycleHandler{ HTTPGet: &corev1.HTTPGetAction{ Port: intstr.IntOrString{IntVal: adminPort}, Path: "/quitquitquit", Host: "localhost", }, }, } return adminPort } func (s *updateState) addAdminServer(p *cloudsqlapi.AuthProxyWorkload) { if p.Spec.AuthProxyContainer == nil || p.Spec.AuthProxyContainer.AdminServer == nil { return } cs := p.Spec.AuthProxyContainer.AdminServer for _, name := range cs.EnableAPIs { switch name { case "Debug": s.addProxyContainerEnvVar(p, "CSQL_PROXY_DEBUG", "true") } } } func (s *updateState) addAuthentication(p *cloudsqlapi.AuthProxyWorkload) { if p.Spec.AuthProxyContainer == nil || p.Spec.AuthProxyContainer.Authentication == nil { return } as := p.Spec.AuthProxyContainer.Authentication if len(as.ImpersonationChain) > 0 { s.addProxyContainerEnvVar(p, "CSQL_PROXY_IMPERSONATE_SERVICE_ACCOUNT", strings.Join(as.ImpersonationChain, ",")) } } func (s *updateState) addVolumeMount(p *cloudsqlapi.AuthProxyWorkload, is *cloudsqlapi.InstanceSpec, m corev1.VolumeMount, v corev1.Volume) { key := proxyInstanceID{ AuthProxyWorkload: types.NamespacedName{ Namespace: p.Namespace, Name: p.Name, }, ConnectionString: is.ConnectionString, } vol := &managedVolume{ Instance: key, Volume: v, VolumeMount: m, } for i, mount := range s.mods.VolumeMounts { if mount.Instance == key { s.mods.VolumeMounts[i] = vol return } if mount.VolumeMount.MountPath == vol.VolumeMount.MountPath { // avoid adding volume mounts with redundant MountPaths, // just the first one is enough. return } } s.mods.VolumeMounts = append(s.mods.VolumeMounts, vol) } // applyContainerVolumes applies all the VolumeMounts to this container. func (s *updateState) applyContainerVolumes(c *corev1.Container) { nameAccessor := func(v corev1.VolumeMount) string { return v.Name } thingAccessor := func(v *managedVolume) corev1.VolumeMount { return v.VolumeMount } c.VolumeMounts = applyVolumeThings[corev1.VolumeMount](s, c.VolumeMounts, nameAccessor, thingAccessor) } // applyVolumes applies all volumes to this PodSpec. func (s *updateState) applyVolumes(ps *corev1.PodSpec) { nameAccessor := func(v corev1.Volume) string { return v.Name } thingAccessor := func(v *managedVolume) corev1.Volume { return v.Volume } ps.Volumes = applyVolumeThings[corev1.Volume](s, ps.Volumes, nameAccessor, thingAccessor) } // applyVolumeThings modifies a slice of Volume/VolumeMount, to include all the // shared volumes for the proxy container's unix sockets. This will replace // an existing volume with the same name, or append a new volume to the slice. func applyVolumeThings[T corev1.VolumeMount | corev1.Volume]( s *updateState, newVols []T, nameAccessor func(T) string, thingAccessor func(*managedVolume) T) []T { // add or replace items for all new volume mounts for i := 0; i < len(s.mods.VolumeMounts); i++ { var found bool newVol := thingAccessor(s.mods.VolumeMounts[i]) for j := 0; j < len(newVols); j++ { if nameAccessor(newVol) == nameAccessor(newVols[j]) { found = true newVols[j] = newVol } } if !found { newVols = append(newVols, newVol) } } return newVols } func (s *updateState) addError(errorCode, description string, p *cloudsqlapi.AuthProxyWorkload) { s.err.add(errorCode, description, p) } func (s *updateState) defaultProxyImage() string { return s.updater.defaultProxyImage } func (s *updateState) usePort(configValue *int32, defaultValue int32, p *cloudsqlapi.AuthProxyWorkload) int32 { if configValue != nil { s.addProxyPort(*configValue, p) return *configValue } port := defaultValue if configValue == nil { for s.isPortInUse(port) { port++ } } s.addProxyPort(port, p) return port }