pkg/controllers/member/endpointsliceimport/controller.go (267 lines of code) (raw):
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
// Package endpointsliceimport features the EndpointSliceImport controller for importing
// EndpointSlices from hub cluster into a member cluster.
package endpointsliceimport
import (
"context"
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/metrics"
"go.goms.io/fleet-networking/pkg/common/objectmeta"
)
const (
// controllerID helps identify that imported EndpointSlices are managed by this controller.
controllerID = "endpointsliceimport-controller.networking.fleet.azure.com"
endpointSliceImportCleanupFinalizer = "networking.fleet.azure.com/endpointsliceimport-cleanup"
mcsServiceImportRefFieldKey = ".spec.serviceImport.name"
endpointSliceImportRetryInterval = time.Second * 2
)
var (
// endpointSliceExportImportDuration is a Prometheus histogram metric bundle that measures the time it takes for
// Fleet networking controllers to export an EndpointSlice from one cluster in the fleet, distribute it, and
// finally import the EndpointSlice into its destination cluster in the fleet. The stopwatch starts when an
// EndpointSlice is ready for export (as an EndpointSliceExport in the hub cluster), and stops when the
// EndpointSlice is successfully imported (from an EndpointSlicdeImport in the hub cluster).
//
// Note that this measurement does not cover the time spent for Fleet networking controllers to pick EndpointSlice
// spec changes, as at this moment there is no way in Kubernetes API for controllers to reliably track it. Nor
// does it cover the time needed for load balancer to actually expose endpoints in the EndpointSlice for traffic.
endpointSliceExportImportDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: metrics.MetricsNamespace,
Subsystem: metrics.MetricsSubsystem,
Name: "endpointslice_export_import_duration_milliseconds",
Help: "The duration of an endpointslice export",
Buckets: metrics.ExportDurationMillisecondsBuckets,
},
[]string{
// The ID of the origin cluster, which exports the Service and the EndpointSlice.
"origin_cluster_id",
// The ID of the destination cluster, which imports the Service and the EndpointSlice.
"destination_cluster_id",
// Whether the data point comes from importing an endpointSlice for the first time.
"is_first_import",
},
)
)
func init() {
// Register endpointSliceExportImportDuration (endpointslice_export_import_duration_milliseconds) metric
// with the controller runtime global metrics registry.
ctrlmetrics.Registry.MustRegister(endpointSliceExportImportDuration)
}
// Reconciler reconciles an EndpointSliceImport.
type Reconciler struct {
MemberClusterID string
MemberClient client.Client
HubClient client.Client
// The namespace reserved for fleet resources in the member cluster.
FleetSystemNamespace string
}
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=endpointsliceimports,verbs=get;list;watch;update;patch
//+kubebuilder:rbac:groups=discovery.k8s.io,resources=endpointslices,verbs=get;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=multiclusterservices,verbs=get;list;watch
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list
// Reconcile imports an EndpointSlice from hub cluster.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
endpointSliceImportRef := klog.KRef(req.Namespace, req.Name)
startTime := time.Now()
klog.V(2).InfoS("Reconciliation starts", "endpointSliceImport", endpointSliceImportRef)
defer func() {
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("Reconciliation ends", "endpointSliceImport", endpointSliceImportRef, "latency", latency)
}()
// Retrieve the EndpointSliceImport.
endpointSliceImport := &fleetnetv1alpha1.EndpointSliceImport{}
if err := r.HubClient.Get(ctx, req.NamespacedName, endpointSliceImport); err != nil {
// Skip the reconciliation if the EndpointSliceImport does not exist; this should only happen when an
// EndpointSliceImport is deleted before the controller gets a chance to reconcile it, which
// requires no action to take on this controller's end.
if errors.IsNotFound(err) {
klog.V(4).InfoS("Ignoring NotFound endpointSliceImport", "endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, nil
}
klog.ErrorS(err, "Failed to get endpoint slice import", "endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, err
}
// Check if the EndpointSliceImport has been deleted and needs cleanup (unimport EndpointSlice).
// An EndpointSliceImport needs cleanup when it has the EndpointSliceImport cleanup finalizer added;
// the absence of this finalizer guarantees that the EndpointSliceImport has never been imported.
endpointSliceRef := klog.KRef(r.FleetSystemNamespace, req.Name)
if endpointSliceImport.DeletionTimestamp != nil {
klog.V(2).InfoS("EndpointSliceImport is deleted; unimport EndpointSlice",
"endpointSliceImport", endpointSliceImportRef,
"endpointSlice", endpointSliceRef)
if err := r.unimportEndpointSlice(ctx, endpointSliceImport); err != nil {
klog.ErrorS(err, "Failed to unimport EndpointSlice",
"endpointSliceImport", endpointSliceImportRef,
"endpointSlice", endpointSliceRef)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// Import the EndpointSlice, or update an imported EndpointSlice.
// Inquire the corresponding MCS to find out which Service the imported EndpointSlice should associate with.
// List all MCSes that attempt to import the Service owning the EndpointSlice.
multiClusterSvcList := &fleetnetv1alpha1.MultiClusterServiceList{}
ownerSvcNS := endpointSliceImport.Spec.OwnerServiceReference.Namespace
ownerSvcName := endpointSliceImport.Spec.OwnerServiceReference.Name
err := r.MemberClient.List(ctx,
multiClusterSvcList,
client.InNamespace(ownerSvcNS),
client.MatchingFields{mcsServiceImportRefFieldKey: ownerSvcName})
switch {
case err != nil:
// An unexpected error occurs.
klog.ErrorS(err, "Failed to list MCS",
"serviceImport", klog.KRef(ownerSvcNS, ownerSvcName),
"endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, err
case len(multiClusterSvcList.Items) == 0:
// No matching MCS is found; typically this will never happen as the hub cluster will only distribute
// EndpointSlices to member clusters that have requested them with an MCS. It could be that the controller
// sees an in-between state where a Service is imported and then immediately unimported, and the hub cluster
// does not get to retract distributed EndpointSlices in time. In this case the controller will skip
// importing the EndpointSlice.
klog.V(2).InfoS("No matching MCS is found; EndpointSlice will not be imported",
"serviceImport", klog.KRef(ownerSvcNS, ownerSvcName),
"endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, nil
}
// Scan all matching MCSes: inspect each MCS for the derived Service label; if one is present, it signals
// that the MCS has successfully imported the Service owning the imported EndpointSlice, which the controller
// will use for EndpointSlice association.
// At this moment, the scan uses a first-match logic, as it is guaranteed that if multiple MCSes, either from
// one member cluster or from multiple clusters from the fleet, attempt to import the same Service, it is
// guaranteed that only one will succeed.
derivedSvcName := scanForDerivedServiceName(multiClusterSvcList)
// Verify if the found derived Service label points to a Service that the controller can associate the
// EndpointSlice with. In most cases this check will always pass as the hub cluster will only distribute
// EndpointSlices to member clusters that have requested them with an MCS; still, there are some corner
// cases that the controller must guard against, such as:
// * user has tampered with the derived Service label; or
// * the controller sees an in-between state where a Service is imported and then immediately unimported,
// and the hub cluster does not get to retract distributed EndpointSlices in time; or
// * a connectivity issue has kept the member cluster out of sync with the hub cluster, with the member cluster
// not knowing that a Service has been successfully claimed by itself; or
// * the controller for processing MCSes lags, and has not created the derived Service in time.
isValid, err := r.isDerivedServiceValid(ctx, derivedSvcName)
switch {
case err != nil:
klog.ErrorS(err, "Failed to check if derived Service is valid",
"derivedServiceName", derivedSvcName,
"endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, err
case !isValid:
// Retry importing the EndpointSlice at a later time if no valid derived Service can be found.
klog.V(2).InfoS("No valid derived Service; will retry importing EndpointSlice later",
"derivedServiceName", derivedSvcName,
"endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{RequeueAfter: endpointSliceImportRetryInterval}, nil
}
// Special note:
// There exists a corner case where an MCS that imports a specific Service have multiple derived Services created;
// this is usually the result of direct label manipulation on the user's end. Ideally, this controller should watch
// for changes on MCS resources and (re)associate imported EndpointSlices to the latest dervied Service in use;
// however, unfortunately, with the current implementation of controller-runtime package, it is not possible
// for the controller to watch for resources on two different directions (member cluster and hub cluster),
// and as a result, an imported EndpointSlice could be bound to a derived Service that is no longer in use.
// Periodic resyncs can help address this issue, but it may take a quite long while before the situation is
// corrected, should this corner case happens.
// Add the cleanup finalizer (if one has not been added earlier); this must happen before
// the EndpointSlice is imported.
klog.V(2).InfoS("Add cleanup finalizer to EndpointSliceImport", "endpointSliceImport", endpointSliceImportRef)
if err := r.addEndpointSliceImportCleanupFinalizer(ctx, endpointSliceImport); err != nil {
klog.ErrorS(err, "Failed to add cleanup finalizer to EndpointSliceImport", "endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, err
}
// Associate the EndpointSlice with the Service.
klog.V(2).InfoS("Import the EndpointSlice", "endpointSlice", endpointSliceRef)
endpointSlice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.FleetSystemNamespace,
Name: endpointSliceImport.Name,
},
}
if op, err := controllerutil.CreateOrUpdate(ctx, r.MemberClient, endpointSlice, func() error {
formatEndpointSliceFromImport(endpointSlice, derivedSvcName, endpointSliceImport)
return nil
}); err != nil {
klog.ErrorS(err, "Failed to create/update EndpointSlice",
"endpointSlice", endpointSliceRef,
"op", op,
"endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, err
}
// Observe a data point for the EndpointSliceExportImportDuration metric.
if err := r.observeMetrics(ctx, endpointSliceImport, time.Now()); err != nil {
klog.Warning("Failed to observe metrics", "error", err, "endpointSliceImport", endpointSliceImportRef)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// SetupWithManager builds a controller with Reconciler and sets it up with a controller manager.
func (r *Reconciler) SetupWithManager(ctx context.Context, memberCtrlMgr, hubCtrlMgr ctrl.Manager) error {
// Set up an index for efficient MCS lookup **on the controller manager for member cluster controllers**.
indexerFunc := func(o client.Object) []string {
multiClusterSvc, ok := o.(*fleetnetv1alpha1.MultiClusterService)
if !ok {
return []string{}
}
return []string{multiClusterSvc.Spec.ServiceImport.Name}
}
if err := memberCtrlMgr.GetFieldIndexer().IndexField(ctx,
&fleetnetv1alpha1.MultiClusterService{},
mcsServiceImportRefFieldKey,
indexerFunc,
); err != nil {
return err
}
// The controller itself is managed by the controller manager for hub cluster controllers.
return ctrl.NewControllerManagedBy(hubCtrlMgr).
// The EndpointSliceImport controller watches over EndpointSliceImport objects.
For(&fleetnetv1alpha1.EndpointSliceImport{}).
Complete(r)
}
// unimportEndpointSlice unimports an EndpointSlice.
func (r *Reconciler) unimportEndpointSlice(ctx context.Context, endpointSliceImport *fleetnetv1alpha1.EndpointSliceImport) error {
// Skip the unimporting if the cleanup finalizer is not present on the EndpointSliceImport; the absence of this
// finalizer guarantees that the EndpointSlice has never been imported.
if !controllerutil.ContainsFinalizer(endpointSliceImport, endpointSliceImportCleanupFinalizer) {
return nil
}
// Unimport the EndpointSlice.
endpointSlice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.FleetSystemNamespace,
Name: endpointSliceImport.Name,
},
}
if err := r.MemberClient.Delete(ctx, endpointSlice); err != nil && !errors.IsNotFound(err) {
// It is guaranteed that a finalizer is always added before an EndpointSlice is imported; in some rare
// occasions it could happen that an EndpointSliceImport has a finalizer added yet the corresponding
// EndpointSlice has not been imported in the member cluster. It is an expected behavior and no action
// is needed on this controller's end.
return err
}
// Remove the EndpointSliceImport cleanup finalizer.
controllerutil.RemoveFinalizer(endpointSliceImport, endpointSliceImportCleanupFinalizer)
return r.HubClient.Update(ctx, endpointSliceImport)
}
// addEndpointSliceImportCleanupFinalizer adds the cleanup finalizer to an EndpointSliceImport.
func (r *Reconciler) addEndpointSliceImportCleanupFinalizer(ctx context.Context, endpointSliceImport *fleetnetv1alpha1.EndpointSliceImport) error {
if !controllerutil.ContainsFinalizer(endpointSliceImport, endpointSliceImportCleanupFinalizer) {
controllerutil.AddFinalizer(endpointSliceImport, endpointSliceImportCleanupFinalizer)
return r.HubClient.Update(ctx, endpointSliceImport)
}
return nil
}
// isDerivedServiceValid returns if a derived Service is valid for EndpointSlice association.
func (r *Reconciler) isDerivedServiceValid(ctx context.Context, derivedSvcName string) (bool, error) {
// Check if the given name is a valid Service name; this helps guard against user tampering the label.
if errs := validation.IsDNS1035Label(derivedSvcName); len(errs) != 0 {
return false, nil
}
// Check if the derived Service has been created and has not been marked for deletion.
// The derived Service label is added before the actual Service is created; in some (highly unlikely) scenarios it
// could happen that the controller sees a derived Service label yet cannot find the corresponding Service.
derivedSvc := &corev1.Service{}
derivedSvcKey := types.NamespacedName{Namespace: r.FleetSystemNamespace, Name: derivedSvcName}
if err := r.MemberClient.Get(ctx, derivedSvcKey, derivedSvc); err != nil {
return false, client.IgnoreNotFound(err)
}
return derivedSvc.DeletionTimestamp == nil, nil
}
// scanForDerivedServiceName scans a list of MCSes and returns the first found derived Service label in the list.
func scanForDerivedServiceName(multiClusterSvcList *fleetnetv1alpha1.MultiClusterServiceList) string {
var derivedSvcName string
for _, multiClusterSvc := range multiClusterSvcList.Items {
if multiClusterSvc.DeletionTimestamp != nil {
continue
}
svcName, ok := multiClusterSvc.Labels[objectmeta.MultiClusterServiceLabelDerivedService]
if ok {
derivedSvcName = svcName
break
}
}
return derivedSvcName
}
// formatEndpointSliceFromImport formats an EndpointSlice using an EndpointSliceImport.
func formatEndpointSliceFromImport(endpointSlice *discoveryv1.EndpointSlice, derivedSvcName string, endpointSliceImport *fleetnetv1alpha1.EndpointSliceImport) {
endpointSlice.AddressType = endpointSliceImport.Spec.AddressType
endpointSlice.Labels = map[string]string{
discoveryv1.LabelServiceName: derivedSvcName,
discoveryv1.LabelManagedBy: controllerID,
}
endpointSlice.Ports = endpointSliceImport.Spec.Ports
endpoints := []discoveryv1.Endpoint{}
for _, importedEndpoint := range endpointSliceImport.Spec.Endpoints {
endpoints = append(endpoints, discoveryv1.Endpoint{
Addresses: importedEndpoint.Addresses,
})
}
endpointSlice.Endpoints = endpoints
}
// Observe data points for metrics.
func (r *Reconciler) observeMetrics(ctx context.Context, endpointSliceImport *fleetnetv1alpha1.EndpointSliceImport, startTime time.Time) error {
// Check if a metric data point has been observed for the current generation of the object; this helps guard
// against repeated observation of metric data points for the same generation of an object due to no-op
// reconciliations (e.g. resyncs, untracked changes).
lastObservedGeneration, ok := endpointSliceImport.Annotations[metrics.MetricsAnnotationLastObservedGeneration]
currentGenerationStr := fmt.Sprintf("%d", endpointSliceImport.Spec.EndpointSliceReference.Generation)
// isFirstImport flag signals if the endpointSlice has been imported before. This flag is for the purpose of
// filtering out any outlier caused by late service imports: service export/import is two-phase op, in which either
// phase can be performed individually in no specific order; it is possible for a user to export a service first
// and then import it as a MCS at a much later time, and consequently a significant delay, through no fault
// of Fleet networking controllers, will be observed when importing endpointSlices from the service for the
// first time.
// Note that technically speaking there is no easy way for controllers to ascertain whether (and exactly how much)
// the factor of late imports plays a part in the export/import latency.
isFirstImport := !ok
if ok && lastObservedGeneration == currentGenerationStr {
// A data point has been observed for this generation; skip the observation.
return nil
}
// Observe a new data point.
// Annotate the object to track the last observed generation; this must happen before the actual observation.
if endpointSliceImport.Annotations == nil {
// Initialize the annotation map if it is empty.
endpointSliceImport.Annotations = map[string]string{}
}
endpointSliceImport.Annotations[metrics.MetricsAnnotationLastObservedGeneration] = currentGenerationStr
if err := r.HubClient.Update(ctx, endpointSliceImport); err != nil {
return err
}
// Skip the observation if the exportedSince field is empty in the object reference.
// Note that in most cases this branch should never run as the Fleet networking controllers will always assign a
// timestamp for each exported object.
if endpointSliceImport.Spec.EndpointSliceReference.ExportedSince.IsZero() {
klog.V(4).InfoS("exportedSince timestamp is absent; endpointSlice export/import duration data point is not collected",
"endpointSliceImport", klog.KObj(endpointSliceImport))
return nil
}
timeSpent := startTime.Sub(endpointSliceImport.Spec.EndpointSliceReference.ExportedSince.Time).Milliseconds()
// Under some rare circumstances (such as time sync not being configured properly across clusters), it could
// happen that the export timestamp of an EndpointSlice appears later than its import timestamp. Unfortunately,
// clock discrepancies are out of Fleet networking's control and there is not an easy way to determine how
// much clocks drift from each other and if (when) it will recover. To avoid negative outliers affecting
// data analysis, this controller assigns a constant of exactly 1 second when the calculated duration does
// not make sense.
if timeSpent <= 0 {
timeSpent = time.Second.Milliseconds() * 1
klog.V(4).Info("A negative endpointSlice export/import duration data point has been observed; time sync might be out of order",
"serviceNamespacedName", endpointSliceImport.Spec.OwnerServiceReference.NamespacedName,
"endpointSliceNamespacedName", endpointSliceImport.Spec.EndpointSliceReference.NamespacedName,
"originClusterID", endpointSliceImport.Spec.EndpointSliceReference.ClusterID,
"destinationClusterID", r.MemberClusterID,
"isFirstImport", isFirstImport)
}
// Similarly, to avoid large outliers skewing the stats (e.g. averages), this controller caps the data point
// to a constant value.
if timeSpent > int64(metrics.ExportDurationRightBound) {
timeSpent = int64(metrics.ExportDurationRightBound)
}
endpointSliceExportImportDuration.
WithLabelValues(endpointSliceImport.Spec.EndpointSliceReference.ClusterID, r.MemberClusterID, fmt.Sprintf("%t", isFirstImport)).
Observe(float64(timeSpent))
// TO-DO (chenyu1): Remove the metric logs when histogram metrics are supported in the backend.
klog.V(2).InfoS("endpointSliceExportImportDurationMilliseconds",
"value", timeSpent,
"originClusterID", endpointSliceImport.Spec.EndpointSliceReference.ClusterID,
"destinationClusterID", r.MemberClusterID,
"isFirstImport", isFirstImport)
return nil
}