in grpc-xds/control-plane-go/pkg/informers/manager.go [73:124]
// AddEndpointSliceInformer creates an informer for the EndpointSlices in
// config.Namespace whose service-name label matches one of config.Services,
// registers event handlers on it, and runs it in a background goroutine until
// ctx is done.
//
// On every add, update, and delete event the handler logs the affected
// object, derives the current apps from the informer via getAppsForInformer,
// and passes them to m.handleEndpointSliceEvent together with ctx and
// config.Namespace.
//
// It returns an error if the event handler cannot be registered on the
// informer. In that case no goroutines are left running.
func (m *Manager) AddEndpointSliceInformer(ctx context.Context, logger logr.Logger, config Config) error {
	logger = logger.WithValues("kubecontext", m.kubecontext, "namespace", config.Namespace)
	// config is a by-value copy, so this normalisation is local: it only makes
	// the log and error output below see an empty, non-nil slice.
	if config.Services == nil {
		config.Services = make([]string, 0)
	}
	labelSelector := fmt.Sprintf("%s in (%s)", discoveryv1.LabelServiceName, strings.Join(config.Services, ", "))
	logger.V(2).Info("Creating informer for EndpointSlices", "labelSelector", labelSelector)

	// A dedicated factory (resync period 0) is created per call, and the
	// informer it hands out lists and watches with the label selector applied.
	factory := informers.NewSharedInformerFactory(m.clientset, 0)
	informer := factory.InformerFor(&discoveryv1.EndpointSlice{}, func(clientSet kubernetes.Interface, resyncPeriod time.Duration) informercache.SharedIndexInformer {
		indexers := informercache.Indexers{informercache.NamespaceIndex: informercache.MetaNamespaceIndexFunc}
		return discoveryinformers.NewFilteredEndpointSliceInformer(clientSet, config.Namespace, resyncPeriod, indexers, func(listOptions *metav1.ListOptions) {
			listOptions.LabelSelector = labelSelector
		})
	})

	// All three event types are handled identically; only the "event" log
	// value differs, so they share one closure.
	handleEvent := func(event string, obj interface{}) {
		eventLogger := logger.WithValues("event", event)
		logEndpointSlice(eventLogger, obj)
		apps := getAppsForInformer(eventLogger, informer)
		m.handleEndpointSliceEvent(ctx, eventLogger, config.Namespace, apps)
	}
	_, err := informer.AddEventHandler(informercache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { handleEvent("add", obj) },
		UpdateFunc: func(_, obj interface{}) { handleEvent("update", obj) },
		DeleteFunc: func(obj interface{}) { handleEvent("delete", obj) },
	})
	if err != nil {
		return fmt.Errorf("could not add informer event handler for kubecontext=%s namespace=%s services=%+v: %w", m.kubecontext, config.Namespace, config.Services, err)
	}

	// The stop channel and its watcher are created only after the handler has
	// been registered, so the error return above does not leave a goroutine
	// parked on ctx.Done() for an informer that never started.
	stop := make(chan struct{})
	go func() {
		<-ctx.Done()
		logger.V(1).Info("Stopping informer for EndpointSlices", "labelSelector", labelSelector)
		close(stop)
	}()
	go func() {
		logger.V(2).Info("Starting informer", "services", config.Services)
		informer.Run(stop)
	}()
	return nil
}