pkg/controllers/member/serviceexport/controller.go (418 lines of code) (raw):
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
// Package serviceexport features the ServiceExport controller for exporting a Service from a member cluster to
// its fleet.
package serviceexport
import (
"context"
"errors"
"fmt"
"strings"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/network/armnetwork/v4"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/cloud-provider-azure/pkg/azclient/publicipaddressclient"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"go.goms.io/fleet/pkg/utils/controller"
fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/condition"
"go.goms.io/fleet-networking/pkg/common/metrics"
"go.goms.io/fleet-networking/pkg/common/objectmeta"
)
const (
svcExportValidCondReason = "ServiceIsValid"
svcExportInvalidNotFoundCondReason = "ServiceNotFound"
svcExportInvalidIneligibleCondReason = "ServiceIneligible"
svcExportPendingConflictResolutionReason = "ServicePendingConflictResolution"
svcExportInvalidWeightAnnotationReason = "ServiceExportInvalidWeightAnnotation"
// svcExportCleanupFinalizer is the finalizer ServiceExport controllers adds to mark that
// a ServiceExport can only be deleted after its corresponding Service has been unexported from the hub cluster.
svcExportCleanupFinalizer = "networking.fleet.azure.com/svc-export-cleanup"
// ControllerName is the name of the Reconciler.
ControllerName = "serviceexport-controller"
)
// Reconciler reconciles the export of a Service.
type Reconciler struct {
MemberClusterID string
MemberClient client.Client
HubClient client.Client
// The namespace reserved for the current member cluster in the hub cluster.
HubNamespace string
Recorder record.EventRecorder
ResourceGroupName string // default resource group name to create public IP address
AzurePublicIPAddressClient publicipaddressclient.Interface
EnableTrafficManagerFeature bool
}
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceexports/finalizers,verbs=update
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// Reconcile exports a Service.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
svcRef := klog.KRef(req.Namespace, req.Name)
startTime := time.Now()
klog.V(2).InfoS("Reconciliation starts", "service", svcRef)
defer func() {
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("Reconciliation ends", "service", svcRef, "latency", latency)
}()
// Retrieve the ServiceExport object.
var svcExport fleetnetv1alpha1.ServiceExport
if err := r.MemberClient.Get(ctx, req.NamespacedName, &svcExport); err != nil {
if apierrors.IsNotFound(err) {
// Skip the reconciliation if the ServiceExport does not exist; this happens when the controller detects
// changes in a Service that has not been exported yet, or when a ServiceExport is deleted before the
// corresponding Service is exported to the fleet (and a cleanup finalizer is added). Either case requires
// no action on this controller's end.
klog.V(2).InfoS("Service export is not found", "service", svcRef)
return ctrl.Result{}, nil
}
// An error has occurred when getting the ServiceExport.
klog.ErrorS(err, "Failed to get service export", "service", svcRef)
return ctrl.Result{}, err
}
// Check if the ServiceExport has been deleted and needs cleanup (unexporting Service).
// A ServiceExport needs cleanup when it has the ServiceExport cleanup finalizer added; the absence of this
// finalizer guarantees that the corresponding Service has never been exported to the fleet, thus no action
// is needed.
if svcExport.DeletionTimestamp != nil {
if controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
klog.V(2).InfoS("Service export is deleted; unexport the service", "service", svcRef)
res, err := r.unexportService(ctx, &svcExport)
if err != nil {
klog.ErrorS(err, "Failed to unexport the service", "service", svcRef)
}
return res, err
}
return ctrl.Result{}, nil
}
// Check if the Service to export exists.
svc := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: req.Namespace,
Name: req.Name,
},
}
err := r.MemberClient.Get(ctx, req.NamespacedName, &svc)
switch {
// The Service to export does not exist or has been deleted.
case apierrors.IsNotFound(err) || svc.DeletionTimestamp != nil:
r.Recorder.Eventf(&svcExport, corev1.EventTypeWarning, "ServiceNotFound", "Service %s is not found or in the deleting state", svc.Name)
// Unexport the Service if the ServiceExport has the cleanup finalizer added.
klog.V(2).InfoS("Service is deleted; unexport the service", "service", svcRef)
if controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
if _, err = r.unexportService(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to unexport the service", "service", svcRef)
return ctrl.Result{}, err
}
}
// Mark the ServiceExport as invalid.
klog.V(2).InfoS("Mark service export as invalid (service not found)", "service", svcRef)
if err := r.markServiceExportAsInvalidNotFound(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to mark service export as invalid (service not found)", "service", svcRef)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
// An unexpected error occurs when retrieving the Service.
case err != nil:
klog.ErrorS(err, "Failed to get the service", "service", svcRef)
return ctrl.Result{}, err
}
// Check if the Service is eligible for export.
if !isServiceEligibleForExport(&svc) {
r.Recorder.Eventf(&svcExport, corev1.EventTypeWarning, "ServiceNotEligible", "Service %s is not eligible for exporting and please check service spec", svc.Name)
// Unexport ineligible Service if the ServiceExport has the cleanup finalizer added.
if controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
klog.V(2).InfoS("Service is ineligible; unexport the service", "service", svcRef)
if _, err = r.unexportService(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to unexport the service", "service", svcRef)
return ctrl.Result{}, err
}
}
// Mark the ServiceExport as invalid.
klog.V(2).InfoS("Mark service export as invalid (service ineligible)", "service", svcRef)
err = r.markServiceExportAsInvalidSvcIneligible(ctx, &svcExport)
if err != nil {
klog.ErrorS(err, "Failed to mark service export as invalid (service ineligible)", "service", svcRef)
}
return ctrl.Result{}, err
}
// Get the weight from the serviceExport annotation and validate it.
exportWeight, err := objectmeta.ExtractWeightFromServiceExport(&svcExport)
if err != nil {
// Here we don't unexport the service as it will interrupt the current traffic.
// There is no need to requeue the error as the controller should be triggered when the user corrects the annotation.
klog.ErrorS(controller.NewUserError(err), "service export has invalid annotation weight", "service", svcRef)
curValidCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionFalse,
Reason: svcExportInvalidWeightAnnotationReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("serviceExport %s/%s has an invalid weight annotation, err = %s", svcExport.Namespace, svcExport.Name, err),
}
// We have to compare the message since we cannot rely on the object generation as annotation does not change generation.
if condition.EqualConditionWithMessage(curValidCond, &expectedValidCond) {
// no need to retry if the condition is already set
return ctrl.Result{}, nil
}
r.Recorder.Eventf(&svcExport, corev1.EventTypeWarning, svcExportInvalidWeightAnnotationReason, "ServiceExport %s has invalid weight value in the annotation", svc.Name)
meta.SetStatusCondition(&svcExport.Status.Conditions, expectedValidCond)
return ctrl.Result{}, r.MemberClient.Status().Update(ctx, &svcExport)
}
if exportWeight == 0 {
// The weight is 0, unexport the service.
klog.V(2).InfoS("Service has weight 0; unexport the service", "service", svcRef)
r.Recorder.Eventf(&svcExport, corev1.EventTypeNormal, "Service", "Service %s weight is set to 0", svc.Name)
if controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
if _, err = r.unexportService(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to unexport the service", "service", svcRef)
return ctrl.Result{}, err
}
}
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionTrue,
Reason: svcExportValidCondReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("exported service %s/%s with 0 weight", svcExport.Namespace, svcExport.Name),
}
// Since the annotation won't change the generation, we compare the message here.
if condition.EqualConditionWithMessage(validCond, &expectedValidCond) {
// no need to retry if the condition is already set
return ctrl.Result{}, nil
}
meta.SetStatusCondition(&svcExport.Status.Conditions, expectedValidCond)
return ctrl.Result{}, r.MemberClient.Status().Update(ctx, &svcExport)
}
// Add the cleanup finalizer to the ServiceExport; this must happen before the Service is actually exported.
if !controllerutil.ContainsFinalizer(&svcExport, svcExportCleanupFinalizer) {
klog.V(2).InfoS("Add cleanup finalizer to service export", "service", svcRef)
if err := r.addServiceExportCleanupFinalizer(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to add cleanup finalizer to svc export", "service", svcRef)
return ctrl.Result{}, err
}
}
// Mark the ServiceExport as valid.
klog.V(2).InfoS("Mark service export as valid", "service", svcRef)
if err = r.markServiceExportAsValid(ctx, &svcExport); err != nil {
klog.ErrorS(err, "Failed to mark service export as valid", "service", svcRef)
return ctrl.Result{}, err
}
// Retrieve the last seen resource version and the last seen timestamp; these two values are used for metric collection.
// If the two values are not present or not valid, annotate ServiceExport with new values.
//
// Note that the two values are not tamperproof.
exportedSince, err := r.collectAndVerifyLastSeenResourceVersionAndTimestamp(ctx, &svc, &svcExport, startTime)
if err != nil {
klog.Warning("Failed to annotate last seen generation and timestamp", "serviceExport", svcRef)
}
// Export the Service or update the exported Service.
return r.exportService(ctx, &svcExport, &svc, exportedSince, exportWeight)
}
func (r *Reconciler) exportService(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport, svc *corev1.Service,
exportedSince time.Time, exportWeight int64) (ctrl.Result, error) {
svcRef := klog.KObj(svc)
// Create or update the InternalServiceExport object.
internalSvcExport := fleetnetv1alpha1.InternalServiceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.HubNamespace,
Name: formatInternalServiceExportName(svcExport),
},
}
svcExportPorts := extractServicePorts(svc)
klog.V(2).InfoS("Export the service or update the exported service",
"service", svcExport,
"internalServiceExport", klog.KObj(&internalSvcExport))
createOrUpdateOp, err := controllerutil.CreateOrUpdate(ctx, r.HubClient, &internalSvcExport, func() error {
if internalSvcExport.CreationTimestamp.IsZero() {
// Set the ServiceReference only when the InternalServiceExport is created; most of the fields in
// an ExportedObjectReference should be immutable.
internalSvcExport.Spec.ServiceReference = fleetnetv1alpha1.FromMetaObjects(r.MemberClusterID,
svc.TypeMeta, svc.ObjectMeta, metav1.NewTime(exportedSince))
}
// Return an error if an attempt is made to update an InternalServiceExport that references a different
// Service from the one that is being reconciled. This usually happens when a service is deleted and
// re-created immediately.
if internalSvcExport.Spec.ServiceReference.UID != svc.UID {
klog.V(2).InfoS("Failed to create/update internalServiceExport, UIDs mismatch",
"service", svcRef,
"internalServiceExport", klog.KObj(&internalSvcExport),
"newUID", svc.UID,
"oldUID", internalSvcExport.Spec.ServiceReference.UID)
// The AlreadyExists error returned here features a different GVR source (service, rather than
// internalServiceExport); such an error would never be yielded in the normal workflow.
return apierrors.NewAlreadyExists(
schema.GroupResource{Group: fleetnetv1alpha1.GroupVersion.Group, Resource: "Service"},
fmt.Sprintf("%s/%s", svc.Namespace, svc.Name),
)
}
internalSvcExport.Spec.Ports = svcExportPorts
internalSvcExport.Spec.ServiceReference.UpdateFromMetaObject(svc.ObjectMeta, metav1.NewTime(exportedSince))
if r.EnableTrafficManagerFeature {
klog.V(2).InfoS("Collecting Traffic Manager related information and set to the internal service export", "service", svcRef)
internalSvcExport.Spec.Weight = ptr.To(exportWeight)
if err := r.setAzureRelatedInformation(ctx, svc, &internalSvcExport); err != nil {
klog.ErrorS(err, "Failed to populate the Azure information for the Traffic Manager feature in the internal service export", "service", svcRef)
return err
}
}
return nil
})
statusErr := &apierrors.StatusError{}
ok := errors.As(err, &statusErr)
switch {
case apierrors.IsAlreadyExists(err) && ok && statusErr.Status().Details.Kind == "Service":
// An export with the same key but different UID already exists; unexport the Service first, and
// requeue a new attempt to export the Service.
// Additional checks are performed here as two forms of AlreadyExists error can be returned in the CreateOrUpdate
// call: it could be that an actual UID mismatch is found, however, since CreateOrUpdate is, in essence, a two-part op
// (the function first gets the object, and then decides whether to create the object or update it according to the get
// result), a racing condition may lead to an AlreadyExists error being yielded even if there is no UID mismatch at all.
// This can happen, albeit quite rarely, when the system is under heavy load, and the informers cannot sync caches
// fast enough; the out-of-date cache will return that an object does not exist when read, even though the object is
// already present in the persistent store, and any subsequent create call would fail.
if _, err := r.unexportService(ctx, svcExport); err != nil {
klog.ErrorS(err, "Failed to unexport the service", "service", svcRef)
return ctrl.Result{}, err
}
// Unexporting a Service removes the cleanup finalizer from the ServiceExport, which in normal cases
// will trigger another reconciliation loop automatically; for better clarity here the controller requests
// the new reconciliation attempt explicitly.
return ctrl.Result{Requeue: true}, nil
case err != nil:
klog.ErrorS(err, "Failed to create/update InternalServiceExport",
"internalServiceExport", klog.KObj(&internalSvcExport),
"service", svcRef,
"op", createOrUpdateOp)
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
func (r *Reconciler) setAzureRelatedInformation(ctx context.Context,
service *corev1.Service,
hubSvcExport *fleetnetv1alpha1.InternalServiceExport) error {
hubSvcExport.Spec.Type = service.Spec.Type
if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
return nil
}
// The annotation value is case-sensitive.
// https://github.com/kubernetes-sigs/cloud-provider-azure/blob/release-1.31/pkg/provider/azure_loadbalancer.go#L3559
hubSvcExport.Spec.IsInternalLoadBalancer = service.Annotations[objectmeta.ServiceAnnotationAzureLoadBalancerInternal] == "true"
if hubSvcExport.Spec.IsInternalLoadBalancer {
// no need to populate the PublicIPResourceID and IsDNSLabelConfigured which are only applicable for external load balancer
return nil
}
serviceKObj := klog.KObj(service)
if len(service.Status.LoadBalancer.Ingress) == 0 {
// Assuming once the service status is updated, the controller will be triggered again.
klog.V(2).InfoS("The load balancer IP is not assigned yet", "service", serviceKObj)
return nil
}
if service.Status.LoadBalancer.Ingress[0].IP == "" {
err := errors.New("the service ingress is not nil but with empty IP")
klog.ErrorS(controller.NewUnexpectedBehaviorError(err), "Failed to get the load balancer IP from service", "service", serviceKObj, "status", service.Status)
return nil
}
pip, err := r.lookupPublicIPResourceIDByLoadBalancerIP(ctx, service)
if err != nil {
return err
}
if pip == nil {
klog.V(2).InfoS("The public IP is in the progressing", "service", serviceKObj, "ip", service.Status.LoadBalancer.Ingress[0].IP)
// Assuming once the service status is updated, the controller will be triggered again in instead of retrying here
// to avoid sending Azure requests.
return nil
}
hubSvcExport.Spec.PublicIPResourceID = pip.ID
// Note the user can set the dns label via the Azure portal or Azure CLI without updating service.
// This information may be stale as we don't monitor the public IP address resource.
hubSvcExport.Spec.IsDNSLabelConfigured = pip.Properties != nil && pip.Properties.DNSSettings != nil && pip.Properties.DNSSettings.DomainNameLabel != nil
// No matter if the customer bring your own IP or not, the cloud provider will reconcile the DNS label based on the
// DNS annotation.
dnsName, found := service.Annotations[objectmeta.ServiceAnnotationAzureDNSLabelName]
klog.V(2).InfoS("Finding whether the DNS is assigned", "service", serviceKObj, "dnsName", dnsName, "isSetOnService", found, "isConfiguredOnPIP", hubSvcExport.Spec.IsDNSLabelConfigured)
// If the annotation is not set, the cloud provider won't reconcile the DNS label and return the current status.
if !found {
// cloud provider won't delete DNS label on pip if the annotation is not set.
return nil
}
if len(dnsName) == 0 {
hubSvcExport.Spec.IsDNSLabelConfigured = false // cloud provider will delete the DNS label on the pip.
return nil
}
if !hubSvcExport.Spec.IsDNSLabelConfigured {
err = fmt.Errorf("in the process of adding DNS to the public ip address %s", *pip.ID)
klog.ErrorS(err, "Requeue the request to see if the DNS is ready or not", "service", serviceKObj)
return err
}
return nil
}
// TODO: can improve the performance by caching the public IP address resource ID.
// Note: we don't support "service.beta.kubernetes.io/azure-pip-prefix-id" annotation, and public ip cannot be found in
// this case.
func (r *Reconciler) lookupPublicIPResourceIDByLoadBalancerIP(ctx context.Context, service *corev1.Service) (*armnetwork.PublicIPAddress, error) {
// The customer can specify the resource group for the public IP address in the service annotation.
rg := strings.TrimSpace(service.Annotations[objectmeta.ServiceAnnotationLoadBalancerResourceGroup])
if len(rg) == 0 {
rg = r.ResourceGroupName
}
serviceKObj := klog.KObj(service)
pips, err := r.AzurePublicIPAddressClient.List(ctx, rg)
if err != nil {
klog.ErrorS(err, "Failed to list Azure public IP addresses", "service", serviceKObj, "resourceGroup", rg)
return nil, err
}
for _, pip := range pips {
if pip.Properties != nil && pip.Properties.IPAddress != nil &&
*pip.Properties.IPAddress == service.Status.LoadBalancer.Ingress[0].IP {
return pip, nil
}
}
klog.V(2).InfoS("The public IP address resource ID cannot be found in the public IP lists", "service", serviceKObj, "ip", service.Status.LoadBalancer.Ingress[0].IP, "resourceGroup", rg)
return nil, nil
}
// SetupWithManager builds a controller with Reconciler and sets it up with a controller manager.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
// The ServiceExport controller watches over ServiceExport objects.
For(&fleetnetv1alpha1.ServiceExport{}).
// The ServiceExport controller watches over Service objects.
Watches(&corev1.Service{}, &handler.EnqueueRequestForObject{}).
Complete(r)
}
// unexportService unexports a Service, specifically, it deletes the corresponding InternalServiceExport from the
// hub cluster and removes the cleanup finalizer.
func (r *Reconciler) unexportService(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport) (ctrl.Result, error) {
// Get the unique name assigned when the Service is exported. it is guaranteed that Services are
// always exported using the name format `ORIGINAL_NAMESPACE-ORIGINAL_NAME`; for example, a Service
// from namespace `default`` with the name `store`` will be exported with the name `default-store`.
internalSvcExportName := formatInternalServiceExportName(svcExport)
internalSvcExport := &fleetnetv1alpha1.InternalServiceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.HubNamespace,
Name: internalSvcExportName,
},
}
// Unexport the Service.
if err := r.HubClient.Delete(ctx, internalSvcExport); err != nil && !apierrors.IsNotFound(err) {
// It is guaranteed that a finalizer is always added to a ServiceExport before the corresponding Service is
// actually exported; in some rare occasions, e.g. the controller crashes right after it adds the finalizer
// to the ServiceExport but before the it gets a chance to actually export the Service to the
// hub cluster, it could happen that a ServiceExport has a finalizer present yet the corresponding Service
// has not been exported to the hub cluster. It is an expected behavior and no action is needed on this
// controller's end.
return ctrl.Result{}, err
}
// Remove the finalizer from the ServiceExport; it must happen after the Service has been successfully unexported.
if err := r.removeServiceExportCleanupFinalizer(ctx, svcExport); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
// removeServiceExportCleanupFinalizer removes the cleanup finalizer from a ServiceExport.
func (r *Reconciler) removeServiceExportCleanupFinalizer(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport) error {
controllerutil.RemoveFinalizer(svcExport, svcExportCleanupFinalizer)
return r.MemberClient.Update(ctx, svcExport)
}
// markServiceExportAsInvalidNotFound marks a ServiceExport as invalid.
func (r *Reconciler) markServiceExportAsInvalidNotFound(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport) error {
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := &metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionFalse,
Reason: svcExportInvalidNotFoundCondReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("service %s/%s is not found", svcExport.Namespace, svcExport.Name),
}
if condition.EqualCondition(validCond, expectedValidCond) {
// A stable state has been reached; no further action is needed.
return nil
}
meta.SetStatusCondition(&svcExport.Status.Conditions, *expectedValidCond)
return r.MemberClient.Status().Update(ctx, svcExport)
}
// markServiceExportAsInvalidSvcIneligible marks a ServiceExport as invalid.
func (r *Reconciler) markServiceExportAsInvalidSvcIneligible(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport) error {
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := &metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionFalse,
Reason: svcExportInvalidIneligibleCondReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("service %s/%s is not eligible for export", svcExport.Namespace, svcExport.Name),
}
if condition.EqualCondition(validCond, expectedValidCond) {
// A stable state has been reached; no further action is needed.
return nil
}
meta.SetStatusCondition(&svcExport.Status.Conditions, *expectedValidCond)
return r.MemberClient.Status().Update(ctx, svcExport)
}
// addServiceExportCleanupFinalizer adds the cleanup finalizer to a ServiceExport.
func (r *Reconciler) addServiceExportCleanupFinalizer(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport) error {
controllerutil.AddFinalizer(svcExport, svcExportCleanupFinalizer)
return r.MemberClient.Update(ctx, svcExport)
}
// markServiceExportAsValid marks a ServiceExport as valid; if no conflict condition has been added, the
// ServiceExport will be marked as pending conflict resolution as well.
func (r *Reconciler) markServiceExportAsValid(ctx context.Context, svcExport *fleetnetv1alpha1.ServiceExport) error {
needUpdateStatus := false
validCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportValid))
expectedValidCond := &metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportValid),
Status: metav1.ConditionTrue,
Reason: svcExportValidCondReason,
ObservedGeneration: svcExport.Generation,
Message: fmt.Sprintf("service %s/%s is valid for export", svcExport.Namespace, svcExport.Name),
}
// When weight annotation is changed, the serviceExport generation won't change.
// There are two kinds of messages for the valid condition:
// * valid with weight 0
// * valid with weight > 0
if !condition.EqualConditionWithMessage(validCond, expectedValidCond) {
meta.SetStatusCondition(&svcExport.Status.Conditions, *expectedValidCond)
needUpdateStatus = true
}
if conflictCond := meta.FindStatusCondition(svcExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportConflict)); conflictCond == nil {
meta.SetStatusCondition(&svcExport.Status.Conditions, metav1.Condition{
Type: string(fleetnetv1alpha1.ServiceExportConflict),
Status: metav1.ConditionUnknown,
ObservedGeneration: svcExport.Generation,
Reason: svcExportPendingConflictResolutionReason,
Message: fmt.Sprintf("service %s/%s is pending export conflict resolution", svcExport.Namespace, svcExport.Name),
})
needUpdateStatus = true
}
if !needUpdateStatus {
// A stable state has been reached; no further action is needed.
return nil
}
r.Recorder.Eventf(svcExport, corev1.EventTypeNormal, "ValidServiceExport", "Service %s is valid for export", svcExport.Name)
r.Recorder.Eventf(svcExport, corev1.EventTypeNormal, "PendingExportConflictResolution", "Service %s is pending export conflict resolution", svcExport.Name)
return r.MemberClient.Status().Update(ctx, svcExport)
}
// collectAndVerifyLastSeenResourceVersionAndTime collects and verifies the last seen resource version and timestamp annotations
// on ServiceExports; it will assign new values if the annotations are not present or not valid.
func (r *Reconciler) collectAndVerifyLastSeenResourceVersionAndTimestamp(ctx context.Context,
svc *corev1.Service, svcExport *fleetnetv1alpha1.ServiceExport, startTime time.Time) (time.Time, error) {
// Check if the two annotations are present; assign new values if they are absent.
lastSeenResourceVersion, lastSeenResourceVersionOk := svcExport.Annotations[metrics.MetricsAnnotationLastSeenResourceVersion]
lastSeenTimestampData, lastSeenTimestampOk := svcExport.Annotations[metrics.MetricsAnnotationLastSeenTimestamp]
if !lastSeenResourceVersionOk || !lastSeenTimestampOk {
return startTime, r.annotateLastSeenResourceVersionAndTimestamp(ctx, svc, svcExport, startTime)
}
lastSeenTimestamp, lastSeenTimestampErr := time.Parse(metrics.MetricsLastSeenTimestampFormat, lastSeenTimestampData)
if lastSeenTimestampErr != nil {
return startTime, r.annotateLastSeenResourceVersionAndTimestamp(ctx, svc, svcExport, startTime)
}
if lastSeenResourceVersion != svc.ResourceVersion || lastSeenTimestamp.After(startTime) {
return startTime, r.annotateLastSeenResourceVersionAndTimestamp(ctx, svc, svcExport, startTime)
}
return lastSeenTimestamp, nil
}
// annotateLastSeenResourceVersionAndTimestamp annotates a ServiceExport with last seen resource version and timestamp.
func (r *Reconciler) annotateLastSeenResourceVersionAndTimestamp(ctx context.Context,
svc *corev1.Service, svcExport *fleetnetv1alpha1.ServiceExport, startTime time.Time) error {
// Initialize the annotation map if no annoation has been added yet.
if svcExport.Annotations == nil {
svcExport.Annotations = map[string]string{}
}
svcExport.Annotations[metrics.MetricsAnnotationLastSeenResourceVersion] = svc.ResourceVersion
svcExport.Annotations[metrics.MetricsAnnotationLastSeenTimestamp] = startTime.Format(metrics.MetricsLastSeenTimestampFormat)
return r.MemberClient.Update(ctx, svcExport)
}