pkg/controllers/member/endpointslice/controller.go (280 lines of code) (raw):
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
// Package endpointslice features the EndpointSlice controller for exporting an EndpointSlice from a member cluster
// to its fleet.
package endpointslice
import (
"context"
"fmt"
"strconv"
"time"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/metrics"
"go.goms.io/fleet-networking/pkg/common/objectmeta"
"go.goms.io/fleet-networking/pkg/common/uniquename"
)
// skipOrUnexportEndpointSliceOp enumerates the actions the controller may take on an EndpointSlice once it
// has been inspected: skip it, unexport it, or carry on with the export.
type skipOrUnexportEndpointSliceOp int

const (
	// shouldSkipEndpointSliceOp (0) tells the controller to leave the EndpointSlice alone.
	shouldSkipEndpointSliceOp skipOrUnexportEndpointSliceOp = iota
	// shouldUnexportEndpointSliceOp (1) tells the controller to unexport the EndpointSlice.
	shouldUnexportEndpointSliceOp
	// continueReconcileOp (2) tells the controller that the EndpointSlice needs neither skipping nor
	// unexporting, i.e. reconciliation should proceed to export it.
	continueReconcileOp
)
// Reconciler reconciles the export of an EndpointSlice.
//
// It reads EndpointSlices (and the ServiceExports of their owner Services) through MemberClient and manages
// the corresponding EndpointSliceExports through HubClient.
type Reconciler struct {
// The ID of the member cluster.
MemberClusterID string
// The client used to get, list, and update EndpointSlices and to get ServiceExports (member cluster side).
MemberClient client.Client
// The client used to get, create, update, and delete EndpointSliceExports (hub cluster side).
HubClient client.Client
// The namespace reserved for the current member cluster in the hub cluster.
HubNamespace string
}
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;list;watch
// Reconcile exports an EndpointSlice.
//
// It retrieves the EndpointSlice named by req, decides whether the EndpointSlice should be skipped, unexported,
// or exported, and in the export case creates or updates the EndpointSliceExport with the assigned unique name
// in the hub namespace. A non-nil error is returned for any failure that should trigger a retry; a failure to
// refresh the metric annotations alone is logged and does not abort the export.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	endpointSliceRef := klog.KRef(req.Namespace, req.Name)
	startTime := time.Now()
	klog.V(2).InfoS("Reconciliation starts", "endpointSlice", endpointSliceRef)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("Reconciliation ends", "endpointSlice", endpointSliceRef, "latency", latency)
	}()

	// Retrieve the EndpointSlice object.
	var endpointSlice discoveryv1.EndpointSlice
	if err := r.MemberClient.Get(ctx, req.NamespacedName, &endpointSlice); err != nil {
		// Skip the reconciliation if the EndpointSlice does not exist; this should only happen when an EndpointSlice
		// is deleted right before the controller gets a chance to reconcile it. If the EndpointSlice has never
		// been exported to the fleet, no action is required on this controller's end; on the other hand, if the
		// EndpointSlice has been exported before, this may result in an EndpointSlice being left over on the
		// hub cluster, and it is up to another controller, EndpointSliceExport controller, to pick up the leftover
		// and clean it out.
		if errors.IsNotFound(err) {
			klog.V(4).InfoS("Ignoring NotFound endpointSlice", "endpointSlice", endpointSliceRef)
			return ctrl.Result{}, nil
		}
		klog.ErrorS(err, "Failed to get endpoint slice", "endpointSlice", endpointSliceRef)
		return ctrl.Result{}, err
	}

	// Check if the EndpointSlice should be skipped for reconciliation or unexported.
	skipOrUnexportOp, err := r.shouldSkipOrUnexportEndpointSlice(ctx, &endpointSlice)
	if err != nil {
		// An unexpected error occurs.
		klog.ErrorS(err,
			"Failed to determine whether an endpoint slice should be skipped for reconciliation or unexported",
			"endpointSlice", endpointSliceRef)
		return ctrl.Result{}, err
	}

	switch skipOrUnexportOp {
	case shouldSkipEndpointSliceOp:
		// Skip reconciling the EndpointSlice.
		klog.V(4).InfoS("Endpoint slice should be skipped for reconciliation", "endpointSlice", endpointSliceRef)
		return ctrl.Result{}, nil
	case shouldUnexportEndpointSliceOp:
		// Unexport the EndpointSlice.
		klog.V(4).InfoS("Endpoint slice should be unexported", "endpointSlice", endpointSliceRef)
		if err := r.unexportEndpointSlice(ctx, &endpointSlice); err != nil {
			klog.ErrorS(err, "Failed to unexport the endpoint slice", "endpointSlice", endpointSliceRef)
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	// Retrieve the unique name assigned; if none has been assigned, or the one assigned is not valid, possibly due
	// to user tampering with the annotation, assign a new unique name.
	fleetUniqueName, ok := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName]
	if !ok || !isUniqueNameValid(fleetUniqueName) {
		klog.V(2).InfoS("The endpoint slice does not have a unique name assigned or the one assigned is not valid; a new one will be assigned",
			"endpointSlice", endpointSliceRef)
		// Unique name annotation must be added before an EndpointSlice is exported.
		fleetUniqueName, err = r.assignUniqueNameAsAnnotation(ctx, &endpointSlice)
		if err != nil {
			klog.ErrorS(err, "Failed to assign unique name as an annotation", "endpointSlice", endpointSliceRef)
			return ctrl.Result{}, err
		}
	}

	// Retrieve the last seen generation and the last seen timestamp; these two values are used for metric collection.
	// If the two values are not present or not valid, annotate EndpointSlice with new values.
	//
	// Note that the two values are not tamperproof.
	exportedSince, err := r.collectAndVerifyLastSeenGenerationAndTimestamp(ctx, &endpointSlice, startTime)
	if err != nil {
		// Best effort only: the annotations are used for metrics, so the export continues with the returned
		// timestamp. Log through the structured API so that the error and the key/value pair are kept intact.
		klog.ErrorS(err, "Failed to annotate last seen generation and timestamp", "endpointSlice", endpointSliceRef)
	}

	// Create an EndpointSliceExport in the hub cluster if the EndpointSlice has never been exported; otherwise
	// update the corresponding EndpointSliceExport.
	extractedEndpoints := extractEndpointsFromEndpointSlice(&endpointSlice)
	endpointSliceExport := fleetnetv1alpha1.EndpointSliceExport{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: r.HubNamespace,
			Name:      fleetUniqueName,
		},
	}
	klog.V(2).InfoS("Endpoint slice will be exported",
		"endpointSlice", endpointSliceRef,
		"endpointSliceExport", klog.KObj(&endpointSliceExport))
	createOrUpdateOp, err := controllerutil.CreateOrUpdate(ctx, r.HubClient, &endpointSliceExport, func() error {
		// Set up an EndpointSliceReference and only when an EndpointSliceExport is first created; this is because
		// most fields in EndpointSliceReference should be immutable after creation.
		if endpointSliceExport.CreationTimestamp.IsZero() {
			endpointSliceReference := fleetnetv1alpha1.FromMetaObjects(r.MemberClusterID,
				endpointSlice.TypeMeta, endpointSlice.ObjectMeta, metav1.NewTime(exportedSince))
			endpointSliceExport.Spec.EndpointSliceReference = endpointSliceReference
		}

		// Return an error if an attempt is made to update an EndpointSliceExport that references a different
		// EndpointSlice from the one that is being reconciled. This usually happens when one unique name is assigned
		// to multiple EndpointSliceExports, either by chance or through direct manipulation.
		if !isEndpointSliceExportLinkedWithEndpointSlice(&endpointSliceExport, &endpointSlice) {
			return errors.NewAlreadyExists(
				schema.GroupResource{Group: fleetnetv1alpha1.GroupVersion.Group, Resource: "EndpointSliceExport"},
				fleetUniqueName,
			)
		}

		endpointSliceExport.Spec.AddressType = discoveryv1.AddressTypeIPv4
		endpointSliceExport.Spec.Endpoints = extractedEndpoints
		endpointSliceExport.Spec.Ports = endpointSlice.Ports
		endpointSliceExport.Spec.OwnerServiceReference = fleetnetv1alpha1.OwnerServiceReference{
			// The owner Service is guaranteed to reside in the same namespace as the EndpointSlice to export.
			Namespace:      endpointSlice.Namespace,
			Name:           endpointSlice.Labels[discoveryv1.LabelServiceName],
			NamespacedName: fmt.Sprintf("%s/%s", endpointSlice.Namespace, endpointSlice.Labels[discoveryv1.LabelServiceName]),
		}
		endpointSliceExport.Spec.EndpointSliceReference.UpdateFromMetaObject(endpointSlice.ObjectMeta, metav1.NewTime(exportedSince))
		return nil
	})
	switch {
	case errors.IsAlreadyExists(err):
		// Remove the unique name annotation; a new one will be assigned in future reconciliation attempts.
		klog.V(2).InfoS("The unique name assigned to the endpoint slice has been used; it will be removed", "endpointSlice", endpointSliceRef)
		delete(endpointSlice.Annotations, objectmeta.ExportedObjectAnnotationUniqueName)
		if err := r.MemberClient.Update(ctx, &endpointSlice); err != nil {
			klog.ErrorS(err, "Failed to remove endpointslice unique name annotation", "endpointSlice", endpointSliceRef)
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	case err != nil:
		klog.ErrorS(err,
			"Failed to create/update endpointslice export",
			"endpointSlice", endpointSliceRef,
			"endpointSliceExport", klog.KObj(&endpointSliceExport),
			"op", createOrUpdateOp)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}
// SetupWithManager sets up the EndpointSlice controller with a controller manager.
//
// The controller reconciles EndpointSlices directly and, in addition, watches ServiceExports: whenever a
// ServiceExport changes, every EndpointSlice in the same namespace whose Service name label matches the
// ServiceExport's name is enqueued. The ctx parameter is retained for signature compatibility; listing is done
// with the context supplied to the map function for each event.
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
	// Enqueue EndpointSlices for processing when a ServiceExport changes.
	eventHandlers := handler.EnqueueRequestsFromMapFunc(func(eventCtx context.Context, o client.Object) []reconcile.Request {
		endpointSliceList := &discoveryv1.EndpointSliceList{}
		listOpts := client.ListOptions{
			LabelSelector: labels.SelectorFromSet(labels.Set{
				discoveryv1.LabelServiceName: o.GetName(),
			}),
			Namespace: o.GetNamespace(),
		}
		// Use the context handed to the map function rather than the one captured at setup time.
		if err := r.MemberClient.List(eventCtx, endpointSliceList, &listOpts); err != nil {
			klog.ErrorS(err,
				"Failed to list endpoint slices in use by a service",
				"serviceExport", klog.KRef(o.GetNamespace(), o.GetName()),
			)
			return []reconcile.Request{}
		}

		// The number of requests is known up front; index into Items to avoid copying each EndpointSlice.
		items := endpointSliceList.Items
		reqs := make([]reconcile.Request, 0, len(items))
		for i := range items {
			reqs = append(reqs, reconcile.Request{
				NamespacedName: types.NamespacedName{Namespace: items[i].Namespace, Name: items[i].Name},
			})
		}
		return reqs
	})

	// EndpointSlice controller watches over EndpointSlice and ServiceExport objects.
	return ctrl.NewControllerManagedBy(mgr).
		For(&discoveryv1.EndpointSlice{}).
		Watches(&fleetnetv1alpha1.ServiceExport{}, eventHandlers).
		Complete(r)
}
// shouldSkipOrUnexportEndpointSlice decides what the controller should do with an EndpointSlice: skip it,
// unexport it, or continue reconciling (i.e. export it).
//
// An EndpointSlice is exported only if
//   - it is not permanently unexportable; and
//   - it carries the Service name label; and
//   - the ServiceExport of that name in the same namespace exists and is valid with no conflicts; and
//   - it is not marked for deletion.
//
// When any of the last three conditions fails, the outcome depends on the unique name annotation: if the
// annotation is present (an export may have been attempted) the EndpointSlice is unexported; if it is absent
// (no export has been attempted) the EndpointSlice is skipped. Permanently unexportable EndpointSlices are
// always skipped.
//
// A non-nil error is returned only when retrieving the ServiceExport fails for a reason other than NotFound;
// the returned op carries no meaning in that case.
func (r *Reconciler) shouldSkipOrUnexportEndpointSlice(ctx context.Context,
	endpointSlice *discoveryv1.EndpointSlice) (skipOrUnexportEndpointSliceOp, error) {
	// Permanently unexportable EndpointSlices are never processed.
	if isEndpointSlicePermanentlyUnexportable(endpointSlice) {
		return shouldSkipEndpointSliceOp, nil
	}

	// Work out once what to do should the EndpointSlice turn out not to be exportable. A unique name annotation
	// is always assigned before an export is attempted, so its absence means there is nothing to unexport.
	opWhenNotExportable := shouldSkipEndpointSliceOp
	if _, exportAttempted := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName]; exportAttempted {
		opWhenNotExportable = shouldUnexportEndpointSliceOp
	}

	// Without the Service name label the EndpointSlice is not in use by any Service and cannot be exported.
	ownerSvcName, inUseBySvc := endpointSlice.Labels[discoveryv1.LabelServiceName]
	if !inUseBySvc {
		return opWhenNotExportable, nil
	}

	// Look up the ServiceExport of the owner Service.
	var svcExport fleetnetv1alpha1.ServiceExport
	svcExportKey := types.NamespacedName{Namespace: endpointSlice.Namespace, Name: ownerSvcName}
	if err := r.MemberClient.Get(ctx, svcExportKey, &svcExport); err != nil {
		if errors.IsNotFound(err) {
			// The owner Service is not exported.
			return opWhenNotExportable, nil
		}
		// An unexpected error has occurred.
		return continueReconcileOp, err
	}

	// The owner Service must be exported with no conflicts, and the EndpointSlice itself must not have been
	// deleted; otherwise the EndpointSlice is not exportable.
	if !isServiceExportValidWithNoConflict(&svcExport) || endpointSlice.DeletionTimestamp != nil {
		return opWhenNotExportable, nil
	}

	// All conditions hold; the EndpointSlice should be further processed.
	return continueReconcileOp, nil
}
// unexportEndpointSlice withdraws an EndpointSlice from the fleet: it first deletes the linked
// EndpointSliceExport (if any), then strips the export-related annotations from the EndpointSlice and
// persists the change.
func (r *Reconciler) unexportEndpointSlice(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice) error {
	// The EndpointSliceExport must be gone before any annotation is touched.
	if err := r.deleteEndpointSliceExportIfLinked(ctx, endpointSlice); err != nil {
		return err
	}

	// Drop the last seen (metrics) annotations and the unique name annotation.
	exportAnnotationKeys := []string{
		metrics.MetricsAnnotationLastSeenGeneration,
		metrics.MetricsAnnotationLastSeenTimestamp,
		objectmeta.ExportedObjectAnnotationUniqueName,
	}
	for _, key := range exportAnnotationKeys {
		delete(endpointSlice.Annotations, key)
	}
	return r.MemberClient.Update(ctx, endpointSlice)
}
// deleteEndpointSliceExportIfLinked deletes the EndpointSliceExport that the unique name annotation of an
// EndpointSlice points to, provided that the EndpointSliceExport is actually linked with the EndpointSlice.
//
// Nothing is deleted, and nil is returned, when the unique name is not valid, when no EndpointSliceExport of
// that name exists in the hub namespace, or when the EndpointSliceExport found is linked with a different
// EndpointSlice.
func (r *Reconciler) deleteEndpointSliceExportIfLinked(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice) error {
	uniqueName := endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName]
	// An invalid name (e.g. not a valid DNS subdomain name after user tampering) cannot refer to an
	// EndpointSliceExport; bail out early.
	if !isUniqueNameValid(uniqueName) {
		klog.V(2).InfoS("The unique name annotation for exporting the EndpointSlice is not valid; unexport is skipped",
			"endpointSlice", klog.KObj(endpointSlice),
			"uniqueName", uniqueName)
		return nil
	}

	exportKey := types.NamespacedName{Namespace: r.HubNamespace, Name: uniqueName}
	export := fleetnetv1alpha1.EndpointSliceExport{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: exportKey.Namespace,
			Name:      exportKey.Name,
		},
	}
	if err := r.HubClient.Get(ctx, exportKey, &export); err != nil {
		if errors.IsNotFound(err) {
			// The unique name annotation is always added before the export happens, so an annotated
			// EndpointSlice with no EndpointSliceExport on the hub is an expected, if rare, state.
			return nil
		}
		// An unexpected error has occurred.
		return err
	}

	// Two EndpointSlices may have been forced, through direct manipulation, to carry the same unique name;
	// leave the EndpointSliceExport alone if it belongs to the other one.
	if !isEndpointSliceExportLinkedWithEndpointSlice(&export, endpointSlice) {
		return nil
	}

	// A NotFound on deletion means the work is already done.
	err := r.HubClient.Delete(ctx, &export)
	if err == nil || errors.IsNotFound(err) {
		return nil
	}
	return err
}
// assignUniqueNameAsAnnotation generates a unique name for an EndpointSlice, records it as the unique name
// annotation, and updates the EndpointSlice. It returns the name together with the result of the update.
func (r *Reconciler) assignUniqueNameAsAnnotation(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice) (string, error) {
	generatedName, genErr := uniquename.FleetScopedUniqueName(
		uniquename.DNS1123Subdomain, r.MemberClusterID, endpointSlice.Namespace, endpointSlice.Name)
	if genErr != nil {
		// This should not normally happen; use a random lower case alphabetic string of length 25 instead.
		klog.ErrorS(genErr, "Failed to generate a unique name; fall back to random lower case alphabetic strings",
			"endpointSlice", klog.KObj(endpointSlice))
		generatedName = uniquename.RandomLowerCaseAlphabeticString(25)
	}

	// The annotations map may be nil when the EndpointSlice has no annotations yet.
	if endpointSlice.Annotations == nil {
		endpointSlice.Annotations = make(map[string]string)
	}
	endpointSlice.Annotations[objectmeta.ExportedObjectAnnotationUniqueName] = generatedName

	updateErr := r.MemberClient.Update(ctx, endpointSlice)
	return generatedName, updateErr
}
// collectAndVerifyLastSeenGenerationAndTimestamp reads the last seen generation and timestamp annotations of
// an EndpointSlice and returns the recorded timestamp when both annotations are present, parse correctly,
// the generation equals the current generation of the EndpointSlice, and the timestamp is not after startTime.
//
// In every other case the EndpointSlice is re-annotated with its current generation and startTime, and
// startTime is returned along with the outcome of that annotation attempt.
func (r *Reconciler) collectAndVerifyLastSeenGenerationAndTimestamp(ctx context.Context, endpointslice *discoveryv1.EndpointSlice, startTime time.Time) (time.Time, error) {
	generationRaw, hasGeneration := endpointslice.Annotations[metrics.MetricsAnnotationLastSeenGeneration]
	timestampRaw, hasTimestamp := endpointslice.Annotations[metrics.MetricsAnnotationLastSeenTimestamp]

	if hasGeneration && hasTimestamp {
		recordedGeneration, generationErr := strconv.ParseInt(generationRaw, 10, 64)
		recordedTimestamp, timestampErr := time.Parse(metrics.MetricsLastSeenTimestampFormat, timestampRaw)

		parsed := generationErr == nil && timestampErr == nil
		upToDate := recordedGeneration == endpointslice.Generation && !recordedTimestamp.After(startTime)
		if parsed && upToDate {
			return recordedTimestamp, nil
		}
	}

	// The annotations are absent, malformed, or stale; write fresh values.
	return startTime, r.annotateLastSeenGenerationAndTimestamp(ctx, endpointslice, startTime)
}
// annotateLastSeenGenerationAndTimestamp annotates an EndpointSlice with its current generation and the given
// startTime (formatted with metrics.MetricsLastSeenTimestampFormat), then updates the EndpointSlice. It returns
// the error, if any, from the update.
func (r *Reconciler) annotateLastSeenGenerationAndTimestamp(ctx context.Context, endpointSlice *discoveryv1.EndpointSlice, startTime time.Time) error {
	// Reconcile annotates the last seen generation and timestamp only after a unique name has been assigned, so
	// the annotations map is expected to be initialized already. Guard against a nil map regardless, as writing
	// to one panics; this mirrors the handling in assignUniqueNameAsAnnotation.
	if endpointSlice.Annotations == nil {
		endpointSlice.Annotations = map[string]string{}
	}
	endpointSlice.Annotations[metrics.MetricsAnnotationLastSeenGeneration] = strconv.FormatInt(endpointSlice.Generation, 10)
	endpointSlice.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat)
	return r.MemberClient.Update(ctx, endpointSlice)
}