projects/k8s-hybrid-neg-controller/pkg/reconciler/endpointslice.go (264 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package reconciler
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"cloud.google.com/go/compute/apiv1/computepb"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"github.com/googlecloudplatform/k8s-hybrid-neg-controller/pkg/annotation"
"github.com/googlecloudplatform/k8s-hybrid-neg-controller/pkg/neg"
)
// DurationObtainServiceLock determines how long the controller waits to obtain the lock for
// updating endpoints of NEGs that belong to a Kubernetes Service.
const DurationObtainServiceLock = 5 * time.Minute

// Sentinel errors produced while reconciling EndpointSlices.
var (
	// errEmptyNEGNameInStatus reports a port entry in the hybrid NEG status annotation whose NEG
	// name is empty.
	errEmptyNEGNameInStatus = errors.New("empty NEG name in hybrid NEG status annotation")
	// errNoServiceNameLabel reports an EndpointSlice that lacks the
	// discoveryv1.LabelServiceName label, so it cannot be mapped to a Service.
	errNoServiceNameLabel = errors.New("no Service name label on EndpointSlice")
	// errNoStatusAnnotation reports a Service that exposes ports through the hybrid NEG config
	// annotation but does not (yet) have the hybrid NEG status annotation.
	errNoStatusAnnotation = errors.New("hybrid NEG config annotation present, but not the status annotation")
)
// endpointSliceReconciler implements `reconcile.Reconciler` for Kubernetes EndpointSlice
// objects: for each EndpointSlice it looks up the owning Service and syncs the endpoints of that
// Service's hybrid NEGs.
//
// See https://pkg.go.dev/sigs.k8s.io/controller-runtime@main/pkg/reconcile
type endpointSliceReconciler struct {
	// Client is used to read EndpointSlices and Services.
	client.Client
	// defaultNEGZone is the Compute Engine zone used for endpoints whose zone is unset or has no
	// entry in zoneMapping (see mapZone).
	defaultNEGZone string
	// negClient performs the actual endpoint sync against the NEGs.
	negClient *neg.Client
	// recorder is the Kubernetes event recorder.
	// NOTE(review): not read by the methods in this file.
	recorder record.EventRecorder
	// requeueAfter is the delay Reconcile requests before a failed, non-terminal reconcile is
	// retried.
	requeueAfter time.Duration
	// timeoutSyncServiceNEGs is presumably the time budget for syncing all NEGs of one Service
	// (inferred from its name) — confirm where it is applied.
	timeoutSyncServiceNEGs time.Duration
	// zoneMapping maps an endpoint's Kubernetes zone to a Compute Engine zone (see mapZone).
	zoneMapping map[string]string
	// zones is the zone list passed to the constructor.
	// NOTE(review): not read by the methods in this file; reconciliation uses the zones from the
	// Service's NEG status annotation instead.
	zones []string
}

