in pkg/controllers/hub/internalserviceimport/controller.go [49:170]
// Reconcile handles a request for an InternalServiceImport kept on the hub cluster. It looks up the
// ServiceImport referenced by the InternalServiceImport and then fulfills, rejects, or withdraws the
// import attempt made by the member cluster that owns the InternalServiceImport.
//
// The outcomes, in the order they are checked:
//   - The InternalServiceImport is not found: nothing is done.
//   - The ServiceImport is not found: clearInternalServiceImportStatus is called.
//   - The ServiceImport is not marked for deletion and lists no clusters in its status: the request is
//     requeued after internalSvcImportRetryInterval.
//   - Either object is marked for deletion: withdrawServiceImport is called if the cleanup finalizer is
//     present on the InternalServiceImport; otherwise nothing is done.
//   - The ServiceInUseBy info already lists this cluster namespace: fulfillInternalServiceImport is called.
//   - The ServiceInUseBy info lists other cluster namespaces only: clearInternalServiceImportStatus is called.
//   - Otherwise: the cleanup finalizer is added, the ServiceImport is annotated with the updated
//     ServiceInUseBy info, and fulfillInternalServiceImport is called, in that order.
//
// Any error returned by the hub client or by a helper is logged and returned to the caller.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	isiRef := klog.KRef(req.Namespace, req.Name)
	begin := time.Now()
	klog.V(2).InfoS("Reconciliation starts", "internalServiceImport", isiRef)
	defer func() {
		elapsedMs := time.Since(begin).Milliseconds()
		klog.V(2).InfoS("Reconciliation ends", "internalServiceImport", isiRef, "latency", elapsedMs)
	}()

	// Fetch the InternalServiceImport named by the request.
	isi := &fleetnetv1alpha1.InternalServiceImport{}
	if getErr := r.HubClient.Get(ctx, req.NamespacedName, isi); getErr != nil {
		if !errors.IsNotFound(getErr) {
			klog.ErrorS(getErr, "Failed to get internalServiceImport", "internalServiceImport", isiRef)
			return ctrl.Result{}, getErr
		}
		// The object no longer exists, so there is nothing to reconcile.
		klog.V(4).InfoS("Ignoring NotFound internalServiceImport", "internalServiceImport", isiRef)
		return ctrl.Result{}, nil
	}

	// Look up the ServiceImport that the InternalServiceImport refers to.
	importRef := isi.Spec.ServiceImportReference
	svcImportKey := types.NamespacedName{Namespace: importRef.Namespace, Name: importRef.Name}
	svcImportRef := klog.KRef(svcImportKey.Namespace, svcImportKey.Name)
	klog.V(2).InfoS("Check if the Service can be imported", "serviceImport", svcImportRef, "internalServiceImport", isiRef)
	svcImport := &fleetnetv1alpha1.ServiceImport{}
	if getErr := r.HubClient.Get(ctx, svcImportKey, svcImport); getErr != nil {
		if errors.IsNotFound(getErr) {
			// Without a ServiceImport there is no Service to import; hand over to the status-clearing helper,
			// which is expected to drop any previously synced Service spec (and the cleanup finalizer, if set).
			klog.V(2).InfoS("ServiceImport does not exist; spec of imported Service (if any) will be cleared",
				"serviceImport", svcImportRef,
				"internalServiceImport", isiRef)
			return r.clearInternalServiceImportStatus(ctx, isi)
		}
		// Any other failure is unexpected; surface it so that the request is retried.
		klog.ErrorS(getErr, "Failed to get ServiceImport", "serviceImport", svcImportRef, "internalServiceImport", isiRef)
		return ctrl.Result{}, getErr
	}

	// A live ServiceImport with no clusters in its status is treated as still being processed; try again later.
	if svcImport.DeletionTimestamp == nil && len(svcImport.Status.Clusters) == 0 {
		klog.V(2).InfoS("ServiceImport is being processed; requeue for later processing",
			"serviceImport", svcImportRef,
			"internalServiceImport", isiRef)
		return ctrl.Result{RequeueAfter: internalSvcImportRetryInterval}, nil
	}

	// Withdraw the import request when either the InternalServiceImport or the ServiceImport has been
	// marked for deletion.
	if isi.DeletionTimestamp != nil || svcImport.DeletionTimestamp != nil {
		if !controllerutil.ContainsFinalizer(isi, internalSvcImportCleanupFinalizer) {
			// The cleanup finalizer is added before any import is fulfilled, so its absence means no import
			// was ever attempted for this object and there is nothing to undo.
			return ctrl.Result{}, nil
		}
		klog.V(2).InfoS("InternalServiceImport is deleted; withdraw the Service import request",
			"internalServiceImport", isiRef)
		return r.withdrawServiceImport(ctx, svcImport, isi)
	}

	// Identify the member cluster behind this import attempt by its cluster namespace and cluster ID.
	clusterNamespace := fleetnetv1alpha1.ClusterNamespace(isi.Namespace)
	clusterID := fleetnetv1alpha1.ClusterID(importRef.ClusterID)

	// Read which member clusters are currently recorded as importing the Service.
	svcInUseBy := extractServiceInUseByInfoFromServiceImport(svcImport)

	if _, importedHere := svcInUseBy.MemberClusters[clusterNamespace]; importedHere {
		// This member cluster already holds the import; just refresh the Service spec kept in the
		// InternalServiceImport status.
		klog.V(2).InfoS("The member cluster has imported the Service; will sync the imported Service spec",
			"serviceImport", svcImportRef,
			"internalServiceImport", isiRef)
		if syncErr := r.fulfillInternalServiceImport(ctx, svcImport, isi); syncErr != nil {
			klog.ErrorS(syncErr, "Failed to fulfill service import by updating InternalServiceImport status",
				"serviceImport", svcImportRef,
				"internalServiceImport", isiRef)
			return ctrl.Result{}, syncErr
		}
		return ctrl.Result{}, nil
	}

	if len(svcInUseBy.MemberClusters) > 0 {
		// Some other member cluster holds the import. A Service may currently be imported only once
		// across the fleet, so this attempt is aborted.
		klog.V(2).InfoS("A member cluster has already imported the Service",
			"internalServiceImport", isiRef,
			"serviceInUseBy", svcInUseBy)
		return r.clearInternalServiceImportStatus(ctx, isi)
	}

	// Nobody has imported the Service yet; claim it for this member cluster.
	klog.V(2).InfoS("The Service can be imported; will sync the Service spec",
		"serviceImport", svcImportRef,
		"internalServiceImport", isiRef)

	// The cleanup finalizer has to be in place before the import is fulfilled.
	if finalizerErr := r.addInternalServiceImportCleanupFinalizer(ctx, isi); finalizerErr != nil {
		klog.ErrorS(finalizerErr, "Failed to add cleanup finalizer to InternalServiceImport", "internalServiceImport", isiRef)
		return ctrl.Result{}, finalizerErr
	}

	// Record the claim in the ServiceInUseBy annotation on the ServiceImport.
	svcInUseBy.MemberClusters[clusterNamespace] = clusterID
	if annotateErr := r.annotateServiceImportWithServiceInUseByInfo(ctx, svcImport, svcInUseBy); annotateErr != nil {
		klog.ErrorS(annotateErr, "Failed to annotate ServiceImport with ServiceInUseBy info",
			"serviceImport", svcImportRef,
			"serviceInUseBy", svcInUseBy)
		return ctrl.Result{}, annotateErr
	}

	// Finally, copy the Service spec into the InternalServiceImport status to fulfill the import.
	if syncErr := r.fulfillInternalServiceImport(ctx, svcImport, isi); syncErr != nil {
		klog.ErrorS(syncErr, "Failed to fulfill service import by updating InternalServiceImport status",
			"serviceImport", svcImportRef,
			"internalServiceImport", isiRef)
		return ctrl.Result{}, syncErr
	}
	return ctrl.Result{}, nil
}