in pkg/controllers/hub/serviceimport/controller.go [59:173]
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
serviceImportKRef := klog.KRef(req.Namespace, req.Name)
startTime := time.Now()
klog.V(2).InfoS("Reconciliation starts", "serviceImport", serviceImportKRef)
defer func() {
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("Reconciliation ends", "serviceImport", serviceImportKRef, "latency", latency)
}()
serviceImport := fleetnetv1alpha1.ServiceImport{}
if err := r.Client.Get(ctx, req.NamespacedName, &serviceImport); err != nil {
if errors.IsNotFound(err) {
klog.V(4).InfoS("Ignoring NotFound serviceImport", "serviceImport", serviceImportKRef)
return ctrl.Result{}, nil
}
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef)
return ctrl.Result{}, err
}
// If the spec has already present, no need to resolve the service spec.
if len(serviceImport.Status.Clusters) != 0 {
klog.V(4).InfoS("Already resolved the service spec and skipping", "serviceImport", serviceImportKRef)
return ctrl.Result{}, nil
}
internalServiceExportList := &fleetnetv1alpha1.InternalServiceExportList{}
namespaceName := types.NamespacedName{Namespace: serviceImport.Namespace, Name: serviceImport.Name}
listOpts := client.MatchingFields{
exportedServiceFieldNamespacedName: namespaceName.String(),
}
if err := r.Client.List(ctx, internalServiceExportList, &listOpts); err != nil {
klog.ErrorS(err, "Failed to list internalServiceExports used by the serviceImport", "serviceImport", serviceImportKRef)
return ctrl.Result{}, err
}
if len(internalServiceExportList.Items) == 0 {
klog.V(2).InfoS("No internalServiceExport found and deleting serviceImport", "serviceImport", serviceImportKRef)
return r.deleteServiceImport(ctx, &serviceImport)
}
change := statusChange{
conflict: []*fleetnetv1alpha1.InternalServiceExport{},
noConflict: []*fleetnetv1alpha1.InternalServiceExport{},
}
var resolvedPortsSpec *[]fleetnetv1alpha1.ServicePort
for i := range internalServiceExportList.Items {
v := internalServiceExportList.Items[i]
if v.DeletionTimestamp != nil { // skip if the resource is in the deleting state
klog.V(4).InfoS("Skipping the internalServiceExport which is in the deleting state", serviceImport, serviceImportKRef, "internalServiceExport", klog.KObj(&v))
continue
}
// skip if the resource is just added which has not been handled by the internalServiceExport controller yet
if !controllerutil.ContainsFinalizer(&v, objectmeta.InternalServiceExportFinalizer) {
klog.V(3).InfoS("Skipping the internalServiceExport because of missing finalizer", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(&v))
continue
}
if resolvedPortsSpec == nil {
// pick the first internalServiceExport spec
resolvedPortsSpec = &v.Spec.Ports
}
// TODO: ideally we should ignore the order when comparing the serviceImports; port and protocol are the key.
if !equality.Semantic.DeepEqual(*resolvedPortsSpec, v.Spec.Ports) {
change.conflict = append(change.conflict, &v)
continue
}
change.noConflict = append(change.noConflict, &v)
}
if resolvedPortsSpec == nil {
// All of internalServicesExports are in the deleting state or waiting for the internalserviceexport controller to process it.
// We could safely delete the serviceImport if exists.
// When the internalserviceexport controller starts processing the object, it will create the serviceImport at
// that time.
klog.V(2).InfoS("No valid internalServiceExport found and deleting serviceImport", "serviceImport", serviceImportKRef)
return r.deleteServiceImport(ctx, &serviceImport)
}
// To reduce reconcile failure, we'll keep retry until it succeeds.
clusters := make([]fleetnetv1alpha1.ClusterStatus, 0, len(change.noConflict))
for _, v := range change.noConflict {
klog.V(3).InfoS("Marking internalServiceExport status as nonConflict", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(v))
if err := r.updateInternalServiceExportWithRetry(ctx, v, false); err != nil {
if errors.IsNotFound(err) { // ignore deleted internalServiceExport
continue
}
return ctrl.Result{}, err
}
clusters = append(clusters, fleetnetv1alpha1.ClusterStatus{Cluster: v.Spec.ServiceReference.ClusterID})
}
if len(clusters) == 0 {
// At that time, all of internalServiceExports has been deleted.
// need to redo the Reconcile to pick new ports spec
klog.V(2).InfoS("Requeue the request to resolve the spec", "serviceImport", serviceImportKRef)
return ctrl.Result{Requeue: true}, nil
}
for _, v := range change.conflict {
klog.V(3).InfoS("Marking internalServiceExport status as Conflict", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(v))
if err := r.updateInternalServiceExportWithRetry(ctx, v, true); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
serviceImport.Status = fleetnetv1alpha1.ServiceImportStatus{
Ports: *resolvedPortsSpec,
Clusters: clusters,
Type: fleetnetv1alpha1.ClusterSetIP, // may support headless in the future
}
updateFunc := func() error {
return r.Status().Update(ctx, &serviceImport)
}
klog.V(2).InfoS("Updating the serviceImport status", "serviceImport", serviceImportKRef)
if err := apiretry.Do(updateFunc); err != nil {
klog.ErrorS(err, "Failed to update serviceImport status with retry", "serviceImport", serviceImportKRef)
return ctrl.Result{}, err
}
r.Recorder.Eventf(&serviceImport, corev1.EventTypeNormal, "SuccessfulUpdateStatus", "Resolved exported service properties and updated %s status", serviceImport.Name)
return ctrl.Result{}, nil
}