pkg/controllers/hub/serviceimport/controller.go (173 lines of code) (raw):
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/
// Package serviceimport features the serviceimport controller to resolve the service spec when exporting multi-cluster
// services.
package serviceimport
import (
"context"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
fleetnetv1alpha1 "go.goms.io/fleet-networking/api/v1alpha1"
"go.goms.io/fleet-networking/pkg/common/apiretry"
"go.goms.io/fleet-networking/pkg/common/condition"
"go.goms.io/fleet-networking/pkg/common/objectmeta"
)
const (
// fields name used to filter resources
exportedServiceFieldNamespacedName = ".spec.serviceReference.namespacedName"
// ControllerName is the name of the Reconciler.
ControllerName = "serviceimport-controller"
)
// Reconciler reconciles a ServiceImport object.
type Reconciler struct {
client.Client
Recorder record.EventRecorder
}
// statusChange stores the internalServiceExports list whose status needs to be updated.
type statusChange struct {
conflict []*fleetnetv1alpha1.InternalServiceExport
noConflict []*fleetnetv1alpha1.InternalServiceExport
}
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports,verbs=get;list;watch;update;patch;delete
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=serviceimports/finalizers,verbs=update
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports,verbs=get;watch;list
//+kubebuilder:rbac:groups=networking.fleet.azure.com,resources=internalserviceexports/status,verbs=get;update;patch
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch
// Reconcile resolves the service spec when the serviceImport status is empty and updates the status of internalServiceExports.
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
serviceImportKRef := klog.KRef(req.Namespace, req.Name)
startTime := time.Now()
klog.V(2).InfoS("Reconciliation starts", "serviceImport", serviceImportKRef)
defer func() {
latency := time.Since(startTime).Milliseconds()
klog.V(2).InfoS("Reconciliation ends", "serviceImport", serviceImportKRef, "latency", latency)
}()
serviceImport := fleetnetv1alpha1.ServiceImport{}
if err := r.Client.Get(ctx, req.NamespacedName, &serviceImport); err != nil {
if errors.IsNotFound(err) {
klog.V(4).InfoS("Ignoring NotFound serviceImport", "serviceImport", serviceImportKRef)
return ctrl.Result{}, nil
}
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef)
return ctrl.Result{}, err
}
// If the spec has already present, no need to resolve the service spec.
if len(serviceImport.Status.Clusters) != 0 {
klog.V(4).InfoS("Already resolved the service spec and skipping", "serviceImport", serviceImportKRef)
return ctrl.Result{}, nil
}
internalServiceExportList := &fleetnetv1alpha1.InternalServiceExportList{}
namespaceName := types.NamespacedName{Namespace: serviceImport.Namespace, Name: serviceImport.Name}
listOpts := client.MatchingFields{
exportedServiceFieldNamespacedName: namespaceName.String(),
}
if err := r.Client.List(ctx, internalServiceExportList, &listOpts); err != nil {
klog.ErrorS(err, "Failed to list internalServiceExports used by the serviceImport", "serviceImport", serviceImportKRef)
return ctrl.Result{}, err
}
if len(internalServiceExportList.Items) == 0 {
klog.V(2).InfoS("No internalServiceExport found and deleting serviceImport", "serviceImport", serviceImportKRef)
return r.deleteServiceImport(ctx, &serviceImport)
}
change := statusChange{
conflict: []*fleetnetv1alpha1.InternalServiceExport{},
noConflict: []*fleetnetv1alpha1.InternalServiceExport{},
}
var resolvedPortsSpec *[]fleetnetv1alpha1.ServicePort
for i := range internalServiceExportList.Items {
v := internalServiceExportList.Items[i]
if v.DeletionTimestamp != nil { // skip if the resource is in the deleting state
klog.V(4).InfoS("Skipping the internalServiceExport which is in the deleting state", serviceImport, serviceImportKRef, "internalServiceExport", klog.KObj(&v))
continue
}
// skip if the resource is just added which has not been handled by the internalServiceExport controller yet
if !controllerutil.ContainsFinalizer(&v, objectmeta.InternalServiceExportFinalizer) {
klog.V(3).InfoS("Skipping the internalServiceExport because of missing finalizer", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(&v))
continue
}
if resolvedPortsSpec == nil {
// pick the first internalServiceExport spec
resolvedPortsSpec = &v.Spec.Ports
}
// TODO: ideally we should ignore the order when comparing the serviceImports; port and protocol are the key.
if !equality.Semantic.DeepEqual(*resolvedPortsSpec, v.Spec.Ports) {
change.conflict = append(change.conflict, &v)
continue
}
change.noConflict = append(change.noConflict, &v)
}
if resolvedPortsSpec == nil {
// All of internalServicesExports are in the deleting state or waiting for the internalserviceexport controller to process it.
// We could safely delete the serviceImport if exists.
// When the internalserviceexport controller starts processing the object, it will create the serviceImport at
// that time.
klog.V(2).InfoS("No valid internalServiceExport found and deleting serviceImport", "serviceImport", serviceImportKRef)
return r.deleteServiceImport(ctx, &serviceImport)
}
// To reduce reconcile failure, we'll keep retry until it succeeds.
clusters := make([]fleetnetv1alpha1.ClusterStatus, 0, len(change.noConflict))
for _, v := range change.noConflict {
klog.V(3).InfoS("Marking internalServiceExport status as nonConflict", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(v))
if err := r.updateInternalServiceExportWithRetry(ctx, v, false); err != nil {
if errors.IsNotFound(err) { // ignore deleted internalServiceExport
continue
}
return ctrl.Result{}, err
}
clusters = append(clusters, fleetnetv1alpha1.ClusterStatus{Cluster: v.Spec.ServiceReference.ClusterID})
}
if len(clusters) == 0 {
// At that time, all of internalServiceExports has been deleted.
// need to redo the Reconcile to pick new ports spec
klog.V(2).InfoS("Requeue the request to resolve the spec", "serviceImport", serviceImportKRef)
return ctrl.Result{Requeue: true}, nil
}
for _, v := range change.conflict {
klog.V(3).InfoS("Marking internalServiceExport status as Conflict", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(v))
if err := r.updateInternalServiceExportWithRetry(ctx, v, true); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
serviceImport.Status = fleetnetv1alpha1.ServiceImportStatus{
Ports: *resolvedPortsSpec,
Clusters: clusters,
Type: fleetnetv1alpha1.ClusterSetIP, // may support headless in the future
}
updateFunc := func() error {
return r.Status().Update(ctx, &serviceImport)
}
klog.V(2).InfoS("Updating the serviceImport status", "serviceImport", serviceImportKRef)
if err := apiretry.Do(updateFunc); err != nil {
klog.ErrorS(err, "Failed to update serviceImport status with retry", "serviceImport", serviceImportKRef)
return ctrl.Result{}, err
}
r.Recorder.Eventf(&serviceImport, corev1.EventTypeNormal, "SuccessfulUpdateStatus", "Resolved exported service properties and updated %s status", serviceImport.Name)
return ctrl.Result{}, nil
}
func (r *Reconciler) updateInternalServiceExportWithRetry(ctx context.Context, internalServiceExport *fleetnetv1alpha1.InternalServiceExport, conflict bool) error {
desiredCond := condition.UnconflictedServiceExportConflictCondition(*internalServiceExport)
if conflict {
desiredCond = condition.ConflictedServiceExportConflictCondition(*internalServiceExport)
}
currentCond := meta.FindStatusCondition(internalServiceExport.Status.Conditions, string(fleetnetv1alpha1.ServiceExportConflict))
if condition.EqualCondition(currentCond, &desiredCond) {
return nil
}
exportKObj := klog.KObj(internalServiceExport)
meta.SetStatusCondition(&internalServiceExport.Status.Conditions, desiredCond)
updateFunc := func() error {
return r.Client.Status().Update(ctx, internalServiceExport)
}
if err := apiretry.Do(updateFunc); err != nil {
klog.ErrorS(err, "Failed to update internalServiceExport status with retry", "internalServiceExport", exportKObj)
return err
}
return nil
}
func (r *Reconciler) deleteServiceImport(ctx context.Context, serviceImport *fleetnetv1alpha1.ServiceImport) (ctrl.Result, error) {
r.Recorder.Eventf(serviceImport, corev1.EventTypeNormal, "NoExportedService", "No exported service and deleting serviceImport %s", serviceImport.Name)
serviceImportKObj := klog.KObj(serviceImport)
if err := r.Client.Delete(ctx, serviceImport); err != nil {
klog.ErrorS(err, "Failed to delete serviceImport", "serviceImport", serviceImportKObj)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
klog.V(2).InfoS("There are no internalServiceExports and serviceImport has been deleted", "serviceImport", serviceImportKObj)
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// add index to quickly query internalServiceExport list by service
extractFunc := func(o client.Object) []string {
name := o.(*fleetnetv1alpha1.InternalServiceExport).Spec.ServiceReference.NamespacedName
return []string{name}
}
if err := mgr.GetFieldIndexer().IndexField(ctx, &fleetnetv1alpha1.InternalServiceExport{}, exportedServiceFieldNamespacedName, extractFunc); err != nil {
klog.ErrorS(err, "Failed to create index", "field", exportedServiceFieldNamespacedName)
return err
}
return ctrl.NewControllerManagedBy(mgr).
For(&fleetnetv1alpha1.ServiceImport{}).
Complete(r)
}