in pkg/controllers/hub/endpointsliceexport/controller.go [70:246]
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
endpointSliceExportRef := klog.KRef(req.Namespace, req.Name)
startTime := time.Now()
klog.V(2).InfoS("Reconciliation starts", "endpointSliceExport", endpointSliceExportRef)
defer func() {
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("Reconciliation ends", "endpointSliceExport", endpointSliceExportRef, "latency", latency)
}()
// Retrieve the EndpointSliceExport object.
endpointSliceExport := &fleetnetv1alpha1.EndpointSliceExport{}
if err := r.HubClient.Get(ctx, req.NamespacedName, endpointSliceExport); err != nil {
// Skip the reconciliation if the EndpointSliceExport does not exist; this should only happen when the
// EndpointSliceExport does not have a finalizer set and is deleted right before the controller gets a
// chance to reconcile it. The absence of the finalizer guarantees that the EndpointSlice has never been
// distributed across the fleet, thus no action is needed on this controller's side.
if errors.IsNotFound(err) {
klog.V(4).InfoS("Ignoring NotFound endpointSliceExport", "endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{}, nil
}
klog.ErrorS(err, "Failed to get endpointSliceExport", "endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{}, err
}
// Check if the EndpointSliceExport has been marked for deletion; withdraw EndpointSliceImports across
// the fleet if the EndpointSlice has been distributed.
if endpointSliceExport.DeletionTimestamp != nil {
if controllerutil.ContainsFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) {
// The presence of the EndpointSliceExport cleanup finalizer guarantees that an attempt has been made
// to distribute the EndpointSlice.
klog.V(2).InfoS("EndpointSliceExport deleted; withdraw distributed EndpointSlices", "endpointSliceExport", endpointSliceExportRef)
if err := r.withdrawAllEndpointSliceImports(ctx, endpointSliceExport); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
return ctrl.Result{}, nil
}
// Inquire the corresponding ServiceImport to find out which member clusters the EndpointSlice should be
// distributed to.
ownerSvcNS := endpointSliceExport.Spec.OwnerServiceReference.Namespace
ownerSvc := endpointSliceExport.Spec.OwnerServiceReference.Name
svcImportKey := types.NamespacedName{Namespace: ownerSvcNS, Name: ownerSvc}
svcImport := &fleetnetv1alpha1.ServiceImport{}
svcImportRef := klog.KRef(ownerSvcNS, ownerSvc)
klog.V(2).InfoS("Inquire ServceImport to find out which member clusters have requested the EndpointSlice",
"serviceImport", svcImportRef,
"endpointSliceExport", endpointSliceExportRef)
err := r.HubClient.Get(ctx, svcImportKey, svcImport)
switch {
case err != nil && errors.IsNotFound(err):
// The corresponding ServiceImport does not exist; normally this will never happen as an EndpointSlice can
// only be exported after its owner Service has been successfully exported. It could be that the controller
// observes some in-between state, such as a Service is deleted right after being exported successfully,
// and the system does not get to withdraw exported EndpointSlices from the Service yet. The controller
// will requeue the EndpointSliceExport and wait until the state stablizes.
klog.V(2).InfoS("ServiceImport does not exist", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{RequeueAfter: endpointSliceExportRetryInterval}, nil
case err != nil:
// An unexpected error occurs.
klog.ErrorS(err, "Failed to get ServiceImport", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{}, err
case len(svcImport.Status.Clusters) == 0:
// The corresponding ServiceImport exists but it is still being processed. This is also a case that
// should not happen in normal situations. The controller could be, once again, observing some in-between
// state. The EndpointSliceExport will be requeued and re-processed when the state stablizes.
klog.V(2).InfoS("ServiceImport is being processed (no accepted exports yet)",
"serviceImport", svcImportRef,
"endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{RequeueAfter: endpointSliceExportRetryInterval}, nil
}
data, ok := svcImport.ObjectMeta.Annotations[objectmeta.ServiceImportAnnotationServiceInUseBy]
if !ok {
// No cluster has requested to import the EndpointSlice's owner service.
// If the exported EndpointSlice has been distributed across the fleet before; withdraw the
// EndpointSliceImports.
klog.V(2).InfoS("No cluster has requested to import the Service; withdraw distributed EndpointSlices",
"serviceImport", svcImportRef,
"endpointSliceExport", endpointSliceExportRef)
if err := r.withdrawAllEndpointSliceImports(ctx, endpointSliceExport); err != nil {
return ctrl.Result{}, err
}
// There is no need to remove the local EndpointSlice copy in this situation (the copy might be in
// use by load balancing solutions on the hub cluster).
return ctrl.Result{}, nil
}
svcInUseBy := &fleetnetv1alpha1.ServiceInUseBy{}
if err := json.Unmarshal([]byte(data), svcInUseBy); err != nil {
klog.ErrorS(err, "Failed to unmarshal data for in-use Services from ServiceImport annotations",
"serviceImport", svcImportRef,
"endpointSliceExport", endpointSliceExportRef,
"data", data)
// This error cannot be recovered by retrying; a reconciliation will be triggered when the ServiceInUseBy
// data is overwritten.
return ctrl.Result{}, nil
}
// Distribute the EndpointSlices.
// Add cleanup finalizer to the EndpointSliceExport; this must happen before EndpointSlice is distributed.
if !controllerutil.ContainsFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) {
if err := r.addEndpointSliceExportCleanupFinalizer(ctx, endpointSliceExport); err != nil {
klog.ErrorS(err, "Failed to add cleanup finalizer to EndpointSliceExport", "endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{}, err
}
}
// Scan for EndpointSlices to withdraw and EndpointSlices to create or update.
klog.V(2).InfoS("Scan for EndpointSliceImports to withdraw and to create/update",
"serviceInUseBy", svcInUseBy,
"endpointSliceExport", endpointSliceExport)
endpointSliceImportsToWithdraw, endpointSlicesImportsToCreateOrUpdate, err := r.scanForEndpointSliceImports(ctx, endpointSliceExport, svcInUseBy)
if err != nil {
return ctrl.Result{}, err
}
klog.V(4).InfoS("EndpointSliceImports to withdraw", "count", len(endpointSliceImportsToWithdraw))
klog.V(4).InfoS("EndpointSliceImports to create or update", "count", len(endpointSlicesImportsToCreateOrUpdate))
// Delete distributed EndpointSlices that are no longer needed.
//
// Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently
// len(endpointSliceImportsToDelete) is at most 1. However, this behavior is subject to change as fleet
// networking evolves, and for future compatibility reasons, the function assumes that a Service might have been
// imported to multiple clusters.
for idx := range endpointSliceImportsToWithdraw {
endpointSliceImport := endpointSliceImportsToWithdraw[idx]
// Skip if the EndpointSliceImport has been marked for deletion.
if endpointSliceImport.DeletionTimestamp != nil {
continue
}
klog.V(4).InfoS("Withdraw endpointSlice",
"endpointSliceImport", klog.KObj(endpointSliceImport),
"endpointSliceExport", endpointSliceExportRef)
if err := apiretry.Do(func() error {
return r.HubClient.Delete(ctx, endpointSliceImport)
}); err != nil && !errors.IsNotFound(err) {
klog.ErrorS(err, "Failed to withdraw EndpointSlice",
"endpointSliceImport", klog.KObj(endpointSliceImport),
"endpointSliceExport", endpointSliceExportRef)
return ctrl.Result{}, err
}
}
// Create or update distributed EndpointSlices.
//
// Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently
// len(endpointSliceImportsToCreateOrUpdate) is at most 1. However, this behavior is subject to change as fleet
// networking evolves, and for future compatibility reasons, the function assumes that a Service might have been
// imported to multiple clusters.
for idx := range endpointSlicesImportsToCreateOrUpdate {
endpointSliceImport := endpointSlicesImportsToCreateOrUpdate[idx]
klog.V(4).InfoS("Create/update endpointSliceImport",
"endpointSliceImport", klog.KObj(endpointSliceImport),
"endpointSliceExport", endpointSliceExportRef)
var op controllerutil.OperationResult
if err := apiretry.Do(func() error {
var createOrUpdateErr error
op, createOrUpdateErr = controllerutil.CreateOrUpdate(ctx, r.HubClient, endpointSliceImport, func() error {
endpointSliceImport.Spec = *endpointSliceExport.Spec.DeepCopy()
return nil
})
return createOrUpdateErr
}); err != nil {
klog.ErrorS(err, "Failed to create or update EndpointSliceImport",
"endpointSliceImport", klog.KObj(endpointSliceImport),
"endpointSliceExport", endpointSliceExportRef,
"op", op)
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}