func()

in pkg/controllers/hub/serviceimport/controller.go [59:173]


// Reconcile resolves the status of the requested ServiceImport from the
// InternalServiceExports that reference it.
//
// The flow is:
//  1. Fetch the ServiceImport; a NotFound error is ignored.
//  2. Return early if Status.Clusters is already populated.
//  3. List the InternalServiceExports indexed by the ServiceImport's namespaced
//     name. Exports that are being deleted or that do not yet carry the
//     InternalServiceExport finalizer are skipped.
//  4. Take the ports of the first remaining export as the resolved ports spec;
//     exports whose ports differ are treated as conflicting.
//  5. Update every export with its conflict/non-conflict state, then write the
//     resolved ports, the clusters of the non-conflicting exports and the
//     type into the ServiceImport status.
//
// When no usable export exists, the call is delegated to r.deleteServiceImport.
// When every non-conflicting export disappears while being updated, the request
// is requeued so that a new ports spec can be picked.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	serviceImportKRef := klog.KRef(req.Namespace, req.Name)
	startTime := time.Now()
	klog.V(2).InfoS("Reconciliation starts", "serviceImport", serviceImportKRef)
	defer func() {
		latency := time.Since(startTime).Milliseconds()
		klog.V(2).InfoS("Reconciliation ends", "serviceImport", serviceImportKRef, "latency", latency)
	}()

	serviceImport := fleetnetv1alpha1.ServiceImport{}
	if err := r.Client.Get(ctx, req.NamespacedName, &serviceImport); err != nil {
		if errors.IsNotFound(err) {
			klog.V(4).InfoS("Ignoring NotFound serviceImport", "serviceImport", serviceImportKRef)
			return ctrl.Result{}, nil
		}
		klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef)
		return ctrl.Result{}, err
	}
	// If the status already lists clusters, the service spec has been resolved
	// and there is nothing left to do here.
	if len(serviceImport.Status.Clusters) != 0 {
		klog.V(4).InfoS("Already resolved the service spec and skipping", "serviceImport", serviceImportKRef)
		return ctrl.Result{}, nil
	}

	internalServiceExportList := &fleetnetv1alpha1.InternalServiceExportList{}
	namespaceName := types.NamespacedName{Namespace: serviceImport.Namespace, Name: serviceImport.Name}
	listOpts := client.MatchingFields{
		exportedServiceFieldNamespacedName: namespaceName.String(),
	}
	if err := r.Client.List(ctx, internalServiceExportList, &listOpts); err != nil {
		klog.ErrorS(err, "Failed to list internalServiceExports used by the serviceImport", "serviceImport", serviceImportKRef)
		return ctrl.Result{}, err
	}
	if len(internalServiceExportList.Items) == 0 {
		klog.V(2).InfoS("No internalServiceExport found and deleting serviceImport", "serviceImport", serviceImportKRef)
		return r.deleteServiceImport(ctx, &serviceImport)
	}
	change := statusChange{
		conflict:   []*fleetnetv1alpha1.InternalServiceExport{},
		noConflict: []*fleetnetv1alpha1.InternalServiceExport{},
	}

	var resolvedPortsSpec *[]fleetnetv1alpha1.ServicePort
	for i := range internalServiceExportList.Items {
		// Point at the list element instead of copying the whole object on every iteration.
		export := &internalServiceExportList.Items[i]
		if export.DeletionTimestamp != nil { // skip if the resource is in the deleting state
			klog.V(4).InfoS("Skipping the internalServiceExport which is in the deleting state", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(export))
			continue
		}
		// Skip a resource that was just added and has not been handled by the internalServiceExport controller yet.
		if !controllerutil.ContainsFinalizer(export, objectmeta.InternalServiceExportFinalizer) {
			klog.V(3).InfoS("Skipping the internalServiceExport because of missing finalizer", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(export))
			continue
		}

		if resolvedPortsSpec == nil {
			// Pick the spec of the first usable internalServiceExport.
			resolvedPortsSpec = &export.Spec.Ports
		}
		// TODO: ideally we should ignore the order when comparing the serviceImports; port and protocol are the key.
		if !equality.Semantic.DeepEqual(*resolvedPortsSpec, export.Spec.Ports) {
			change.conflict = append(change.conflict, export)
			continue
		}
		change.noConflict = append(change.noConflict, export)
	}

	if resolvedPortsSpec == nil {
		// All of the internalServiceExports are in the deleting state or are waiting for the
		// internalserviceexport controller to process them.
		// We could safely delete the serviceImport if it exists.
		// When the internalserviceexport controller starts processing the object, it will create the
		// serviceImport at that time.
		klog.V(2).InfoS("No valid internalServiceExport found and deleting serviceImport", "serviceImport", serviceImportKRef)
		return r.deleteServiceImport(ctx, &serviceImport)
	}

	// To reduce reconcile failures, the updates below are retried until they succeed.
	clusters := make([]fleetnetv1alpha1.ClusterStatus, 0, len(change.noConflict))
	for _, v := range change.noConflict {
		klog.V(3).InfoS("Marking internalServiceExport status as nonConflict", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(v))
		if err := r.updateInternalServiceExportWithRetry(ctx, v, false); err != nil {
			if errors.IsNotFound(err) { // ignore deleted internalServiceExport
				continue
			}
			return ctrl.Result{}, err
		}
		clusters = append(clusters, fleetnetv1alpha1.ClusterStatus{Cluster: v.Spec.ServiceReference.ClusterID})
	}
	if len(clusters) == 0 {
		// By now all of the non-conflicting internalServiceExports have been deleted.
		// Redo the reconciliation to pick a new ports spec.
		klog.V(2).InfoS("Requeue the request to resolve the spec", "serviceImport", serviceImportKRef)
		return ctrl.Result{Requeue: true}, nil
	}
	for _, v := range change.conflict {
		klog.V(3).InfoS("Marking internalServiceExport status as Conflict", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(v))
		if err := r.updateInternalServiceExportWithRetry(ctx, v, true); err != nil {
			// A conflicting export that has been deleted in the meantime must not abort the
			// reconciliation: returning here would leave the serviceImport status unresolved
			// even though the non-conflicting exports have already been marked.
			if errors.IsNotFound(err) {
				continue
			}
			return ctrl.Result{}, err
		}
	}
	serviceImport.Status = fleetnetv1alpha1.ServiceImportStatus{
		Ports:    *resolvedPortsSpec,
		Clusters: clusters,
		Type:     fleetnetv1alpha1.ClusterSetIP, // may support headless in the future
	}
	updateFunc := func() error {
		return r.Status().Update(ctx, &serviceImport)
	}
	klog.V(2).InfoS("Updating the serviceImport status", "serviceImport", serviceImportKRef)
	if err := apiretry.Do(updateFunc); err != nil {
		klog.ErrorS(err, "Failed to update serviceImport status with retry", "serviceImport", serviceImportKRef)
		return ctrl.Result{}, err
	}
	r.Recorder.Eventf(&serviceImport, corev1.EventTypeNormal, "SuccessfulUpdateStatus", "Resolved exported service properties and updated %s status", serviceImport.Name)
	return ctrl.Result{}, nil
}