controllers/autoneg.go (475 lines of code) (raw):

/* Copyright 2019-2023 Google LLC. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "encoding/json" "errors" "fmt" "sort" "time" backoff "github.com/cenkalti/backoff/v5" "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" ) const ( oldAutonegAnnotation = "anthos.cft.dev/autoneg" autonegAnnotation = "controller.autoneg.dev/neg" oldAutonegStatusAnnotation = "anthos.cft.dev/autoneg-status" autonegStatusAnnotation = "controller.autoneg.dev/neg-status" negStatusAnnotation = "cloud.google.com/neg-status" negAnnotation = "cloud.google.com/neg" oldAutonegFinalizer = "anthos.cft.dev/autoneg" autonegFinalizer = "controller.autoneg.dev/neg" autonegSyncAnnotation = "controller.autoneg.dev/sync" computeOperationStatusDone = "DONE" computeOperationStatusRunning = "RUNNING" computeOperationStatusPending = "PENDING" maxElapsedTime = 4 * time.Minute ) var ( errConfigInvalid = errors.New("autoneg configuration invalid") errJSONInvalid = errors.New("json malformed") ) type errNotFound struct { Name string } func (e *errNotFound) Error() string { return fmt.Sprintf("backend service not found") } // Backend returns a compute.Backend struct specified with a backend group // and the embedded AutonegConfig func (s AutonegStatus) Backend(name string, port string, group string) compute.Backend { cfg := s.AutonegConfig.BackendServices[port][name] // Extract initial_capacity setting, if set var capacityScaler float64 = 1 if capacity := cfg.InitialCapacity; capacity != nil { // This case should not be possible since validateNewConfig checks // it, but leave the default setting of 100% if capacity is less // than 0 or greater than 100 if *capacity >= int32(0) && *capacity <= int32(100) { capacityScaler = float64(*capacity) / 100 } } if capacity := cfg.CapacityScaler; capacity != nil { // This case should not be possible since validateNewConfig checks // it, but leave the default setting of 100% if capacity is less // than 0 or greater than 100 if *capacity >= int32(0) && *capacity <= int32(100) { capacityScaler = float64(*capacity) / 100 } } // Prefer the rate balancing mode if set if cfg.Rate > 0 { return compute.Backend{ Group: group, BalancingMode: "RATE", MaxRatePerEndpoint: cfg.Rate, CapacityScaler: capacityScaler, } } else { return compute.Backend{ Group: group, BalancingMode: "CONNECTION", MaxConnectionsPerEndpoint: int64(cfg.Connections), CapacityScaler: capacityScaler, } } } // NewBackendController takes the project name and an initialized *compute.Service func NewBackendController(project string, s *compute.Service) *ProdBackendController { return &ProdBackendController{ project: project, s: s, } } func (b *ProdBackendController) getBackendService(name string, region string) (svc *compute.BackendService, err error) { if region == "" { svc, err = compute.NewBackendServicesService(b.s).Get(b.project, name).Do() if e, ok := err.(*googleapi.Error); ok { if e.Code == 404 { err = &errNotFound{Name: name} } } } else { svc, err = compute.NewRegionBackendServicesService(b.s).Get(b.project, region, name).Do() if e, ok := err.(*googleapi.Error); ok { if e.Code == 404 { err = &errNotFound{Name: name} } } } return } func (b *ProdBackendController) updateBackends(name string, region string, svc *compute.BackendService, forceCapacity map[int]bool) error { if len(svc.Backends) == 0 { svc.NullFields = []string{"Backends"} } else { for beidx, be := range svc.Backends { if fc, ok := forceCapacity[beidx]; ok && fc { be.ForceSendFields = []string{"CapacityScaler"} } } } // Perform locking to ensure we patch the intended object version if region == "" { p := compute.NewBackendServicesService(b.s).Patch(b.project, name, svc) p.Header().Set("If-match", svc.Header.Get("ETag")) res, err := p.Do() if err != nil { return err } operation := func() (bool, error) { op, err := compute.NewGlobalOperationsService(b.s).Get(b.project, res.Name).Do() if err != nil { return false, err } return true, checkOperation(op) } _, err = backoff.Retry(context.TODO(), operation, backoff.WithBackOff(backoff.NewExponentialBackOff()), backoff.WithMaxElapsedTime(maxElapsedTime)) return err } else { p := compute.NewRegionBackendServicesService(b.s).Patch(b.project, region, name, svc) p.Header().Set("If-match", svc.Header.Get("ETag")) res, err := p.Do() if err != nil { return err } operation := func() (bool, error) { op, err := compute.NewRegionOperationsService(b.s).Get(b.project, region, res.Name).Do() if err != nil { return false, err } return true, checkOperation(op) } _, err = backoff.Retry(context.TODO(), operation, backoff.WithBackOff(backoff.NewExponentialBackOff()), backoff.WithMaxElapsedTime(maxElapsedTime)) return err } } func checkOperation(op *compute.Operation) error { switch op.Status { case computeOperationStatusPending: return errors.New("operation pending") case computeOperationStatusRunning: return errors.New("operation running") case computeOperationStatusDone: if op.Error != nil { // patch operation failed return fmt.Errorf("operation %d failed", op.Id) } return nil } return fmt.Errorf("unknown operation state: %s", op.Status) } // ReconcileBackends takes the actual and intended AutonegStatus // and attempts to apply the intended status or return an error func (b *ProdBackendController) ReconcileBackends(actual, intended AutonegStatus) (err error) { removes, upserts := ReconcileStatus(b.project, actual, intended) var forceCapacity map[int]bool = make(map[int]bool, 0) for port, _removes := range removes { for idx, remove := range _removes { var oldSvc *compute.BackendService oldSvc, err = b.getBackendService(remove.name, remove.region) var svcUpdated = false var e *errNotFound if errors.As(err, &e) { // If the backend service is gone, we construct a BackendService with the same name // and an empty list of backends. err = nil oldSvc = &compute.BackendService{ Name: remove.name, Backends: make([]*compute.Backend, 0), } } else if err != nil { return } var newSvc *compute.BackendService upsert := upserts[port][idx] if upsert.name != remove.name { if newSvc, err = b.getBackendService(upsert.name, upsert.region); err != nil { return } } else { newSvc = oldSvc } // Remove backends in the list to be deleted for _, d := range remove.backends { for i, be := range oldSvc.Backends { if d.Group == be.Group { svcUpdated = true copy(oldSvc.Backends[i:], oldSvc.Backends[i+1:]) oldSvc.Backends = oldSvc.Backends[:len(oldSvc.Backends)-1] break } } } // If we are changing backend services, save the old service if upsert.name != remove.name && svcUpdated { if err = b.updateBackends(remove.name, remove.region, oldSvc, forceCapacity); err != nil { return } } // Add or update any new backends to the list for _, u := range upsert.backends { copy := true for beidx, be := range newSvc.Backends { if u.Group == be.Group { // TODO: copy fields explicitly be.MaxRatePerEndpoint = u.MaxRatePerEndpoint be.MaxConnectionsPerEndpoint = u.MaxConnectionsPerEndpoint if intended.AutonegSyncConfig != nil { var syncConfig AutonegSyncConfig = *intended.AutonegSyncConfig if syncConfig.CapacityScaler != nil && *syncConfig.CapacityScaler == true { be.CapacityScaler = u.CapacityScaler forceCapacity[beidx] = true } } else { // Force CapacityScaler to an "empty value" u.CapacityScaler = 0 // Check if existing capacity scaler is zero if be.CapacityScaler == 0 { forceCapacity[beidx] = true } } copy = false break } } if copy { // It's a new backend to be added newBackend := u if _, ok := intended.AutonegConfig.BackendServices[port][idx]; ok { if intended.AutonegConfig.BackendServices[port][idx].InitialCapacity != nil { forceCapacity[len(newSvc.Backends)] = true } } newSvc.Backends = append(newSvc.Backends, &newBackend) } } if len(upsert.backends) > 0 { err = b.updateBackends(upsert.name, upsert.region, newSvc, forceCapacity) } if err != nil { return err } } } return nil } // for sorting the backends to keep tests happy func sortBackends(backends *[]compute.Backend) { sort.SliceStable(*backends, func(i, j int) bool { return (*backends)[i].Group < (*backends)[j].Group }) } // ReconcileStatus takes the actual and intended AutonegStatus // and returns sets of backends to remove, and to upsert func ReconcileStatus(project string, actual AutonegStatus, intended AutonegStatus) (removes, upserts map[string]map[string]Backends) { upserts = make(map[string]map[string]Backends, 0) removes = make(map[string]map[string]Backends, 0) // transform into maps with backend group as key actualBE := map[string]map[string]struct{}{} for port, neg := range actual.NEGs { actualBE[port] = map[string]struct{}{} for _, zone := range actual.Zones { group := getGroup(project, zone, neg) actualBE[port][group] = struct{}{} } } intendedBE := map[string]map[string]struct{}{} for port, neg := range intended.NEGs { intendedBE[port] = map[string]struct{}{} for _, zone := range intended.Zones { group := getGroup(project, zone, neg) intendedBE[port][group] = struct{}{} } } // actualBE and intendedBE is a list of NEGs per port now var intendedBEKeys []string for k := range intendedBE { intendedBEKeys = append(intendedBEKeys, k) } sort.Strings(intendedBEKeys) for _, port := range intendedBEKeys { upserts[port] = make(map[string]Backends, len(intendedBE)) removes[port] = make(map[string]Backends, len(intendedBE)) groups := intendedBE[port] for bname, be := range intended.BackendServices[port] { upsert := Backends{name: be.Name, region: be.Region} var groupsKeys []string for k := range groups { groupsKeys = append(groupsKeys, k) } sort.Strings(groupsKeys) for _, i := range groupsKeys { be := intended.Backend(bname, port, i) upsert.backends = append(upsert.backends, be) } sortBackends(&upsert.backends) upserts[port][bname] = upsert remove := Backends{name: be.Name, region: be.Region} // test to see if we are changing backend services if _, ok := actual.BackendServices[port][bname]; ok { if actual.BackendServices[port][bname].Name == be.Name || actual.BackendServices[port][bname].Name == "" { // find backends to be deleted for a := range actualBE[port] { if _, ok := intendedBE[port][a]; !ok { rbe := actual.Backend(bname, port, a) remove.backends = append(remove.backends, rbe) } } sortBackends(&remove.backends) removes[port][bname] = remove } else { // moving to a different backend service means removing all actual backends remove.name = actual.BackendServices[port][bname].Name remove.region = actual.BackendServices[port][bname].Region for a := range actualBE[port] { rbe := actual.Backend(bname, port, a) remove.backends = append(remove.backends, rbe) } sortBackends(&remove.backends) removes[port][bname] = remove } } else { // add empty remove if adding to a mint backend service remove.name = intended.BackendServices[port][bname].Name remove.region = intended.BackendServices[port][bname].Region removes[port][bname] = remove } } // see if there are removed backend services for aname := range actual.BackendServices[port] { if _, ok := intended.BackendServices[port][aname]; !ok { be := actual.BackendServices[port][aname] remove := Backends{name: be.Name, region: be.Region} remove.name = actual.BackendServices[port][aname].Name remove.region = actual.BackendServices[port][aname].Region for a := range actualBE[port] { rbe := actual.Backend(aname, port, a) remove.backends = append(remove.backends, rbe) } sortBackends(&remove.backends) removes[port][aname] = remove upsert := Backends{name: be.Name, region: be.Region} upserts[port][aname] = upsert } } } // see if some configs were removed entirely for port, _ := range actual.BackendServices { if _, ok := intended.BackendServices[port]; !ok { if _, ok = removes[port]; !ok { removes[port] = make(map[string]Backends, len(actualBE)) } for aname, be := range actual.BackendServices[port] { remove := Backends{name: be.Name, region: be.Region} remove.name = actual.BackendServices[port][aname].Name remove.region = actual.BackendServices[port][aname].Region for a := range actualBE[port] { rbe := actual.Backend(aname, port, a) remove.backends = append(remove.backends, rbe) } sortBackends(&remove.backends) removes[port][aname] = remove } } } return } func getGroup(project, zone, neg string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/networkEndpointGroups/%s", project, zone, neg) } func validateOldConfig(cfg OldAutonegConfig) error { // do additional validation return nil } func validateNewConfig(config AutonegConfig) error { for _, cfgs := range config.BackendServices { for _, cfg := range cfgs { if cfg.InitialCapacity != nil { if *cfg.InitialCapacity < 0 || *cfg.InitialCapacity > 100 { return fmt.Errorf("initial_capacity for backend %q must be between 0 and 100 inclusive, but was %q; see https://cloud.google.com/load-balancing/docs/backend-service#capacity_scaler for details", cfg.Name, *cfg.InitialCapacity) } } } } for _, cfgs := range config.BackendServices { for _, cfg := range cfgs { if cfg.CapacityScaler != nil { if *cfg.CapacityScaler < 0 || *cfg.CapacityScaler > 100 { return fmt.Errorf("capacity_scaler for backend %q must be between 0 and 100 inclusive, but was %q; see https://cloud.google.com/load-balancing/docs/backend-service#capacity_scaler for details", cfg.Name, *cfg.CapacityScaler) } } } } return nil } func getStatuses(namespace string, name string, annotations map[string]string, r *ServiceReconciler) (s Statuses, valid bool, err error) { // Read the current cloud.google.com/neg annotation tmp, ok := annotations[negAnnotation] if ok { // Found a status, decode if err = json.Unmarshal([]byte(tmp), &s.negConfig); err != nil { return } } // Is this service using autoneg in new mode? oldOk := false tmp, newOk := annotations[autonegAnnotation] if newOk { valid = true var tempConfig AutonegConfigTemp if err = json.Unmarshal([]byte(tmp), &tempConfig); err != nil { return } tmpSync, syncOk := annotations[autonegSyncAnnotation] if syncOk { if err = json.Unmarshal([]byte(tmpSync), &s.syncConfig); err != nil { return } } s.config.BackendServices = make(map[string]map[string]AutonegNEGConfig, len(tempConfig.BackendServices)) for port, cfgs := range tempConfig.BackendServices { s.config.BackendServices[port] = make(map[string]AutonegNEGConfig, len(cfgs)) for _, cfg := range cfgs { if cfg.Name == "" || !r.AllowServiceName { // Default to name generated using serviceNameTemplate cfg.Name = generateServiceName(namespace, name, port, r.ServiceNameTemplate) } //Use defaults if rate and connections have not been set if cfg.Rate == 0 && cfg.Connections == 0 { if r.MaxRatePerEndpointDefault > 0 { cfg.Rate = r.MaxRatePerEndpointDefault } else { cfg.Connections = r.MaxConnectionsPerEndpointDefault } } s.config.BackendServices[port][cfg.Name] = cfg } } // Is this autoneg config valid? if err = validateNewConfig(s.config); err != nil { return } s.newConfig = true } // Does service has new auto neg status? tmp, statusOk := annotations[autonegStatusAnnotation] if statusOk { // Status annotation found but auto neg annotation not found set empty auto neg config if !newOk && r.DeregisterNEGsOnAnnotationRemoval { s.config = AutonegConfig{} valid = true s.newConfig = true } // Found a status, decode if err = json.Unmarshal([]byte(tmp), &s.status); err != nil { return } } if !newOk && !statusOk { // Is this service using autoneg in legacy mode? tmp, oldOk = annotations[oldAutonegAnnotation] if oldOk { valid = true if err = json.Unmarshal([]byte(tmp), &s.oldConfig); err != nil { return } // Default to the k8s service name if s.oldConfig.Name == "" { s.oldConfig.Name = name } // Is this autoneg config valid? if err = validateOldConfig(s.oldConfig); err != nil { return } // Convert the old configuration to a new style configuration s.config.BackendServices = make(map[string]map[string]AutonegNEGConfig, 1) if len(s.negConfig.ExposedPorts) == 1 { var firstPort string for k, _ := range s.negConfig.ExposedPorts { firstPort = k break } s.config.BackendServices[firstPort] = make(map[string]AutonegNEGConfig, 1) s.config.BackendServices[firstPort][s.oldConfig.Name] = AutonegNEGConfig{ Name: s.oldConfig.Name, Rate: s.oldConfig.Rate, Connections: 0, } } else { err = errors.New(fmt.Sprintf("more than one port in %s, but autoneg configuration is for one or no ports", negAnnotation)) return } } tmp, ok = annotations[oldAutonegStatusAnnotation] if ok { // Status annotation found but auto neg annotation not found set empty auto neg config if !oldOk && r.DeregisterNEGsOnAnnotationRemoval { s.oldConfig = OldAutonegConfig{} valid = true } // Found a status, decode if err = json.Unmarshal([]byte(tmp), &s.oldStatus); err != nil { return } } } tmp, ok = annotations[negStatusAnnotation] if ok { // Found a status, decode if err = json.Unmarshal([]byte(tmp), &s.negStatus); err != nil { return } } return }