internal/k8sCommon/k8sclient/servicewatcher.go (98 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: MIT package k8sclient import ( "sync" mapset "github.com/deckarep/golang-set/v2" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" ) type ServiceWatcher struct { ipToServiceAndNamespace *sync.Map serviceAndNamespaceToSelectors *sync.Map logger *zap.Logger informer cache.SharedIndexInformer deleter Deleter } func NewServiceWatcher(logger *zap.Logger, sharedInformerFactory informers.SharedInformerFactory, deleter Deleter) *ServiceWatcher { serviceInformer := sharedInformerFactory.Core().V1().Services().Informer() err := serviceInformer.SetTransform(minimizeService) if err != nil { logger.Error("failed to minimize Service objects", zap.Error(err)) } return &ServiceWatcher{ ipToServiceAndNamespace: &sync.Map{}, serviceAndNamespaceToSelectors: &sync.Map{}, logger: logger, informer: serviceInformer, deleter: deleter, } } func (s *ServiceWatcher) Run(stopCh chan struct{}) { s.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { service := obj.(*corev1.Service) s.logger.Debug("list and watch for services: ADD " + service.Name) s.onAddOrUpdateService(service) }, UpdateFunc: func(_, newObj interface{}) { service := newObj.(*corev1.Service) s.logger.Debug("list and watch for services: UPDATE " + service.Name) s.onAddOrUpdateService(service) }, DeleteFunc: func(obj interface{}) { service := obj.(*corev1.Service) s.logger.Debug("list and watch for services: DELETE " + service.Name) s.onDeleteService(service, s.deleter) }, }) go s.informer.Run(stopCh) } func (s *ServiceWatcher) WaitForCacheSync(stopCh chan struct{}) { if !cache.WaitForNamedCacheSync("serviceWatcher", stopCh, s.informer.HasSynced) { s.logger.Error("timed out waiting for kubernetes service watcher caches to sync") } s.logger.Info("serviceWatcher: Cache synced") } func (s *ServiceWatcher) onAddOrUpdateService(service *corev1.Service) { // service can also have an external IP (or ingress IP) that could be accessed // this field can be either an IP address (in some edge case) or a hostname (see "EXTERNAL-IP" column in "k get svc" output) // [ec2-user@ip-172-31-11-104 one-step]$ k get svc -A // NAMESPACE NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE // default pet-clinic-frontend ClusterIP 10.100.216.182 <none> 8080/TCP 108m // default vets-service ClusterIP 10.100.62.167 <none> 8083/TCP 108m // default visits-service ClusterIP 10.100.96.5 <none> 8082/TCP 108m // ingress-nginx default-http-backend ClusterIP 10.100.11.231 <none> 80/TCP 108m // ingress-nginx ingress-nginx LoadBalancer 10.100.154.5 aex7997ece08c435dbd2b912fd5aa5bd-5372117830.xxxxx.elb.amazonaws.com 80:32080/TCP,443:32081/TCP,9113:30410/TCP 108m // kube-system kube-dns ClusterIP 10.100.0.10 <none> // // we ignore such case for now and may need to consider it in the future if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != corev1.ClusterIPNone { s.ipToServiceAndNamespace.Store(service.Spec.ClusterIP, getServiceAndNamespace(service)) } labelSet := mapset.NewSet[string]() for key, value := range service.Spec.Selector { labelSet.Add(key + "=" + value) } if labelSet.Cardinality() > 0 { s.serviceAndNamespaceToSelectors.Store(getServiceAndNamespace(service), labelSet) } } func (s *ServiceWatcher) onDeleteService(service *corev1.Service, deleter Deleter) { if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != corev1.ClusterIPNone { deleter.DeleteWithDelay(s.ipToServiceAndNamespace, service.Spec.ClusterIP) } deleter.DeleteWithDelay(s.serviceAndNamespaceToSelectors, getServiceAndNamespace(service)) } // minimizeService removes fields that could contain large objects, and retain essential // fields needed for IP/name translation. The following fields must be kept: // - ObjectMeta: Namespace, Name // - Spec: Selectors, ClusterIP func minimizeService(obj interface{}) (interface{}, error) { if svc, ok := obj.(*corev1.Service); ok { svc.Annotations = nil svc.Finalizers = nil svc.ManagedFields = nil svc.Spec.LoadBalancerSourceRanges = nil svc.Spec.SessionAffinityConfig = nil svc.Spec.IPFamilies = nil svc.Spec.IPFamilyPolicy = nil svc.Spec.InternalTrafficPolicy = nil svc.Spec.InternalTrafficPolicy = nil svc.Status.Conditions = nil } return obj, nil } // GetIPToServiceAndNamespace returns the ipToServiceAndNamespace func (s *ServiceWatcher) GetIPToServiceAndNamespace() *sync.Map { return s.ipToServiceAndNamespace } // InitializeIPToServiceAndNamespace initializes the ipToServiceAndNamespace func (s *ServiceWatcher) InitializeIPToServiceAndNamespace() { s.ipToServiceAndNamespace = &sync.Map{} } // GetServiceAndNamespaceToSelectors returns the serviceAndNamespaceToSelectors func (s *ServiceWatcher) GetServiceAndNamespaceToSelectors() *sync.Map { return s.serviceAndNamespaceToSelectors }