projects/k8s-hybrid-neg-controller/pkg/neg/sync.go (207 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package neg import ( "context" "errors" "fmt" "net/http" "sync" "time" compute "cloud.google.com/go/compute/apiv1" "cloud.google.com/go/compute/apiv1/computepb" "github.com/go-logr/logr" "github.com/gogo/protobuf/proto" "github.com/google/uuid" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/api/googleapi" "k8s.io/apimachinery/pkg/types" ) const ( // MaxSizeNetworkEndpointsAttachDetach is the max number of network endpoints that can // be attached or detached from a NEG in a single API call. // It is also the max number of network endpoints returned per page in API calls to list // existing endpoints of NEGs. MaxSizeNetworkEndpointsAttachDetach = 500 ) var ( // zonalNEGLocks tracks locks that prevent multiple goroutines simultaneously updating a zonal NEG. zonalNEGLocks sync.Map errUnexpectedLockType = errors.New("unexpected type of lock") ) // SyncEndpoints determines the diff between current network endpoints in zonal NEGs, and the new set of // endpoints supplied. The function then attaches and detaches network endpoints so that the list // of network endpoints for each zone matches the supplied set of endpoints. func (c *Client) SyncEndpoints(ctx context.Context, logger logr.Logger, name string, endpointsByZone ZonalEndpoints, service types.NamespacedName) error { logger = logger.WithValues("GoogleCloudProjectID", c.projectID, "NetworkEndpointGroup", name) logger.Info("Syncing NEGs", "type", negTypeHybrid, "zones", c.zones) g, groupCtx := errgroup.WithContext(ctx) for _, zone := range c.zones { g.Go(func() error { return c.syncZonalNEG(groupCtx, logger, name, zone, endpointsByZone[zone], service) }) } if err := g.Wait(); err != nil { return fmt.Errorf("problem syncing NEG name=%s for Kubernetes Service=%s: %w", name, service.String(), err) } logger.Info("Synced NEGs", "type", negTypeHybrid, "zones", c.zones) return nil } func (c *Client) syncZonalNEG(ctx context.Context, logger logr.Logger, name string, zone string, newEndpoints EndpointSet, service types.NamespacedName) error { logger = logger.WithValues("zone", zone) // Using semaphores instead of mutex because Acquire() takes a context argument, enabling deadlines on obtaining locks. // sync.Mutex could be used instead, but the code gets more complicated. lockName := fmt.Sprintf("%s/%s", name, zone) logger = logger.WithValues("lock", lockName) l, _ := zonalNEGLocks.LoadOrStore(lockName, semaphore.NewWeighted(1)) sem, ok := l.(*semaphore.Weighted) if !ok { return fmt.Errorf("%w: expected *semaphore.Weighted, got %T for zonal NEG name=%s zone=%s", errUnexpectedLockType, l, name, zone) } lockAcquireCtx, cancelLockAquire := context.WithDeadline(ctx, time.Now().Add(c.timeouts.SyncZonalNEG)) defer cancelLockAquire() if err := sem.Acquire(lockAcquireCtx, 1); err != nil { return fmt.Errorf("could not obtain lock for updating NEG name=%s zone=%s: %w", name, zone, err) } defer func() { sem.Release(1) logger.V(4).Info("Released lock for modifying zonal NEG") }() logger.V(4).Info("Obtained lock for modifying zonal NEG") syncCtx, cancelSync := context.WithDeadline(ctx, time.Now().Add(c.timeouts.SyncZonalNEG)) defer cancelSync() logger.V(6).Info("Listing endpoints") existingEndpoints, err := c.getEndpoints(syncCtx, name, zone) if err != nil && !isNotFound(err) { return fmt.Errorf("problem listing network endpoints of NEG with projectID=%s name=%s zone=%s: %w", c.projectID, name, zone, err) } if isNotFound(err) { logger.Error(err, "Zonal NEG not found, the initial creation may have failed, trying to create again.") neg := newNetworkEndpointGroup(name, c.projectID, c.network, service) if createErr := c.createZonalNEG(syncCtx, logger, zone, neg); createErr != nil { return fmt.Errorf("problem creating zonal NEG during endpoint sync name=%s zone=%s: %w", name, zone, createErr) } } attach, detach := diffEndpoints(existingEndpoints, newEndpoints) logger.V(4).Info("Network endpoint diff sizes", "attachSize", len(attach), "detachSize", len(detach)) return c.attachDetachEndpoints(syncCtx, logger, name, zone, attach, detach) } // getEndpoints returns the current endpoints of the NEG identified by the provided name and zone. // Pagination logic based on example from https://github.com/GoogleCloudPlatform/golang-samples/blob/main/compute/list_all_instances.go func (c *Client) getEndpoints(ctx context.Context, name string, zone string) (EndpointSet, error) { endpoints := EndpointSet{} request := &computepb.ListNetworkEndpointsNetworkEndpointGroupsRequest{ NetworkEndpointGroup: name, NetworkEndpointGroupsListEndpointsRequestResource: &computepb.NetworkEndpointGroupsListEndpointsRequest{ HealthStatus: proto.String(computepb.NetworkEndpointGroupsListEndpointsRequest_SKIP.String()), }, Project: c.projectID, ReturnPartialSuccess: proto.Bool(true), Zone: zone, } for endpoint, err := range c.client.ListNetworkEndpoints(ctx, request).All() { if err != nil { return nil, fmt.Errorf("problem iterating over network endpoints of NEG name=%s zone=%s: %w", name, zone, err) } if endpoint == nil { continue } // Instance is a pointer to an empty string in the response to ListNetworkEndpoints, // but must be nil in attach/detach requests for hybrid NEGs. So explicitly set it to `nil` here. // And set Fqdn to `nil` too, just in case. endpoint.NetworkEndpoint.Instance = nil endpoint.NetworkEndpoint.Fqdn = nil endpoints.Put(endpoint.NetworkEndpoint) } return endpoints, nil } // diffEndpoints returns a set of endpoints to be attached, and a set of endpoints to be // detached from a NEG. The diff is determined by comparing the current state and the desired // target state. func diffEndpoints(previous EndpointSet, current EndpointSet) ([]*computepb.NetworkEndpoint, []*computepb.NetworkEndpoint) { var added []*computepb.NetworkEndpoint var removed []*computepb.NetworkEndpoint for endpointKey, endpoint := range previous { if _, exists := current[endpointKey]; !exists { removed = append(removed, endpoint) } } for endpointKey, endpoint := range current { if _, exists := previous[endpointKey]; !exists { added = append(added, endpoint) } } return added, removed } // attachDetachEndpoints syncs the endpoints of the zonal NEG and blocks until all operations // return. The method returns an error if one or more of the attach/detach API requests fail. // The reason for blocking until done is to ensure that the next EndpointSlice reconciliation // request observes a consistent view of the endpoints in the NEGs. func (c *Client) attachDetachEndpoints(syncCtx context.Context, logger logr.Logger, name string, zone string, attach []*computepb.NetworkEndpoint, detach []*computepb.NetworkEndpoint) error { logger.V(8).Info("Network endpoint diff endpoints", "attach", attach, "detach", detach) attachOps, err := c.attachEndpoints(syncCtx, logger, name, zone, attach) if err != nil { return fmt.Errorf("problem attaching endpoints to NEG of type=%s for projectID=%s name=%s zone=%s attach=%+v: %w", negTypeHybrid, c.projectID, name, zone, attach, err) } detachOps, err := c.detachEndpoints(syncCtx, logger, name, zone, detach) if err != nil { return fmt.Errorf("problem detaching endpoints from NEG of type=%s for projectID=%s name=%s zone=%s detach=%+v: %w", negTypeHybrid, c.projectID, name, zone, detach, err) } g, groupCtx := errgroup.WithContext(syncCtx) for _, op := range append(attachOps, detachOps...) { g.Go(func() error { return op.Wait(groupCtx) }) } if err := g.Wait(); err != nil { return fmt.Errorf("problem while waiting for async attaching and detaching of endpoints for NEG name=%s zone=%s: %w", name, zone, err) } return nil } // attachEndpoints adds the provided endpoints to the NEG identified by the provided name and zone. // The method returns a slice of handles to the async attach API requests. func (c *Client) attachEndpoints(ctx context.Context, logger logr.Logger, name string, zone string, attach []*computepb.NetworkEndpoint) ([]*compute.Operation, error) { var ops []*compute.Operation // Requests to attach/detach network endpoints can have at most 500 endpoints. for i := 0; i < len(attach); i += MaxSizeNetworkEndpointsAttachDetach { var chunk []*computepb.NetworkEndpoint if i > len(attach)-MaxSizeNetworkEndpointsAttachDetach { chunk = attach[i:] } else { chunk = attach[i : i+MaxSizeNetworkEndpointsAttachDetach] } requestID := uuid.New().String() logger = logger.WithValues("chunkSize", len(chunk), "requestID", requestID) logger.V(6).Info("Sending API request to attach endpoints to zonal NEG") operation, err := c.client.AttachNetworkEndpoints(ctx, &computepb.AttachNetworkEndpointsNetworkEndpointGroupRequest{ NetworkEndpointGroup: name, NetworkEndpointGroupsAttachEndpointsRequestResource: &computepb.NetworkEndpointGroupsAttachEndpointsRequest{ NetworkEndpoints: chunk, }, Project: c.projectID, RequestId: proto.String(requestID), Zone: zone, }) if err != nil && !googleapi.IsNotModified(err) { return nil, err } ops = append(ops, operation) } return ops, nil } // detachEndpoints removes the provided endpoints from the NEG identified by the provided name and // zone. The method returns a slice of handles to the async attach API requests. func (c *Client) detachEndpoints(ctx context.Context, logger logr.Logger, name string, zone string, detach []*computepb.NetworkEndpoint) ([]*compute.Operation, error) { var ops []*compute.Operation // Requests to attach/detach network endpoints can have at most 500 endpoints. for i := 0; i < len(detach); i += MaxSizeNetworkEndpointsAttachDetach { var chunk []*computepb.NetworkEndpoint if i > len(detach)-MaxSizeNetworkEndpointsAttachDetach { chunk = detach[i:] } else { chunk = detach[i : i+MaxSizeNetworkEndpointsAttachDetach] } requestID := uuid.New().String() logger = logger.WithValues("chunkSize", len(chunk), "requestID", requestID) logger.V(6).Info("Sending API request to detach endpoints from zonal NEG") operation, err := c.client.DetachNetworkEndpoints(ctx, &computepb.DetachNetworkEndpointsNetworkEndpointGroupRequest{ NetworkEndpointGroup: name, NetworkEndpointGroupsDetachEndpointsRequestResource: &computepb.NetworkEndpointGroupsDetachEndpointsRequest{ NetworkEndpoints: chunk, }, Project: c.projectID, RequestId: proto.String(requestID), Zone: zone, }) if err != nil && !googleapi.IsNotModified(err) { return nil, err } ops = append(ops, operation) } return ops, nil } // isNotFound returns true if the provided error is a `googleapi.Error` with status code // 404 Not Found. func isNotFound(err error) bool { if err == nil { return false } var ae *googleapi.Error ok := errors.As(err, &ae) return ok && ae.Code == http.StatusNotFound } // endpointToString returns a string representation of the IPv4 address and port number of the // provided NetworkEndpoint. // Hybrid NEGs currently use only the IPv4 address and port number to identify endpoints; // instance name is not used, and IPv6 addresses are not currently supported. func endpointToString(endpoint *computepb.NetworkEndpoint) string { if endpoint == nil { return "nil" } return fmt.Sprintf("%s/%d", endpoint.GetIpAddress(), endpoint.GetPort()) }