func()

in pkg/controllers/hub/internalserviceimport/controller.go [49:170]


// Reconcile processes one InternalServiceImport from the hub cluster and decides whether the
// Service it refers to may be imported by the member cluster that owns the object.
//
// The outcome depends on the state of the referenced ServiceImport:
//   - the InternalServiceImport is gone: nothing to do;
//   - the ServiceImport is missing: the InternalServiceImport status is cleared;
//   - the ServiceImport has no clusters in its status yet and is not being deleted: the request
//     is requeued after internalSvcImportRetryInterval;
//   - either object is being deleted: the import is withdrawn if the cleanup finalizer is present;
//   - the Service is already claimed by this member cluster: the import is (re)fulfilled;
//   - the Service is claimed by a different member cluster: the InternalServiceImport status is cleared;
//   - the Service is unclaimed: the finalizer is added, the ServiceImport is annotated to claim
//     the Service, and the import is fulfilled.
//
// Any error from the hub client or from the helper methods is logged and returned, so that the
// request is retried by the controller runtime.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	importReqRef := klog.KRef(req.Namespace, req.Name)
	begin := time.Now()
	klog.V(2).InfoS("Reconciliation starts", "internalServiceImport", importReqRef)
	defer func() {
		klog.V(2).InfoS("Reconciliation ends", "internalServiceImport", importReqRef, "latency", time.Since(begin).Milliseconds())
	}()

	// Step 1: load the InternalServiceImport that triggered this reconciliation.
	importReq := new(fleetnetv1alpha1.InternalServiceImport)
	if err := r.HubClient.Get(ctx, req.NamespacedName, importReq); err != nil {
		if !errors.IsNotFound(err) {
			klog.ErrorS(err, "Failed to get internalServiceImport", "internalServiceImport", importReqRef)
			return ctrl.Result{}, err
		}
		// The object no longer exists; there is nothing left to reconcile.
		klog.V(4).InfoS("Ignoring NotFound internalServiceImport", "internalServiceImport", importReqRef)
		return ctrl.Result{}, nil
	}

	// Step 2: load the ServiceImport that the InternalServiceImport points at.
	serviceImportKey := types.NamespacedName{
		Namespace: importReq.Spec.ServiceImportReference.Namespace,
		Name:      importReq.Spec.ServiceImportReference.Name,
	}
	serviceImportRef := klog.KRef(serviceImportKey.Namespace, serviceImportKey.Name)

	klog.V(2).InfoS("Check if the Service can be imported", "serviceImport", serviceImportRef, "internalServiceImport", importReqRef)
	serviceImport := new(fleetnetv1alpha1.ServiceImport)
	if err := r.HubClient.Get(ctx, serviceImportKey, serviceImport); err != nil {
		if errors.IsNotFound(err) {
			// Without a ServiceImport the Service cannot be imported. Any Service spec previously
			// written to the InternalServiceImport status is cleared, and the cleanup finalizer is
			// dropped as well (if applicable).
			klog.V(2).InfoS("ServiceImport does not exist; spec of imported Service (if any) will be cleared",
				"serviceImport", serviceImportRef,
				"internalServiceImport", importReqRef)
			return r.clearInternalServiceImportStatus(ctx, importReq)
		}
		// Any other failure is unexpected; surface it so the request is retried.
		klog.ErrorS(err, "Failed to get ServiceImport", "serviceImport", serviceImportRef, "internalServiceImport", importReqRef)
		return ctrl.Result{}, err
	}

	// A live ServiceImport with no clusters in its status is still being processed; check back later.
	if serviceImport.DeletionTimestamp == nil && len(serviceImport.Status.Clusters) == 0 {
		klog.V(2).InfoS("ServiceImport is being processed; requeue for later processing",
			"serviceImport", serviceImportRef,
			"internalServiceImport", importReqRef)
		return ctrl.Result{RequeueAfter: internalSvcImportRetryInterval}, nil
	}

	// Step 3: if either the InternalServiceImport or the ServiceImport is marked for deletion,
	// withdraw the import request.
	markedForDeletion := importReq.DeletionTimestamp != nil || serviceImport.DeletionTimestamp != nil
	if markedForDeletion {
		if !controllerutil.ContainsFinalizer(importReq, internalSvcImportCleanupFinalizer) {
			// No cleanup finalizer means no import was ever attempted for this member cluster, so
			// this controller has nothing to undo.
			return ctrl.Result{}, nil
		}
		klog.V(2).InfoS("InternalServiceImport is deleted; withdraw the Service import request",
			"internalServiceImport", importReqRef)
		return r.withdrawServiceImport(ctx, serviceImport, importReq)
	}

	// fulfill copies the Service spec into the InternalServiceImport status and reports the outcome
	// in the shape Reconcile returns; it is shared by the "already imported" and "newly imported" paths.
	fulfill := func() (ctrl.Result, error) {
		if err := r.fulfillInternalServiceImport(ctx, serviceImport, importReq); err != nil {
			klog.ErrorS(err, "Failed to fulfill service import by updating InternalServiceImport status",
				"serviceImport", serviceImportRef,
				"internalServiceImport", importReqRef)
			return ctrl.Result{}, err
		}
		return ctrl.Result{}, nil
	}

	// Identity of the member cluster requesting the import: its cluster namespace on the hub and its ID.
	clusterNamespace := fleetnetv1alpha1.ClusterNamespace(importReq.Namespace)
	clusterID := fleetnetv1alpha1.ClusterID(importReq.Spec.ServiceImportReference.ClusterID)

	// Step 4: inspect which member clusters (if any) have already claimed the Service.
	svcInUseBy := extractServiceInUseByInfoFromServiceImport(serviceImport)
	if len(svcInUseBy.MemberClusters) > 0 {
		if _, claimedByThisCluster := svcInUseBy.MemberClusters[clusterNamespace]; !claimedByThisCluster {
			// A different member cluster holds the claim. For now a Service may be imported exactly
			// once across the whole fleet, so this cluster's attempt is aborted.
			klog.V(2).InfoS("A member cluster has already imported the Service",
				"internalServiceImport", importReqRef,
				"serviceInUseBy", svcInUseBy)
			return r.clearInternalServiceImportStatus(ctx, importReq)
		}

		// This member cluster already holds the claim; just refresh the Service spec kept in the
		// InternalServiceImport status.
		klog.V(2).InfoS("The member cluster has imported the Service; will sync the imported Service spec",
			"serviceImport", serviceImportRef,
			"internalServiceImport", importReqRef)
		return fulfill()
	}

	// Step 5: nobody has claimed the Service yet; claim it for this member cluster.
	klog.V(2).InfoS("The Service can be imported; will sync the Service spec",
		"serviceImport", serviceImportRef,
		"internalServiceImport", importReqRef)

	// The cleanup finalizer has to be in place before the import is fulfilled, so that a later
	// deletion can withdraw the import.
	if err := r.addInternalServiceImportCleanupFinalizer(ctx, importReq); err != nil {
		klog.ErrorS(err, "Failed to add cleanup finalizer to InternalServiceImport", "internalServiceImport", importReqRef)
		return ctrl.Result{}, err
	}

	// Record the claim in the ServiceInUseBy annotation on the ServiceImport.
	svcInUseBy.MemberClusters[clusterNamespace] = clusterID
	if err := r.annotateServiceImportWithServiceInUseByInfo(ctx, serviceImport, svcInUseBy); err != nil {
		klog.ErrorS(err, "Failed to annotate ServiceImport with ServiceInUseBy info",
			"serviceImport", serviceImportRef,
			"serviceInUseBy", svcInUseBy)
		return ctrl.Result{}, err
	}

	// Finally, write the Service spec into the InternalServiceImport status.
	return fulfill()
}