// Compile-time check that *endpointSliceReconciler satisfies reconcile.Reconciler.
var _ reconcile.Reconciler = (*endpointSliceReconciler)(nil)
// NewEndpointSliceReconciler returns a reconcile.Reconciler that syncs the endpoints of hybrid
// NEGs from the EndpointSlices of annotated Services.
//
// k8sClient reads EndpointSlices and Services, and negClient syncs the NEGs. zoneMapping
// translates endpoint zones to Compute Engine zones, and defaultNEGZone is the fallback when no
// mapping applies. requeueAfter is the retry delay for non-terminal failures. The recorder,
// zones and timeoutSyncServiceNEGs arguments are stored on the reconciler as given.
func NewEndpointSliceReconciler(k8sClient client.Client, recorder record.EventRecorder, negClient *neg.Client, zones []string, zoneMapping map[string]string, defaultNEGZone string, requeueAfter time.Duration, timeoutSyncServiceNEGs time.Duration) reconcile.Reconciler {
	reconciler := &endpointSliceReconciler{
		Client:                 k8sClient,
		defaultNEGZone:         defaultNEGZone,
		negClient:              negClient,
		recorder:               recorder,
		requeueAfter:           requeueAfter,
		timeoutSyncServiceNEGs: timeoutSyncServiceNEGs,
		zoneMapping:            zoneMapping,
		zones:                  zones,
	}
	return reconciler
}
// Reconcile handles one `reconcile.Request` for a Kubernetes EndpointSlice by reconciling the NEG
// endpoints of the Service that the EndpointSlice belongs to.
//
// Outcomes:
//   - success: an empty Result and a nil error, so the request is removed from the queue;
//   - terminal error (see `reconcile.TerminalError`): returned unchanged with an empty Result, so
//     controller-runtime does not retry it;
//   - any other error: logged here and converted into a Result that asks for a requeue after
//     r.requeueAfter, with a nil error.
func (r *endpointSliceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	logger := log.FromContext(ctx)
	logger.V(2).Info("Reconciling EndpointSlice")

	reconcileErr := r.reconcile(ctx, logger, req.Namespace, req.Name)
	switch {
	case reconcileErr == nil, errors.Is(reconcileErr, reconcile.TerminalError(nil)):
		return reconcile.Result{}, reconcileErr
	default:
		logger.Error(reconcileErr, "Requeueing")
		return reconcile.Result{Requeue: true, RequeueAfter: r.requeueAfter}, nil
	}
}
// reconcile looks up the Service for the provided EndpointSlice name and namespace, and then
// performs a reconciliation for all NEGs of that Service.
//
// Error handling:
//   - If the EndpointSlice or its Service is not found, there is nothing to reconcile and nil is
//     returned. This is the normal outcome after either object is deleted, so it is logged at
//     debug verbosity rather than as an error.
//   - If the EndpointSlice has no Service name label, retrying the same request keeps failing
//     until the EndpointSlice itself changes. The error is therefore wrapped with
//     reconcile.TerminalError instead of being requeued indefinitely.
//   - Any other lookup error is returned unchanged, so it is treated as retryable.
//
// NOTE(review): once an EndpointSlice is deleted it can no longer be mapped to its Service here,
// so this request cannot trigger removal of its endpoints from the NEGs. Confirm another event
// (e.g. a sibling EndpointSlice update) covers that case.
func (r *endpointSliceReconciler) reconcile(ctx context.Context, logger logr.Logger, namespace string, endpointSliceName string) error {
	service, err := r.getService(ctx, namespace, endpointSliceName)
	if err != nil {
		if client.IgnoreNotFound(err) == nil {
			logger.V(4).Info("EndpointSlice or Service not found, nothing to reconcile", "reason", err.Error())
			return nil
		}
		logger.Error(err, "Could not find Service for EndpointSlice")
		if errors.Is(err, errNoServiceNameLabel) {
			return reconcile.TerminalError(err)
		}
		return err
	}
	logger = logger.WithValues("service", service.GetName())
	return r.reconcileEndpointsForService(ctx, logger, service)
}
// reconcileEndpointsForService syncs the NEG endpoints of one Service. It reads the hybrid NEG
// status annotation, maps Service port names to NEG names, and collects the ready endpoints of
// all of the Service's EndpointSlices per port and zone. It then hands the result to
// syncEndpoints.
//
// A Service without NEG status is skipped (nil). An invalid port-to-NEG mapping is reported as
// a terminal error. Other failures are returned as produced by the helpers.
func (r *endpointSliceReconciler) reconcileEndpointsForService(ctx context.Context, logger logr.Logger, service *corev1.Service) error {
	status, err := getNEGStatus(logger, service)
	if err != nil {
		logger.Error(err, "Could not get Service NEG status")
		return err
	}
	if status == nil {
		logger.V(6).Info("No Service NEG status, skipping")
		return nil
	}
	logger.V(4).Info("Found Service NEG status", "negStatus", status)
	logger.V(2).Info("Reconciling endpoints")

	negNameByPortName, err := getServicePortNameToNEGNameMap(*status, service.Spec.Ports)
	if err != nil {
		logger.Error(err, "Skipping reconcile due to invalid NEG config, could not get service port to NEG name map.")
		return reconcile.TerminalError(err)
	}

	sliceList, err := r.getEndpointSlicesForService(ctx, service)
	if err != nil {
		logger.Error(err, "Could not get EndpointSlices for Service")
		return err
	}
	logger.V(4).Info("Number of EndpointSlices for Service", "endpointSliceListSize", len(sliceList.Items))

	endpointsByPortName := r.mapServicePortsToEndpointsByZone(logger, service.Spec.Ports, status.Zones, sliceList)
	serviceKey := client.ObjectKeyFromObject(service)
	return r.syncEndpoints(ctx, logger, endpointsByPortName, negNameByPortName, serviceKey)
}
// getNEGStatus returns the parsed hybrid NEG status annotation of the provided Service.
//
// It returns (nil, nil) when there is nothing to reconcile. That is the case when the Service has
// neither the hybrid NEG config nor the status annotation. It is also the case when the config
// annotation exposes no ports and there is no status annotation.
//
// It returns a non-terminal error (so the request is retried) when the config annotation exposes
// ports but the status annotation has not been written yet. It returns a terminal error when
// either annotation value cannot be unmarshalled.
func getNEGStatus(logger logr.Logger, service *corev1.Service) (*annotation.NEGStatus, error) {
	hybridNEGConfigAnnotationValue, serviceHasHybridNEGConfigAnnotation := service.Annotations[annotation.HybridNEGConfigKey]
	hybridNEGStatusAnnotationValue, serviceHasHybridNEGStatusAnnotation := service.Annotations[annotation.HybridNEGStatusKey]
	if !serviceHasHybridNEGConfigAnnotation && !serviceHasHybridNEGStatusAnnotation {
		logger.V(4).Info("Service does not have the hybrid NEG config or status annotations, skipping reconcile")
		return nil, nil
	}
	if serviceHasHybridNEGConfigAnnotation {
		negConfig := &annotation.NEGConfig{}
		if err := json.Unmarshal([]byte(hybridNEGConfigAnnotationValue), negConfig); err != nil {
			return nil, reconcile.TerminalError(fmt.Errorf("could not unmarshall NEG config annotation value [%s]:%w", hybridNEGConfigAnnotationValue, err))
		}
		if len(negConfig.ExposedPorts) > 0 && !serviceHasHybridNEGStatusAnnotation {
			// The status annotation may not have been created yet, requeue to try again.
			return nil, fmt.Errorf("%w, requeueing", errNoStatusAnnotation)
		}
	}
	if !serviceHasHybridNEGStatusAnnotation {
		// The config exposes no ports, so no NEGs are expected. Without this check the empty
		// (absent) status value would fail to unmarshal and be reported as a terminal error.
		logger.V(4).Info("Service hybrid NEG config exposes no ports and has no status annotation, skipping reconcile")
		return nil, nil
	}
	negStatus := &annotation.NEGStatus{}
	if err := json.Unmarshal([]byte(hybridNEGStatusAnnotationValue), negStatus); err != nil {
		return nil, reconcile.TerminalError(fmt.Errorf("could not unmarshall NEG status annotation value [%s]:%w", hybridNEGStatusAnnotationValue, err))
	}
	return negStatus, nil
}
// getEndpointSlicesForService lists the EndpointSlices in the Service's namespace whose
// discoveryv1.LabelServiceName label equals the Service's name.
//
// A label selector that cannot be parsed yields a terminal error. A failed List call yields a
// plain, retryable error. On any error the returned list is empty.
func (r *endpointSliceReconciler) getEndpointSlicesForService(ctx context.Context, service *corev1.Service) (discoveryv1.EndpointSliceList, error) {
	namespace, name := service.GetNamespace(), service.GetName()

	selector, parseErr := labels.Parse(fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, name))
	if parseErr != nil {
		return discoveryv1.EndpointSliceList{}, reconcile.TerminalError(fmt.Errorf("could not create label selector: %w", parseErr))
	}

	var result discoveryv1.EndpointSliceList
	opts := &client.ListOptions{Namespace: namespace, LabelSelector: selector}
	if listErr := r.List(ctx, &result, opts); listErr != nil {
		return discoveryv1.EndpointSliceList{}, fmt.Errorf("could not list EndpointSlices of service=%s/%s, requeueing: %w", namespace, name, listErr)
	}
	return result, nil
}
// getService fetches the named EndpointSlice's metadata and reads its
// discoveryv1.LabelServiceName label. It then fetches and returns the Service of that name from
// the same namespace.
//
// Errors from either Get call are returned unwrapped, so callers can still test them with
// client.IgnoreNotFound. A missing label yields errNoServiceNameLabel.
func (r *endpointSliceReconciler) getService(ctx context.Context, namespace string, endpointSliceName string) (*corev1.Service, error) {
	// Only the labels are needed, so fetch the EndpointSlice as metadata-only.
	sliceMeta := &metav1.PartialObjectMetadata{}
	sliceMeta.SetGroupVersionKind(discoveryv1.SchemeGroupVersion.WithKind("EndpointSlice"))
	sliceKey := types.NamespacedName{Namespace: namespace, Name: endpointSliceName}
	if err := r.Get(ctx, sliceKey, sliceMeta); err != nil {
		return nil, err
	}

	serviceName, ok := sliceMeta.GetLabels()[discoveryv1.LabelServiceName]
	if !ok {
		return nil, errNoServiceNameLabel
	}

	var service corev1.Service
	serviceKey := types.NamespacedName{Namespace: namespace, Name: serviceName}
	if err := r.Get(ctx, serviceKey, &service); err != nil {
		return nil, err
	}
	return &service, nil
}
// syncEndpoints syncs the endpoints of all NEGs of the provided Service.
//
// servicePortNameToEndpointsByZoneMap holds, per Service port name, the endpoints grouped by
// zone. servicePortNameToNEGNameMap maps Service port names to NEG names, and ports without a
// NEG name mapping are skipped. Each NEG is synced in its own goroutine via
// negClient.SyncEndpoints. The errgroup cancels the shared context on the first failure.
//
// When r.timeoutSyncServiceNEGs is positive, the whole sync is bounded by that timeout. A
// non-positive value leaves the caller's context unchanged.
//
// Any sync failure is returned as a terminal error, so the request is not requeued by Reconcile.
func (r *endpointSliceReconciler) syncEndpoints(ctx context.Context, logger logr.Logger, servicePortNameToEndpointsByZoneMap neg.ServiceEndpoints, servicePortNameToNEGNameMap map[string]string, service types.NamespacedName) error {
	logger.V(4).Info("Syncing endpoints", "servicePortNameToNEGNameMap", servicePortNameToNEGNameMap)
	// The per-port/per-zone breakdown is only emitted at V(4); skip the loops otherwise.
	if debugLogger := logger.V(4); debugLogger.Enabled() {
		debugLogger.Info("Syncing endpoints", "len(servicePortNameToEndpointsByZoneMap)", len(servicePortNameToEndpointsByZoneMap))
		for servicePortName, endpointsByZoneMap := range servicePortNameToEndpointsByZoneMap {
			debugLogger.Info("Syncing endpoints", "servicePortName", servicePortName, "len(endpointsByZoneMap)", len(endpointsByZoneMap))
			for zone, endpoints := range endpointsByZoneMap {
				debugLogger.Info("Syncing endpoints", "servicePortName", servicePortName, "zone", zone, "len(endpoints)", len(endpoints))
			}
		}
	}

	if r.timeoutSyncServiceNEGs > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.timeoutSyncServiceNEGs)
		defer cancel()
	}

	group, groupCtx := errgroup.WithContext(ctx)
	for portName, zoneToEndpointMap := range servicePortNameToEndpointsByZoneMap {
		// The closure captures the loop variables directly; this relies on Go 1.22+
		// per-iteration loop variable semantics.
		group.Go(func() error {
			negName, exists := servicePortNameToNEGNameMap[portName]
			if !exists {
				logger.V(2).Info("NEG name mapping not found, skipping", "portName", portName)
				return nil
			}
			logger.V(4).Info("Syncing endpoints for NEG", "negName", negName)
			if err := r.negClient.SyncEndpoints(groupCtx, logger, negName, zoneToEndpointMap, service); err != nil {
				return fmt.Errorf("problem syncing NEG name=%s: %w", negName, err)
			}
			logger.V(4).Info("Successfully synced endpoints for NEG", "negName", negName)
			return nil
		})
	}
	if err := group.Wait(); err != nil {
		// NOTE(review): wrapping as terminal means transient sync failures are not retried
		// until the next event for this Service — confirm this is intended.
		return reconcile.TerminalError(fmt.Errorf("problem syncing one or more NEGs: %w", err))
	}
	logger.V(4).Info("Successfully synced endpoints", "servicePortNameToNEGNameMap", servicePortNameToNEGNameMap)
	return nil
}
// mapServicePortsToEndpointsByZone returns a map of servicePortName -> zone -> endpoint set,
// where each endpoint is an (IP address, port number) tuple.
//
// The method iterates over the list of EndpointSlices. Each EndpointSlice may contain multiple
// ports, identified by the name of the Service port. For headful Services, the target port may
// be a name or a port number. If it is a name, the actual endpoint port may differ from one Pod
// to another, based on the Pod spec. Endpoints in one EndpointSlice may also come from different
// zones. Each endpoint's zone is translated with mapZone.
//
// Only the port names in servicePorts and the zones in zones appear in the result. The
// following are skipped, with a log message:
//   - EndpointSlice ports without a port number;
//   - EndpointSlice ports whose name matches no Service port (e.g. a slice lagging behind a
//     Service port rename);
//   - endpoints whose mapped zone is not in zones.
//
// Endpoints whose Ready condition is explicitly false are also skipped. A nil Ready condition
// means "unknown", and the EndpointConditions API documentation says consumers should in most
// cases treat it as ready (kube-proxy does the same).
func (r *endpointSliceReconciler) mapServicePortsToEndpointsByZone(logger logr.Logger, servicePorts []corev1.ServicePort, zones []string, endpointSliceList discoveryv1.EndpointSliceList) neg.ServiceEndpoints {
	servicePortsToEndpointsByZone := initializeServicePortNameToEndpointsByZoneMap(servicePorts, zones)
	for i := range endpointSliceList.Items {
		// Take a pointer rather than ranging by value: EndpointSlice is a large struct.
		endpointSlice := &endpointSliceList.Items[i]
		sliceKey := client.ObjectKeyFromObject(endpointSlice)
		// The endpoint port may be different to the service port for headful Services.
		for _, endpointSlicePort := range endpointSlice.Ports {
			if endpointSlicePort.Port == nil || *endpointSlicePort.Port == 0 {
				logger.V(2).Info("Skipping EndpointSlice port without Port value", "endpointSlice", sliceKey)
				continue
			}
			var servicePortName string // empty string is a valid port name if there's only one service port
			if endpointSlicePort.Name != nil {
				servicePortName = *endpointSlicePort.Name
			}
			logger.V(4).Info("Number of endpoints for port", "endpointsSize", len(endpointSlice.Endpoints), "port", *endpointSlicePort.Port, "portName", servicePortName)
			endpointsByZone, knownPort := servicePortsToEndpointsByZone[servicePortName]
			if !knownPort {
				// Indexing with an unknown port name would yield a zero value that is not part
				// of the result (and Put would panic if neg.EndpointSet is map-backed).
				logger.V(2).Info("Skipping EndpointSlice port that matches no Service port", "endpointSlice", sliceKey, "portName", servicePortName)
				continue
			}
			for _, endpoint := range endpointSlice.Endpoints {
				if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
					continue
				}
				zone := r.mapZone(logger, endpoint)
				endpointSet, knownZone := endpointsByZone[zone]
				if !knownZone {
					logger.V(2).Info("Skipping endpoint in zone without a NEG", "endpointSlice", sliceKey, "portName", servicePortName, "zone", zone)
					continue
				}
				for _, address := range endpoint.Addresses {
					endpointSet.Put(&computepb.NetworkEndpoint{
						IpAddress: proto.String(address),
						Port:      endpointSlicePort.Port,
					})
				}
			}
		}
	}
	return servicePortsToEndpointsByZone
}
// initializeServicePortNameToEndpointsByZoneMap builds the empty skeleton that
// mapServicePortsToEndpointsByZone fills in. The skeleton has one entry per Service port name,
// each holding an empty neg.EndpointSet for every zone in zones.
func initializeServicePortNameToEndpointsByZoneMap(servicePorts []corev1.ServicePort, zones []string) neg.ServiceEndpoints {
	skeleton := make(neg.ServiceEndpoints, len(servicePorts))
	for i := range servicePorts {
		byZone := make(neg.ZonalEndpoints, len(zones))
		for _, z := range zones {
			byZone[z] = neg.EndpointSet{}
		}
		skeleton[servicePorts[i].Name] = byZone
	}
	return skeleton
}
// getServicePortNameToNEGNameMap re-keys the network_endpoint_groups map of the hybrid NEG
// status annotation from Service port number (decimal string) to Service port name.
//
// Every Service port with a matching port number receives that entry's NEG name. Status entries
// that match no Service port are ignored. An entry with an empty NEG name causes an error
// wrapping errEmptyNEGNameInStatus.
func getServicePortNameToNEGNameMap(negStatus annotation.NEGStatus, servicePorts []corev1.ServicePort) (map[string]string, error) {
	// Index Service port names once by the same decimal form used as status map keys.
	portNamesByNumber := make(map[string][]string, len(servicePorts))
	for _, sp := range servicePorts {
		key := strconv.FormatInt(int64(sp.Port), 10)
		portNamesByNumber[key] = append(portNamesByNumber[key], sp.Name)
	}

	negNameByPortName := make(map[string]string, len(negStatus.NetworkEndpointGroups))
	for portNumber, negName := range negStatus.NetworkEndpointGroups {
		if negName == "" {
			return nil, fmt.Errorf("%w for service port number %s: %+v", errEmptyNEGNameInStatus, portNumber, negStatus.NetworkEndpointGroups)
		}
		for _, portName := range portNamesByNumber[portNumber] {
			negNameByPortName[portName] = negName
		}
	}
	return negNameByPortName, nil
}
// mapZone translates an endpoint's zone into a Compute Engine zone using r.zoneMapping.
//
// When the endpoint has no zone, or its zone has no entry in r.zoneMapping, the fallback
// r.defaultNEGZone is returned and the fallback is logged at Info level.
func (r *endpointSliceReconciler) mapZone(logger logr.Logger, endpoint discoveryv1.Endpoint) string {
	if endpoint.Zone != nil {
		if computeZone, found := r.zoneMapping[*endpoint.Zone]; found {
			return computeZone
		}
		logger.Info("No mapping for endpoint zone, using default NEG zone", "endpointZone", *endpoint.Zone, "defaultNEGZone", r.defaultNEGZone)
	} else {
		logger.Info("Nil zone for endpoint in EndpointSlice, using default NEG zone", "endpoint", endpoint, "defaultNEGZone", r.defaultNEGZone)
	}
	return r.defaultNEGZone
}