pkg/controllers/service_controller.go (94 lines of code) (raw):

// Copyright (c) 2022, 2024, Oracle and/or its affiliates. // // Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ package controllers import ( "context" "fmt" "github.com/mysql/ndb-operator/pkg/resources/statefulset" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" listerscorev1 "k8s.io/client-go/listers/core/v1" klog "k8s.io/klog/v2" ) type ServiceControlInterface interface { ensureService( ctx context.Context, sc *SyncContext, ndbSfset statefulset.NdbStatefulSetInterface) (*corev1.Service, error) patchService( ctx context.Context, sc *SyncContext, ndbSfset statefulset.NdbStatefulSetInterface) error deleteService( ctx context.Context, namespace, name string) error } type serviceControl struct { k8sClient kubernetes.Interface serviceLister listerscorev1.ServiceLister } // NewServiceControl creates a new ServiceControl func NewServiceControl(client kubernetes.Interface, serviceLister listerscorev1.ServiceLister) ServiceControlInterface { return &serviceControl{ k8sClient: client, serviceLister: serviceLister, } } // getServiceInterface retrieves the Service Interface from the API Server func (svcCtrl *serviceControl) getServiceInterface(namespace string) typedcorev1.ServiceInterface { return svcCtrl.k8sClient.CoreV1().Services(namespace) } // EnsureService creates a service if it doesn't exist yet. func (svcCtrl *serviceControl) ensureService( ctx context.Context, sc *SyncContext, ndbSfset statefulset.NdbStatefulSetInterface) (*corev1.Service, error) { nc := sc.ndb serviceName := nc.GetServiceName(ndbSfset.GetTypeName()) svc, err := svcCtrl.serviceLister.Services(nc.Namespace).Get(serviceName) if err == nil { // Service exists already if err = sc.isOwnedByNdbCluster(svc); err != nil { // But it is not owned by the NdbCluster resource klog.Errorf( "Attempting to create service %q failed as it exists already but not owned by NdbCluster resource %q", serviceName, getNamespacedName(nc)) return nil, err } // Service already exists and is owned by nc return svc, nil } if !apierrors.IsNotFound(err) { // Error other than NotFound klog.Errorf("Error getting Service %q from serviceLister : %s", serviceName, err) return nil, err } // Service not found - create it svc = ndbSfset.NewGoverningService(nc) klog.Infof("Creating a new Service %q for NdbCluster resource %q", getNamespacedName(svc), getNamespacedName(sc.ndb)) svc, err = svcCtrl.getServiceInterface(sc.ndb.Namespace).Create(ctx, svc, metav1.CreateOptions{}) if err != nil && !apierrors.IsAlreadyExists(err) { // Create failed. Ignore AlreadyExists error as it // might have been caused due to an outdated cache read. klog.Errorf("Error creating Service %q : %s", getNamespacedName(svc), err) return nil, err } return svc, nil } // patchService patches the given service if required func (svcCtrl *serviceControl) patchService( ctx context.Context, sc *SyncContext, ndbSfset statefulset.NdbStatefulSetInterface) error { // Get current service via ensureService. // Note : ensureService will also create the service if it is // missing (which is not possible unless it has been deleted manually). currentSvc, err := svcCtrl.ensureService(ctx, sc, ndbSfset) if err != nil { return err } // Get the updated service nc := sc.ndb updatedSvc := ndbSfset.NewGoverningService(nc) // Only changing the Service type is supported if currentSvc.Spec.Type == updatedSvc.Spec.Type { // No change to service return nil } // For some reason the "regular" patch method do not work for Services. // Use a JSON Merge patch instead jsonMergePatch := fmt.Sprintf(`{"spec":{"type":"%s"}}`, updatedSvc.Spec.Type) // Patch the service _, err = svcCtrl.getServiceInterface(currentSvc.Namespace).Patch( ctx, currentSvc.GetName(), types.MergePatchType, []byte(jsonMergePatch), metav1.PatchOptions{}) if err != nil { klog.Errorf("Failed to patch the service %q : %s", getNamespacedName(currentSvc), err) return err } // Successfully applied the patch klog.Infof("Service %q has been patched successfully", getNamespacedName(currentSvc)) return nil } // deleteService deletes the given service func (svcCtrl *serviceControl) deleteService( ctx context.Context, namespace, name string) error { err := svcCtrl.getServiceInterface(namespace).Delete(ctx, name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { // Delete failed with an error. // Ignore NotFound error as this delete might be a redundant // step, caused by an outdated cache read. klog.Errorf("Failed to delete the Service %q : %s", getNamespacedName2(namespace, name), err) return err } klog.Infof("Deleted Service %q", getNamespacedName2(namespace, name)) return nil }