pkg/controllers/member/internalserviceexport/controller.go (165 lines of code) (raw):

/* Copyright (c) Microsoft Corporation. Licensed under the MIT license. */ // package internalserviceexport features the InternalServiceExport controller for reporting back conflict resolution // status from the fleet to a member cluster. package internalserviceexport import ( "context" "reflect" "time" "github.com/prometheus/client_golang/prometheus" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1" "go.goms.io/fleet-networking/pkg/common/metrics" ) const ( // ControllerName is the name of the Reconciler. ControllerName = "internalserviceexport-controller" ) var ( // svcExportDuration is a Prometheus histogram metric bundle that measures that time it takes for // Fleet networking controllers to export a valid Service with no conflicts. That is, the // stopwatch starts when the ServiceExport controller marks a Service (via the ServiceExportValid // condition) as valid for export, and stops when the InternalServiceExport controller // reports back (via the ServiceExportConflict condition) from the hub cluster that no spec // conflict has been found with the conflict. // // Note that this measurement does not cover the time spent for Fleet networking controllers to // pick up Service or ServiceExport spec changes, as at this moment there is no way in Kubernetes API // for controllers to reliably track it. svcExportDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.MetricsNamespace, Subsystem: metrics.MetricsSubsystem, Name: "service_export_duration_milliseconds", Help: "The duration of a service export", Buckets: metrics.ExportDurationMillisecondsBuckets, }, []string{ // The ID of the origin cluster, which exports the Service. "origin_cluster_id", }, ) ) func init() { // Register svcExportDuration (fleet_networking_service_export_duration_milliseconds) metric // with the controller runtime global metrics registry. ctrlmetrics.Registry.MustRegister(svcExportDuration) } // Reconciler reconciles the update of an InternalServiceExport. type Reconciler struct { MemberClusterID string MemberClient client.Client HubClient client.Client Recorder record.EventRecorder } //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports/status,verbs=get;update;patch //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports,verbs=get;list;watch;create;update;patch;delete //+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports/status,verbs=get;update;patch //+kubebuilder:rbac:groups="",resources=events,verbs=create;patch // Reconcile reports back whether an export of a Service has been accepted with no conflict detected. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { internalSvcExportRef := klog.KRef(req.Namespace, req.Name) startTime := time.Now() klog.V(2).InfoS("Reconciliation starts", "internalServiceExport", internalSvcExportRef) defer func() { latency := time.Since(startTime).Milliseconds() klog.V(2).InfoS("Reconciliation ends", "internalServiceExport", internalSvcExportRef, "latency", latency) }() // Retrieve the InternalServiceExport object. var internalSvcExport fleetnetv1alpha1.InternalServiceExport if err := r.HubClient.Get(ctx, req.NamespacedName, &internalSvcExport); err != nil { // Skip the reconciliation if the InternalServiceExport does not exist. if errors.IsNotFound(err) { klog.V(4).InfoS("Ignoring NotFound internalServiceExport", "internalServiceExport", internalSvcExportRef) return ctrl.Result{}, nil } klog.ErrorS(err, "Failed to get internal svc export", "internalServiceExport", internalSvcExportRef) return ctrl.Result{}, err } if !internalSvcExport.ObjectMeta.DeletionTimestamp.IsZero() { // Skip the reconciliation if the InternalServiceExport is being deleted. // There is no need to report the conflicts back. // For example, the serviceExport is no longer valid or valid with 0 weight. // In these cases, there is no need to create internalServiceExport. klog.V(2).InfoS("Ignoring deleting internalServiceExport", "internalServiceExport", internalSvcExportRef) return ctrl.Result{}, nil } // Check if the exported Service exists. svcNS := internalSvcExport.Spec.ServiceReference.Namespace svcName := internalSvcExport.Spec.ServiceReference.Name svcExportRef := klog.KRef(svcNS, svcName) var svcExport fleetnetv1alpha1.ServiceExport err := r.MemberClient.Get(ctx, types.NamespacedName{Namespace: svcNS, Name: svcName}, &svcExport) switch { case errors.IsNotFound(err): // The absence of ServiceExport suggests that the Service should not be, yet has been, exported. Normally // this situation will never happen as the ServiceExport controller guarantees, using the cleanup finalizer, // that a ServiceExport will only be deleted after the Service has been unexported. In some corner cases, // however, e.g. the user chooses to remove the finalizer explicitly, a Service can be left over in the hub // cluster, and it is up to this controller to remove it. klog.V(2).InfoS("Svc export does not exist; delete the internal svc export", "serviceExport", svcExportRef, "internalServiceExport", internalSvcExportRef, ) if err := r.HubClient.Delete(ctx, &internalSvcExport); err != nil { klog.ErrorS(err, "Failed to delete internal svc export", "internalServiceExport", internalSvcExportRef) return ctrl.Result{}, err } return ctrl.Result{}, nil case err != nil: // An unexpected error occurs. klog.ErrorS(err, "Failed to get svc export", "serviceExport", svcExportRef) return ctrl.Result{}, err } // Report back conflict resolution result. klog.V(4).InfoS("Report back conflict resolution result", "internalServiceExport", internalSvcExportRef) reported, err := r.reportBackConflictCondition(ctx, &svcExport, &internalSvcExport) if err != nil { klog.ErrorS(err, "Failed to report back conflict resolution result", "serviceExport", svcExportRef) return ctrl.Result{}, err } // Observe a data point for the svcExportDuration metric. // Note that an observation happens only when there is a conflict resolution result to report back. if reported { if err := r.observeMetrics(ctx, &internalSvcExport, time.Now()); err != nil { klog.ErrorS(err, "Failed to observe metrics", "internalServiceExport", internalSvcExportRef) return ctrl.Result{}, err } } return ctrl.Result{}, nil } // SetupWithManager builds a controller with InternalSvcExportReconciler and sets it up with a // (multi-namespaced) controller manager. func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr).For(&fleetnetv1alpha1.InternalServiceExport{}).Complete(r) } // reportBackConflictCond reports the ServiceExportConflict condition added to the InternalServiceExport object in the // hub cluster back to the ServiceExport ojbect in the member cluster. // It returns a bool value, reported, to signify whether a report-back has been completed. func (r *Reconciler) reportBackConflictCondition(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport, internalSvcExport *fleetnetv1alpha1.InternalServiceExport) (reported bool, err error) { internalSvcExportRef := klog.KRef(internalSvcExport.Namespace, internalSvcExport.Name) internalSvcExportConflictCond := meta.FindStatusCondition(internalSvcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportConflict)) if internalSvcExportConflictCond == nil { // No conflict condition to report back; this is the expected behavior when the conflict resolution process // has not completed yet. klog.V(4).InfoS("No conflict condition to report back", "internalServiceExport", internalSvcExportRef) return false, nil } // Ideally, internalServiceExport needs to track both service and serviceExport generation. // But the service generation won't be populated by the controller. desiredSvcExportConflictCond := internalSvcExportConflictCond.DeepCopy() desiredSvcExportConflictCond.ObservedGeneration = svcExport.Generation svcExportConflictCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportConflict)) if reflect.DeepEqual(internalSvcExportConflictCond, svcExportConflictCond) { // The conflict condition has not changed and there is no need to report back; this is also an expected // behavior. klog.V(4).InfoS("No update on the conflict condition", "internalServiceExport", internalSvcExportRef) // Return true here to allow following steps to run again upon retries. return true, nil } // Update the conditions if internalSvcExportConflictCond.Status == metav1.ConditionTrue { r.Recorder.Eventf(svcExport, corev1.EventTypeWarning, "ServiceExportConflictFound", "Service %s is in conflict with other exported services", svcExport.Name) } if internalSvcExportConflictCond.Status == metav1.ConditionFalse { r.Recorder.Eventf(svcExport, corev1.EventTypeNormal, "NoServiceExportConflictFound", "Service %s is exported without conflict", svcExport.Name) } meta.SetStatusCondition(&svcExport.Status.Conditions, *desiredSvcExportConflictCond) return true, r.MemberClient.Status().Update(ctx, svcExport) } // Observe data points for metrics. func (r *Reconciler) observeMetrics(ctx context.Context, internalSvcExport *fleetnetv1alpha1.InternalServiceExport, startTime time.Time) error { // Check if a metric data point has been observed for the current resource version of the object; this helps guard // against repeated observation of metric data points for the same resource version of an object due to no-op // reconciliations (e.g. resyncs, untracked changes). lastObservedResourceVersion, ok := internalSvcExport.Annotations[metrics.MetricsAnnotationLastObservedResourceVersion] currentResourceVersion := internalSvcExport.Spec.ServiceReference.ResourceVersion if ok && lastObservedResourceVersion == currentResourceVersion { // A data point has been observed for this resource version; skip the observation. return nil } // Observe a new data point. // Annotate the object to track the last observed resource version; this must happen before the actual observation. if internalSvcExport.Annotations == nil { // Initialize the annotation map if it is empty. internalSvcExport.Annotations = map[string]string{} } internalSvcExport.Annotations[metrics.MetricsAnnotationLastObservedResourceVersion] = currentResourceVersion if err := r.HubClient.Update(ctx, internalSvcExport); err != nil { return err } // Skip the observation if the exportedSince field is empty in the object reference. // Note that in most cases this branch should never run as the Fleet networking controllers will always assign a // timestamp for each exported object. if internalSvcExport.Spec.ServiceReference.ExportedSince.IsZero() { klog.V(4).InfoS("exportedSince timestamp is absent; service export duration data point is not collected", "internalServiceExport", klog.KObj(internalSvcExport)) return nil } timeSpent := startTime.Sub(internalSvcExport.Spec.ServiceReference.ExportedSince.Time).Milliseconds() // Under some rare circumstances (such as user manipulating the timestamps; note that for this specific metric // clock drifts are less of an issue as all timestamps are from the same local lock), it could // happen that the valid timestamp of an ServiceExport appears later than its conflict resolution timestamp. // To avoid negative outliers affecting data analysis, this controller assigns a constant of exactly 1 second // when the calculated duration does not make sense. if timeSpent <= 0 { timeSpent = time.Second.Milliseconds() * 1 klog.V(4).InfoS("A negative service export duration data point has been observed", "serviceNamespacedName", internalSvcExport.Spec.ServiceReference.NamespacedName, "originClusterID", internalSvcExport.Spec.ServiceReference.ClusterID) } // Similarly, to avoid large outliers skewing the stats (e.g. averages), this controller caps the data point // to a constant value. if timeSpent > int64(metrics.ExportDurationRightBound) { timeSpent = int64(metrics.ExportDurationRightBound) } svcExportDuration.WithLabelValues(r.MemberClusterID).Observe(float64(timeSpent)) // TO-DO (chenyu1): Remove the metric logs when histogram metrics are supported in the backend. klog.V(2).InfoS("serviceExportDurationMilliseconds", "value", timeSpent, "originClusterID", r.MemberClusterID) return nil }