projects/k8s-hybrid-neg-controller/pkg/reconciler/service.go (275 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package reconciler import ( "context" "crypto/md5" // #nosec Not using MD5 as a secure hash function "encoding/json" "errors" "fmt" "strconv" "time" "github.com/go-logr/logr" "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/googlecloudplatform/k8s-hybrid-neg-controller/pkg/annotation" "github.com/googlecloudplatform/k8s-hybrid-neg-controller/pkg/neg" ) const ( FinalizerName = "solutions.cloud.google.com/hybrid-neg" maxLengthNEGName = 63 ) // serviceReconciler implements `reconcile.Reconciler` for Kubernetes Service objects. // See https://pkg.go.dev/sigs.k8s.io/controller-runtime@main/pkg/reconcile type serviceReconciler struct { client.Client clusterID string negClient *neg.Client recorder record.EventRecorder requeueAfter time.Duration zones []string } var _ reconcile.Reconciler = &serviceReconciler{} func NewServiceReconciler(k8sClient client.Client, recorder record.EventRecorder, negClient *neg.Client, clusterID string, zones []string, requeueAfter time.Duration) reconcile.Reconciler { return &serviceReconciler{ Client: k8sClient, clusterID: clusterID, negClient: negClient, recorder: recorder, requeueAfter: requeueAfter, zones: zones, } } // Reconcile performs a full reconciliation for the Kubernetes Service object referred to by // the `reconcile.Request`. The controller will requeue the `reconcile.Request` to be processed // again if an error is non-nil or `reconcile.Result.Requeue` is `true`, otherwise upon completion // it will remove the work from the queue. func (r *serviceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { logger := log.FromContext(ctx) err := r.reconcile(ctx, logger, req.NamespacedName) if err == nil || errors.Is(err, reconcile.TerminalError(nil)) { return reconcile.Result{}, err } logger.Error(err, "Requeueing") return reconcile.Result{ Requeue: true, RequeueAfter: r.requeueAfter, }, nil } // reconcile creates and deletes hybrid NEGs for the provided Service, based on the hybrid NEG // config annotation on the Service. func (r *serviceReconciler) reconcile(ctx context.Context, logger logr.Logger, serviceNamespacedName types.NamespacedName) error { service := &corev1.Service{} if err := r.Get(ctx, serviceNamespacedName, service); err != nil { // Ignore not-found errors, since they can't be fixed by an immediate requeue. return client.IgnoreNotFound(err) } if !service.ObjectMeta.DeletionTimestamp.IsZero() { if err := r.handleDelete(ctx, logger, service); err != nil { return err } // Stop reconciliation as the item is being deleted return nil } negConfig, negConfigAnnotationExists, err := getHybridNEGConfig(service.ObjectMeta.Annotations) if err != nil { r.recorder.Event(service, "Warning", "ConfigError", err.Error()) return reconcile.TerminalError(err) } logger.V(4).Info("Hybrid NEG config annotation", "negConfig", negConfig) negStatus, negStatusAnnotationExists, err := getHybridNEGStatus(service.ObjectMeta.Annotations) if err != nil { r.recorder.Event(service, "Warning", "ConfigError", err.Error()) logger.Error(err, "Problem reading the hybrid NEG status annotation, proceeding as if it does not exist") } logger.V(4).Info("Hybrid NEG status annotation", "negStatus", negStatus) if !negConfigAnnotationExists && !negStatusAnnotationExists { // No hybrid NEG annotations, do nothing. return nil } negsToCreate, negsToDelete := r.syncNEGs(ctx, logger, serviceNamespacedName, negConfig, negStatus) if err := r.updateService(ctx, logger, service, negStatus, negsToCreate, negsToDelete); err != nil { return err } return nil } // syncNEGs creates and deletes NEGs for the provided Service, based on the provided hybrid NEG // config annotation. It also updates the Service's hybrid NEG status annotation to reflect the // names and Compute Engine zones of the Service's hybrid NEGs. // This method intentionally does not propagate errors, and instead just logs them. The reason is // to prevent infinite reconciliation retries in case of unrecoverable errors, such as IAM // permission issues. func (r *serviceReconciler) syncNEGs(ctx context.Context, logger logr.Logger, service types.NamespacedName, negConfig annotation.NEGConfig, negStatus annotation.NEGStatus) (map[string]string, map[string]string) { negsToCreate := getNEGsToCreate(logger, negConfig, negStatus, r.clusterID, service) if err := r.createNEGs(ctx, logger, negsToCreate, service); err != nil { logger.Error(err, "Problem creating hybrid NEGs") } negsToDelete := getNEGsToDelete(logger, negConfig, negStatus) if err := r.deleteNEGs(ctx, logger, negsToDelete, service); err != nil { logger.Error(err, "Problem deleting hybrid NEGs that were removed from the hybrid NEG config annotation") } return negsToCreate, negsToDelete } // getHybridNEGConfig returns the config annotation added by this controller, similar to // the `cloud.google.com/neg` annotation, see // https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#naming_negs func getHybridNEGConfig(annotations map[string]string) (annotation.NEGConfig, bool, error) { annotationValue, exists := annotations[annotation.HybridNEGConfigKey] if !exists { return annotation.NEGConfig{ ExposedPorts: map[int32]annotation.NEGAttributes{}, }, false, nil } negConfig := annotation.NEGConfig{} if err := json.Unmarshal([]byte(annotationValue), &negConfig); err != nil { return annotation.NEGConfig{}, false, fmt.Errorf("could not unmarshal value of annotation [%s: %s]: %w", annotation.HybridNEGConfigKey, annotationValue, err) } if negConfig.ExposedPorts == nil { negConfig.ExposedPorts = map[int32]annotation.NEGAttributes{} } return negConfig, true, nil } // getHybridNEGStatus returns the hybrid NEG status annotation added by this controller. // The format of the hybrid NEG status annotation matches the format of the // `cloud.google.com/neg-status` annotation that the GKE NEG controller adds for standalone NEGs, // see https://cloud.google.com/kubernetes-engine/docs/how-to/standalone-neg#retrieve-neg-status func getHybridNEGStatus(annotations map[string]string) (annotation.NEGStatus, bool, error) { annotationValue, exists := annotations[annotation.HybridNEGStatusKey] if !exists { return annotation.NEGStatus{ NetworkEndpointGroups: map[string]string{}, }, false, nil } negStatus := annotation.NEGStatus{} if err := json.Unmarshal([]byte(annotationValue), &negStatus); err != nil { return annotation.NEGStatus{}, false, fmt.Errorf("could not unmarshal value of annotation [%s: %s]: %w", annotation.HybridNEGStatusKey, annotationValue, err) } if negStatus.NetworkEndpointGroups == nil { negStatus.NetworkEndpointGroups = map[string]string{} } return negStatus, true, nil } // getNEGsToCreate returns entries from the `exposed_ports` map in the hybrid NEG config // annotation that do _not_ have corresponding entries in the `network_endpoint_groups` map in the // hybrid NEG status annotation, matching by the key (Service port number). // It also generates NEG names if not specified in the `exposed_ports` map in the hybrid NEG config // annotation. func getNEGsToCreate(logger logr.Logger, negConfig annotation.NEGConfig, negStatus annotation.NEGStatus, clusterID string, service types.NamespacedName) map[string]string { negsToCreate := map[string]string{} for exposedPortNumber, negNameFromConfig := range negConfig.ExposedPorts { servicePortNumber := strconv.FormatInt(int64(exposedPortNumber), 10) negNameFromStatus, existsInStatus := negStatus.NetworkEndpointGroups[servicePortNumber] if existsInStatus { if negNameFromConfig.Name != "" && negNameFromConfig.Name != negNameFromStatus { logger.Info("Mismatched NEG names in config and status annotations, NEG names will not be updated", "negConfig", negConfig, "negStatus", negStatus) } // Don't recreate NEG, since it should already exist if it's in the status annotation. continue } if negNameFromConfig.Name == "" { // #nosec Not using MD5 as a secure hash function suffix := fmt.Sprintf("-%0x", md5.Sum([]byte(clusterID+service.Namespace+service.Name+servicePortNumber)))[:9] prefix := fmt.Sprintf("k8s1-%s-%s-%s-%s", clusterID, service.Namespace, service.Name, servicePortNumber) if len(prefix+suffix) > maxLengthNEGName { prefix = prefix[:maxLengthNEGName-len(suffix)] } negNameFromConfig = annotation.NEGAttributes{ Name: prefix + suffix, } } negsToCreate[servicePortNumber] = negNameFromConfig.Name } return negsToCreate } // getNEGsToDelete returns entries from the `network_endpoint_groups` map in the hybrid NEG status // annotation that do _not_ have corresponding entries in the `exposed_ports` map in the hybrid NEG // config annotation, matching by the key (Service port number). func getNEGsToDelete(logger logr.Logger, negConfig annotation.NEGConfig, negStatus annotation.NEGStatus) map[string]string { negsToDelete := map[string]string{} for servicePortNumber, negName := range negStatus.NetworkEndpointGroups { exposedPortNumber, err := strconv.ParseInt(servicePortNumber, 10, 32) if err != nil { logger.Error(err, "invalid service port number in hybrid NEG status annotation, skipping entry", "servicePortNumber", servicePortNumber) continue } _, existsInConfig := negConfig.ExposedPorts[int32(exposedPortNumber)] if !existsInConfig { logger.V(2).Info("NEG from status annotation missing from config annotation, deleting", "port", servicePortNumber, "networkEndpointGroup", negName) negsToDelete[servicePortNumber] = negName } } return negsToDelete } // createNEGs creates hybrid NEGs for all of the provided Service port numbers, across all // zones configured on the controller. func (r *serviceReconciler) createNEGs(ctx context.Context, logger logr.Logger, servicePortNumberToNEGName map[string]string, service types.NamespacedName) error { g, groupCtx := errgroup.WithContext(ctx) for servicePortNumber, negName := range servicePortNumberToNEGName { g.Go(func() error { logger.Info("Creating NEGs", "servicePortNumber", servicePortNumber, "name", negName) if err := r.negClient.CreateNEGs(groupCtx, logger, negName, service); err != nil { return fmt.Errorf("problem creating NEG with port=%s name=%s: %w", servicePortNumber, negName, err) } return nil }) } if err := g.Wait(); err != nil { return fmt.Errorf("problem creating one or more NEGs: %w", err) } return nil } // deleteNEGs deletes hybrid NEGs for all of the provided Service port numbers, across all // zones configured on the controller. func (r *serviceReconciler) deleteNEGs(ctx context.Context, logger logr.Logger, servicePortNumberToNEGName map[string]string, service types.NamespacedName) error { g, groupCtx := errgroup.WithContext(ctx) for servicePortNumber, negName := range servicePortNumberToNEGName { g.Go(func() error { logger.Info("Deleting NEGs", "servicePortNumber", servicePortNumber, "name", negName) if err := r.negClient.DeleteNEGs(groupCtx, logger, negName, service); err != nil { return fmt.Errorf("problem deleting NEG with port=%s name=%s: %w", servicePortNumber, negName, err) } return nil }) } if err := g.Wait(); err != nil { return fmt.Errorf("problem deleting one or more NEGs: %w", err) } return nil } // updateService connects to the Kubernetes cluster API server to update the Service definition if // the reconciler made any of the following changes to the Service: // // - added or removed the `solutions.cloud.google.com/hybrid-neg` finalizer. // - created or removed hybrid NEGs. func (r *serviceReconciler) updateService(ctx context.Context, logger logr.Logger, service *corev1.Service, negStatus annotation.NEGStatus, negsToCreate map[string]string, negsToDelete map[string]string) error { if err := updateNEGStatusIfRequired(logger, negStatus, negsToCreate, negsToDelete, r.zones, service); err != nil { return err } var updatedFinalizer bool if len(negStatus.NetworkEndpointGroups) > 0 { updatedFinalizer = controllerutil.AddFinalizer(service, FinalizerName) } else { updatedFinalizer = controllerutil.RemoveFinalizer(service, FinalizerName) } if updatedFinalizer || len(negsToCreate) > 0 || len(negsToDelete) > 0 { // The Service has been modified, update it. return r.Update(ctx, service) } return nil } // updateNEGStatusIfRequired updates the hybrid NEG status annotation value on the Service // if the reconciler added or removed hybrid NEGs. func updateNEGStatusIfRequired(logger logr.Logger, negStatus annotation.NEGStatus, negsToCreate map[string]string, negsToDelete map[string]string, zones []string, service *corev1.Service) error { if len(negsToCreate) == 0 && len(negsToDelete) == 0 { return nil } for servicePortNumber, negName := range negsToCreate { negStatus.NetworkEndpointGroups[servicePortNumber] = negName } for servicePortNumber := range negsToDelete { delete(negStatus.NetworkEndpointGroups, servicePortNumber) } if len(negStatus.NetworkEndpointGroups) == 0 { logger.V(2).Info("Removing hybrid NEG status annotation") delete(service.Annotations, annotation.HybridNEGStatusKey) return nil } negStatus.Zones = zones logger.V(2).Info("Updating hybrid NEG status annotation") negStatusBytes, err := json.Marshal(negStatus) if err != nil { return reconcile.TerminalError(fmt.Errorf("could not marshal new NEG status annotation value: %w", err)) } service.Annotations[annotation.HybridNEGStatusKey] = string(negStatusBytes) return nil } // handleDelete attempts to delete the hybrid NEGs of the Service that is being deleted if the // Service has the `solutions.cloud.google.com/hybrid-neg` finalizer. // // If any of the NEGs are referenced by backend services, deletion will fail. NEGs must be removed // from backend services before they can be deleted. Removing NEGs from backend services is // typically done using the same mechanism used to add NEG references to backend services, e.g., // using infrastructure-as-code tools. // // If deletion of any NEG fails, this method logs the error _but does not return the error_. // // After attempting to delete the NEGs, this method removes the // `solutions.cloud.google.com/hybrid-neg` finalizer from the Service. The method returns an error // that requeues the reconcile request if removal of the finalizer fails. // // [Backend services overview]: https://cloud.google.com/load-balancing/docs/backend-service func (r *serviceReconciler) handleDelete(ctx context.Context, logger logr.Logger, service *corev1.Service) error { if controllerutil.ContainsFinalizer(service, FinalizerName) { // Our finalizer is present, so let's clean up the external resources. if err := r.finalize(ctx, logger, service); err != nil { logger.Error(err, "Problem deleting external resources, these will need to be manually cleaned up") } // Remove our finalizer from the Service. controllerutil.RemoveFinalizer(service, FinalizerName) if err := r.Update(ctx, service); err != nil { logger.Error(err, "Problem removing finalizer, requeueing") return err } } return nil } // finalize parses the hybrid NEG status annotation on the Service and uses the NEG names and // zones in the annoation value to request deletion of the hybrid NEGs associated to the Service. // // This method returns an error if either of the following happens: // - There is a problem parsing the hybrid NEG status annotation value. // - There is problem deleting one or more of the hybrid NEGs. func (r *serviceReconciler) finalize(ctx context.Context, logger logr.Logger, service *corev1.Service) error { negStatus, exists, err := getHybridNEGStatus(service.Annotations) if err != nil { return fmt.Errorf("problem reading the hybrid NEG status annotation: %w", err) } if !exists { return nil } g, groupCtx := errgroup.WithContext(ctx) for _, negName := range negStatus.NetworkEndpointGroups { g.Go(func() error { return r.negClient.DeleteNEGs(groupCtx, logger, negName, types.NamespacedName{ Namespace: service.GetNamespace(), Name: service.GetName(), }) }) } return g.Wait() }