func()

in grpc-xds/control-plane-go/pkg/informers/manager.go [73:124]


// AddEndpointSliceInformer creates an informer for EndpointSlices in
// config.Namespace whose discoveryv1.LabelServiceName label value is one of
// config.Services, registers event handlers on it, and runs it in a
// background goroutine.
//
// On every add, update and delete event the handler logs the object,
// collects the apps for the informer via getAppsForInformer, and passes them
// to m.handleEndpointSliceEvent.
//
// The informer is stopped when ctx is cancelled. An error is returned if the
// event handler could not be registered; in that case no goroutines are
// started.
func (m *Manager) AddEndpointSliceInformer(ctx context.Context, logger logr.Logger, config Config) error {
	logger = logger.WithValues("kubecontext", m.kubecontext, "namespace", config.Namespace)
	if config.Services == nil {
		// NOTE(review): strings.Join handles a nil slice, so this presumably
		// only exists so the value is reported as an empty list below.
		config.Services = make([]string, 0)
	}
	labelSelector := fmt.Sprintf("%s in (%s)", discoveryv1.LabelServiceName, strings.Join(config.Services, ", "))
	logger.V(2).Info("Creating informer for EndpointSlices", "labelSelector", labelSelector)

	factory := informers.NewSharedInformerFactory(m.clientset, 0)
	informer := factory.InformerFor(&discoveryv1.EndpointSlice{}, func(clientSet kubernetes.Interface, resyncPeriod time.Duration) informercache.SharedIndexInformer {
		indexers := informercache.Indexers{informercache.NamespaceIndex: informercache.MetaNamespaceIndexFunc}
		return discoveryinformers.NewFilteredEndpointSliceInformer(clientSet, config.Namespace, resyncPeriod, indexers, func(listOptions *metav1.ListOptions) {
			// Restrict list/watch requests to the configured services.
			listOptions.LabelSelector = labelSelector
		})
	})

	// handleEvent is shared by all three callbacks: the handling is the same
	// for every event type, only the "event" log value differs.
	handleEvent := func(event string, obj interface{}) {
		eventLogger := logger.WithValues("event", event)
		logEndpointSlice(eventLogger, obj)
		apps := getAppsForInformer(eventLogger, informer)
		m.handleEndpointSliceEvent(ctx, eventLogger, config.Namespace, apps)
	}

	_, err := informer.AddEventHandler(informercache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { handleEvent("add", obj) },
		UpdateFunc: func(_, obj interface{}) { handleEvent("update", obj) },
		DeleteFunc: func(obj interface{}) { handleEvent("delete", obj) },
	})
	if err != nil {
		return fmt.Errorf("could not add informer event handler for kubecontext=%s namespace=%s services=%+v: %w", m.kubecontext, config.Namespace, config.Services, err)
	}

	// Start the cancellation watcher only once the handler is registered, so
	// the error path above does not leave a goroutine parked on ctx.Done()
	// that later logs a stop for an informer that never ran.
	stop := make(chan struct{})
	go func() {
		<-ctx.Done()
		logger.V(1).Info("Stopping informer for EndpointSlices", "labelSelector", labelSelector)
		close(stop)
	}()
	go func() {
		logger.V(2).Info("Starting informer", "services", config.Services)
		informer.Run(stop)
	}()
	return nil
}