func()

in pkg/controllers/hub/endpointsliceexport/controller.go [70:246]


// Reconcile processes one EndpointSliceExport: it withdraws the EndpointSliceImports derived from the export
// when the export is being deleted or when its owner ServiceImport carries no in-use annotation, and otherwise
// deletes, creates, or updates EndpointSliceImports as reported by scanForEndpointSliceImports.
//
// The returned ctrl.Result asks for a requeue after endpointSliceExportRetryInterval when the owner
// ServiceImport is missing or lists no clusters in its status. A non-nil error is returned when a hub API
// call or one of the helper methods fails; a missing EndpointSliceExport and an unparsable ServiceInUseBy
// annotation are only logged and do not produce an error.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	endpointSliceExportRef := klog.KRef(req.Namespace, req.Name)
	startTime := time.Now()
	klog.V(2).InfoS("Reconciliation starts", "endpointSliceExport", endpointSliceExportRef)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("Reconciliation ends", "endpointSliceExport", endpointSliceExportRef, "latency", latency)
	}()

	// Retrieve the EndpointSliceExport object.
	endpointSliceExport := &fleetnetv1alpha1.EndpointSliceExport{}
	if err := r.HubClient.Get(ctx, req.NamespacedName, endpointSliceExport); err != nil {
		// Skip the reconciliation if the EndpointSliceExport does not exist; this should only happen when the
		// EndpointSliceExport does not have a finalizer set and is deleted right before the controller gets a
		// chance to reconcile it. The absence of the finalizer guarantees that the EndpointSlice has never been
		// distributed across the fleet, thus no action is needed on this controller's side.
		if errors.IsNotFound(err) {
			klog.V(4).InfoS("Ignoring NotFound endpointSliceExport", "endpointSliceExport", endpointSliceExportRef)
			return ctrl.Result{}, nil
		}
		klog.ErrorS(err, "Failed to get endpointSliceExport", "endpointSliceExport", endpointSliceExportRef)
		return ctrl.Result{}, err
	}

	// Check if the EndpointSliceExport has been marked for deletion; withdraw EndpointSliceImports across
	// the fleet if the EndpointSlice has been distributed.
	if endpointSliceExport.DeletionTimestamp != nil {
		// Without the cleanup finalizer no attempt has been made to distribute the EndpointSlice, so there
		// is nothing to withdraw.
		if !controllerutil.ContainsFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) {
			return ctrl.Result{}, nil
		}
		// The presence of the EndpointSliceExport cleanup finalizer guarantees that an attempt has been made
		// to distribute the EndpointSlice.
		klog.V(2).InfoS("EndpointSliceExport deleted; withdraw distributed EndpointSlices", "endpointSliceExport", endpointSliceExportRef)
		if err := r.withdrawAllEndpointSliceImports(ctx, endpointSliceExport); err != nil {
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	// Inquire the corresponding ServiceImport to find out which member clusters the EndpointSlice should be
	// distributed to. The ServiceImport is looked up by the namespace and name of the owner Service reference
	// recorded in the export's spec.
	ownerSvcNS := endpointSliceExport.Spec.OwnerServiceReference.Namespace
	ownerSvc := endpointSliceExport.Spec.OwnerServiceReference.Name
	svcImportKey := types.NamespacedName{Namespace: ownerSvcNS, Name: ownerSvc}
	svcImport := &fleetnetv1alpha1.ServiceImport{}
	svcImportRef := klog.KRef(ownerSvcNS, ownerSvc)
	klog.V(2).InfoS("Inquire ServiceImport to find out which member clusters have requested the EndpointSlice",
		"serviceImport", svcImportRef,
		"endpointSliceExport", endpointSliceExportRef)
	err := r.HubClient.Get(ctx, svcImportKey, svcImport)
	switch {
	case err != nil && errors.IsNotFound(err):
		// The corresponding ServiceImport does not exist; normally this will never happen as an EndpointSlice can
		// only be exported after its owner Service has been successfully exported. It could be that the controller
		// observes some in-between state, such as a Service is deleted right after being exported successfully,
		// and the system does not get to withdraw exported EndpointSlices from the Service yet. The controller
		// will requeue the EndpointSliceExport and wait until the state stabilizes.
		klog.V(2).InfoS("ServiceImport does not exist", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef)
		return ctrl.Result{RequeueAfter: endpointSliceExportRetryInterval}, nil
	case err != nil:
		// An unexpected error occurs.
		klog.ErrorS(err, "Failed to get ServiceImport", "serviceImport", svcImportRef, "endpointSliceExport", endpointSliceExportRef)
		return ctrl.Result{}, err
	case len(svcImport.Status.Clusters) == 0:
		// The corresponding ServiceImport exists but it is still being processed. This is also a case that
		// should not happen in normal situations. The controller could be, once again, observing some in-between
		// state. The EndpointSliceExport will be requeued and re-processed when the state stabilizes.
		klog.V(2).InfoS("ServiceImport is being processed (no accepted exports yet)",
			"serviceImport", svcImportRef,
			"endpointSliceExport", endpointSliceExportRef)
		return ctrl.Result{RequeueAfter: endpointSliceExportRetryInterval}, nil
	}

	data, ok := svcImport.ObjectMeta.Annotations[objectmeta.ServiceImportAnnotationServiceInUseBy]
	if !ok {
		// No cluster has requested to import the EndpointSlice's owner service.
		// If the exported EndpointSlice has been distributed across the fleet before; withdraw the
		// EndpointSliceImports.
		klog.V(2).InfoS("No cluster has requested to import the Service; withdraw distributed EndpointSlices",
			"serviceImport", svcImportRef,
			"endpointSliceExport", endpointSliceExportRef)
		if err := r.withdrawAllEndpointSliceImports(ctx, endpointSliceExport); err != nil {
			return ctrl.Result{}, err
		}
		// There is no need to remove the local EndpointSlice copy in this situation (the copy might be in
		// use by load balancing solutions on the hub cluster).
		return ctrl.Result{}, nil
	}

	svcInUseBy := &fleetnetv1alpha1.ServiceInUseBy{}
	if err := json.Unmarshal([]byte(data), svcInUseBy); err != nil {
		klog.ErrorS(err, "Failed to unmarshal data for in-use Services from ServiceImport annotations",
			"serviceImport", svcImportRef,
			"endpointSliceExport", endpointSliceExportRef,
			"data", data)
		// This error cannot be recovered by retrying; a reconciliation will be triggered when the ServiceInUseBy
		// data is overwritten.
		return ctrl.Result{}, nil
	}

	// Distribute the EndpointSlices.

	// Add cleanup finalizer to the EndpointSliceExport; this must happen before EndpointSlice is distributed.
	if !controllerutil.ContainsFinalizer(endpointSliceExport, endpointSliceExportCleanupFinalizer) {
		if err := r.addEndpointSliceExportCleanupFinalizer(ctx, endpointSliceExport); err != nil {
			klog.ErrorS(err, "Failed to add cleanup finalizer to EndpointSliceExport", "endpointSliceExport", endpointSliceExportRef)
			return ctrl.Result{}, err
		}
	}

	// Scan for EndpointSlices to withdraw and EndpointSlices to create or update.
	//
	// Log the object reference rather than the whole EndpointSliceExport, in line with every other log
	// entry in this function.
	klog.V(2).InfoS("Scan for EndpointSliceImports to withdraw and to create/update",
		"serviceInUseBy", svcInUseBy,
		"endpointSliceExport", endpointSliceExportRef)
	endpointSliceImportsToWithdraw, endpointSlicesImportsToCreateOrUpdate, err := r.scanForEndpointSliceImports(ctx, endpointSliceExport, svcInUseBy)
	if err != nil {
		return ctrl.Result{}, err
	}
	klog.V(4).InfoS("EndpointSliceImports to withdraw", "count", len(endpointSliceImportsToWithdraw))
	klog.V(4).InfoS("EndpointSliceImports to create or update", "count", len(endpointSlicesImportsToCreateOrUpdate))

	// Delete distributed EndpointSlices that are no longer needed.
	//
	// Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently
	// len(endpointSliceImportsToWithdraw) is at most 1. However, this behavior is subject to change as fleet
	// networking evolves, and for future compatibility reasons, the function assumes that a Service might have been
	// imported to multiple clusters.
	for idx := range endpointSliceImportsToWithdraw {
		endpointSliceImport := endpointSliceImportsToWithdraw[idx]
		// Skip if the EndpointSliceImport has been marked for deletion.
		if endpointSliceImport.DeletionTimestamp != nil {
			continue
		}
		klog.V(4).InfoS("Withdraw endpointSlice",
			"endpointSliceImport", klog.KObj(endpointSliceImport),
			"endpointSliceExport", endpointSliceExportRef)
		// A NotFound result means the EndpointSliceImport is already gone, which is the desired outcome.
		if err := apiretry.Do(func() error {
			return r.HubClient.Delete(ctx, endpointSliceImport)
		}); err != nil && !errors.IsNotFound(err) {
			klog.ErrorS(err, "Failed to withdraw EndpointSlice",
				"endpointSliceImport", klog.KObj(endpointSliceImport),
				"endpointSliceExport", endpointSliceExportRef)
			return ctrl.Result{}, err
		}
	}

	// Create or update distributed EndpointSlices.
	//
	// Note: At this moment, it is guaranteed that any Service can only be imported once across the fleet, consequently
	// len(endpointSlicesImportsToCreateOrUpdate) is at most 1. However, this behavior is subject to change as fleet
	// networking evolves, and for future compatibility reasons, the function assumes that a Service might have been
	// imported to multiple clusters.
	for idx := range endpointSlicesImportsToCreateOrUpdate {
		endpointSliceImport := endpointSlicesImportsToCreateOrUpdate[idx]
		klog.V(4).InfoS("Create/update endpointSliceImport",
			"endpointSliceImport", klog.KObj(endpointSliceImport),
			"endpointSliceExport", endpointSliceExportRef)

		// op is declared outside the retried closure so that the result of the last attempt is available
		// to the error log below.
		var op controllerutil.OperationResult
		if err := apiretry.Do(func() error {
			var createOrUpdateErr error
			op, createOrUpdateErr = controllerutil.CreateOrUpdate(ctx, r.HubClient, endpointSliceImport, func() error {
				// The import's spec is a deep copy of the export's spec.
				endpointSliceImport.Spec = *endpointSliceExport.Spec.DeepCopy()
				return nil
			})
			return createOrUpdateErr
		}); err != nil {
			klog.ErrorS(err, "Failed to create or update EndpointSliceImport",
				"endpointSliceImport", klog.KObj(endpointSliceImport),
				"endpointSliceExport", endpointSliceExportRef,
				"op", op)
			return ctrl.Result{}, err
		}
	}

	return ctrl.Result{}, nil
}