in pkg/controllers/member/endpointsliceimport/controller.go [93:241]
// Reconcile processes a single EndpointSliceImport read from the hub cluster.
//
// If the EndpointSliceImport is being deleted, the corresponding EndpointSlice in the member cluster's
// fleet system namespace is unimported. Otherwise, the controller:
//   - looks up the MCSes (in the owner Service's namespace) that attempt to import the Service owning the
//     EndpointSlice;
//   - picks the derived Service name from them and verifies that the derived Service is usable;
//   - adds the cleanup finalizer to the EndpointSliceImport; and
//   - creates or updates the imported EndpointSlice so that it is associated with the derived Service.
//
// If no matching MCS exists, the EndpointSlice is not imported and no requeue is requested. If no valid derived
// Service is found, the request is requeued after endpointSliceImportRetryInterval. Any unexpected error is
// logged and returned to the caller.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	endpointSliceImportRef := klog.KRef(req.Namespace, req.Name)
	startTime := time.Now()
	klog.V(2).InfoS("Reconciliation starts", "endpointSliceImport", endpointSliceImportRef)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("Reconciliation ends", "endpointSliceImport", endpointSliceImportRef, "latency", latency)
	}()

	// Retrieve the EndpointSliceImport.
	endpointSliceImport := &fleetnetv1alpha1.EndpointSliceImport{}
	if err := r.HubClient.Get(ctx, req.NamespacedName, endpointSliceImport); err != nil {
		// Skip the reconciliation if the EndpointSliceImport does not exist; this should only happen when an
		// EndpointSliceImport is deleted before the controller gets a chance to reconcile it, which
		// requires no action to take on this controller's end.
		if errors.IsNotFound(err) {
			klog.V(4).InfoS("Ignoring NotFound endpointSliceImport", "endpointSliceImport", endpointSliceImportRef)
			return ctrl.Result{}, nil
		}
		klog.ErrorS(err, "Failed to get endpoint slice import", "endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, err
	}

	// Check if the EndpointSliceImport has been deleted and needs cleanup (unimport EndpointSlice).
	// An EndpointSliceImport needs cleanup when it has the EndpointSliceImport cleanup finalizer added;
	// the absence of this finalizer guarantees that the EndpointSliceImport has never been imported.
	endpointSliceRef := klog.KRef(r.FleetSystemNamespace, req.Name)
	if endpointSliceImport.DeletionTimestamp != nil {
		klog.V(2).InfoS("EndpointSliceImport is deleted; unimport EndpointSlice",
			"endpointSliceImport", endpointSliceImportRef,
			"endpointSlice", endpointSliceRef)
		if err := r.unimportEndpointSlice(ctx, endpointSliceImport); err != nil {
			klog.ErrorS(err, "Failed to unimport EndpointSlice",
				"endpointSliceImport", endpointSliceImportRef,
				"endpointSlice", endpointSliceRef)
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	// Import the EndpointSlice, or update an imported EndpointSlice.
	// Inquire the corresponding MCS to find out which Service the imported EndpointSlice should associate with.
	// List all MCSes that attempt to import the Service owning the EndpointSlice.
	multiClusterSvcList := &fleetnetv1alpha1.MultiClusterServiceList{}
	ownerSvcNS := endpointSliceImport.Spec.OwnerServiceReference.Namespace
	ownerSvcName := endpointSliceImport.Spec.OwnerServiceReference.Name
	err := r.MemberClient.List(ctx,
		multiClusterSvcList,
		client.InNamespace(ownerSvcNS),
		client.MatchingFields{mcsServiceImportRefFieldKey: ownerSvcName})
	switch {
	case err != nil:
		// An unexpected error occurs.
		klog.ErrorS(err, "Failed to list MCS",
			"serviceImport", klog.KRef(ownerSvcNS, ownerSvcName),
			"endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, err
	case len(multiClusterSvcList.Items) == 0:
		// No matching MCS is found; typically this will never happen as the hub cluster will only distribute
		// EndpointSlices to member clusters that have requested them with an MCS. It could be that the controller
		// sees an in-between state where a Service is imported and then immediately unimported, and the hub cluster
		// does not get to retract distributed EndpointSlices in time. In this case the controller will skip
		// importing the EndpointSlice.
		klog.V(2).InfoS("No matching MCS is found; EndpointSlice will not be imported",
			"serviceImport", klog.KRef(ownerSvcNS, ownerSvcName),
			"endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, nil
	}

	// Scan all matching MCSes: inspect each MCS for the derived Service label; if one is present, it signals
	// that the MCS has successfully imported the Service owning the imported EndpointSlice, which the controller
	// will use for EndpointSlice association.
	// At this moment, the scan uses a first-match logic: if multiple MCSes, either from one member cluster or
	// from multiple clusters in the fleet, attempt to import the same Service, it is guaranteed that only one
	// will succeed.
	derivedSvcName := scanForDerivedServiceName(multiClusterSvcList)

	// Verify if the found derived Service label points to a Service that the controller can associate the
	// EndpointSlice with. In most cases this check will always pass as the hub cluster will only distribute
	// EndpointSlices to member clusters that have requested them with an MCS; still, there are some corner
	// cases that the controller must guard against, such as:
	// * user has tampered with the derived Service label; or
	// * the controller sees an in-between state where a Service is imported and then immediately unimported,
	//   and the hub cluster does not get to retract distributed EndpointSlices in time; or
	// * a connectivity issue has kept the member cluster out of sync with the hub cluster, with the member cluster
	//   not knowing that a Service has been successfully claimed by itself; or
	// * the controller for processing MCSes lags, and has not created the derived Service in time.
	isValid, err := r.isDerivedServiceValid(ctx, derivedSvcName)
	switch {
	case err != nil:
		klog.ErrorS(err, "Failed to check if derived Service is valid",
			"derivedServiceName", derivedSvcName,
			"endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, err
	case !isValid:
		// Retry importing the EndpointSlice at a later time if no valid derived Service can be found.
		klog.V(2).InfoS("No valid derived Service; will retry importing EndpointSlice later",
			"derivedServiceName", derivedSvcName,
			"endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{RequeueAfter: endpointSliceImportRetryInterval}, nil
	}

	// Special note:
	// There exists a corner case where an MCS that imports a specific Service has multiple derived Services created;
	// this is usually the result of direct label manipulation on the user's end. Ideally, this controller should watch
	// for changes on MCS resources and (re)associate imported EndpointSlices to the latest derived Service in use;
	// however, unfortunately, with the current implementation of controller-runtime package, it is not possible
	// for the controller to watch for resources on two different directions (member cluster and hub cluster),
	// and as a result, an imported EndpointSlice could be bound to a derived Service that is no longer in use.
	// Periodic resyncs can help address this issue, but it may take quite a long while before the situation is
	// corrected, should this corner case happen.

	// Add the cleanup finalizer (if one has not been added earlier); this must happen before
	// the EndpointSlice is imported, so that a later deletion of the EndpointSliceImport triggers cleanup.
	klog.V(2).InfoS("Add cleanup finalizer to EndpointSliceImport", "endpointSliceImport", endpointSliceImportRef)
	if err := r.addEndpointSliceImportCleanupFinalizer(ctx, endpointSliceImport); err != nil {
		klog.ErrorS(err, "Failed to add cleanup finalizer to EndpointSliceImport", "endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, err
	}

	// Associate the EndpointSlice with the Service.
	klog.V(2).InfoS("Import the EndpointSlice", "endpointSlice", endpointSliceRef)
	endpointSlice := &discoveryv1.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: r.FleetSystemNamespace,
			Name:      endpointSliceImport.Name,
		},
	}
	if op, err := controllerutil.CreateOrUpdate(ctx, r.MemberClient, endpointSlice, func() error {
		formatEndpointSliceFromImport(endpointSlice, derivedSvcName, endpointSliceImport)
		return nil
	}); err != nil {
		klog.ErrorS(err, "Failed to create/update EndpointSlice",
			"endpointSlice", endpointSliceRef,
			"op", op,
			"endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, err
	}

	// Observe a data point for the EndpointSliceExportImportDuration metric.
	// The failure is returned (and thus fails this reconciliation), so it is logged with the structured
	// klog.ErrorS like every other failure path; klog.Warning is not structured and would concatenate the
	// key/value pairs into the message text.
	if err := r.observeMetrics(ctx, endpointSliceImport, time.Now()); err != nil {
		klog.ErrorS(err, "Failed to observe metrics", "endpointSliceImport", endpointSliceImportRef)
		return ctrl.Result{}, err
	}
	return ctrl.Result{}, nil
}