func()

in discovery/kubernetes/kubernetes.go [390:678]


func (d *Discovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	d.Lock()
	namespaces := d.getNamespaces()

	switch d.role {
	case RoleEndpointSlice:
		// Check "networking.k8s.io/v1" availability with retries.
		// If "v1" is not available, use "networking.k8s.io/v1beta1" for backward compatibility
		var v1Supported bool
		if retryOnError(ctx, 10*time.Second,
			func() (err error) {
				v1Supported, err = checkDiscoveryV1Supported(d.client)
				if err != nil {
					level.Error(d.logger).Log("msg", "Failed to check networking.k8s.io/v1 availability", "err", err)
				}
				return err
			},
		) {
			d.Unlock()
			return
		}

		for _, namespace := range namespaces {
			var informer cache.SharedIndexInformer
			if v1Supported {
				e := d.client.DiscoveryV1().EndpointSlices(namespace)
				elw := &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						options.FieldSelector = d.selectors.endpointslice.field
						options.LabelSelector = d.selectors.endpointslice.label
						return e.List(ctx, options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						options.FieldSelector = d.selectors.endpointslice.field
						options.LabelSelector = d.selectors.endpointslice.label
						return e.Watch(ctx, options)
					},
				}
				informer = d.newEndpointSlicesByNodeInformer(elw, &disv1.EndpointSlice{})
			} else {
				e := d.client.DiscoveryV1beta1().EndpointSlices(namespace)
				elw := &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						options.FieldSelector = d.selectors.endpointslice.field
						options.LabelSelector = d.selectors.endpointslice.label
						return e.List(ctx, options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						options.FieldSelector = d.selectors.endpointslice.field
						options.LabelSelector = d.selectors.endpointslice.label
						return e.Watch(ctx, options)
					},
				}
				informer = d.newEndpointSlicesByNodeInformer(elw, &disv1beta1.EndpointSlice{})
			}

			s := d.client.CoreV1().Services(namespace)
			slw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.service.field
					options.LabelSelector = d.selectors.service.label
					return s.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.service.field
					options.LabelSelector = d.selectors.service.label
					return s.Watch(ctx, options)
				},
			}
			p := d.client.CoreV1().Pods(namespace)
			plw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.pod.field
					options.LabelSelector = d.selectors.pod.label
					return p.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.pod.field
					options.LabelSelector = d.selectors.pod.label
					return p.Watch(ctx, options)
				},
			}
			var nodeInf cache.SharedInformer
			if d.attachMetadata.Node {
				nodeInf = d.newNodeInformer(context.Background())
				go nodeInf.Run(ctx.Done())
			}
			eps := NewEndpointSlice(
				log.With(d.logger, "role", "endpointslice"),
				informer,
				cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
				cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
				nodeInf,
			)
			d.discoverers = append(d.discoverers, eps)
			go eps.endpointSliceInf.Run(ctx.Done())
			go eps.serviceInf.Run(ctx.Done())
			go eps.podInf.Run(ctx.Done())
		}
	case RoleEndpoint:
		for _, namespace := range namespaces {
			e := d.client.CoreV1().Endpoints(namespace)
			elw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.endpoints.field
					options.LabelSelector = d.selectors.endpoints.label
					return e.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.endpoints.field
					options.LabelSelector = d.selectors.endpoints.label
					return e.Watch(ctx, options)
				},
			}
			s := d.client.CoreV1().Services(namespace)
			slw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.service.field
					options.LabelSelector = d.selectors.service.label
					return s.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.service.field
					options.LabelSelector = d.selectors.service.label
					return s.Watch(ctx, options)
				},
			}
			p := d.client.CoreV1().Pods(namespace)
			plw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.pod.field
					options.LabelSelector = d.selectors.pod.label
					return p.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.pod.field
					options.LabelSelector = d.selectors.pod.label
					return p.Watch(ctx, options)
				},
			}
			var nodeInf cache.SharedInformer
			if d.attachMetadata.Node {
				nodeInf = d.newNodeInformer(ctx)
				go nodeInf.Run(ctx.Done())
			}

			eps := NewEndpoints(
				log.With(d.logger, "role", "endpoint"),
				d.newEndpointsByNodeInformer(elw),
				cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
				cache.NewSharedInformer(plw, &apiv1.Pod{}, resyncDisabled),
				nodeInf,
			)
			d.discoverers = append(d.discoverers, eps)
			go eps.endpointsInf.Run(ctx.Done())
			go eps.serviceInf.Run(ctx.Done())
			go eps.podInf.Run(ctx.Done())
		}
	case RolePod:
		var nodeInformer cache.SharedInformer
		if d.attachMetadata.Node {
			nodeInformer = d.newNodeInformer(ctx)
			go nodeInformer.Run(ctx.Done())
		}

		for _, namespace := range namespaces {
			p := d.client.CoreV1().Pods(namespace)
			plw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.pod.field
					options.LabelSelector = d.selectors.pod.label
					return p.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.pod.field
					options.LabelSelector = d.selectors.pod.label
					return p.Watch(ctx, options)
				},
			}
			pod := NewPod(
				log.With(d.logger, "role", "pod"),
				d.newPodsByNodeInformer(plw),
				nodeInformer,
			)
			d.discoverers = append(d.discoverers, pod)
			go pod.podInf.Run(ctx.Done())
		}
	case RoleService:
		for _, namespace := range namespaces {
			s := d.client.CoreV1().Services(namespace)
			slw := &cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.FieldSelector = d.selectors.service.field
					options.LabelSelector = d.selectors.service.label
					return s.List(ctx, options)
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.FieldSelector = d.selectors.service.field
					options.LabelSelector = d.selectors.service.label
					return s.Watch(ctx, options)
				},
			}
			svc := NewService(
				log.With(d.logger, "role", "service"),
				cache.NewSharedInformer(slw, &apiv1.Service{}, resyncDisabled),
			)
			d.discoverers = append(d.discoverers, svc)
			go svc.informer.Run(ctx.Done())
		}
	case RoleIngress:
		// Check "networking.k8s.io/v1" availability with retries.
		// If "v1" is not available, use "networking.k8s.io/v1beta1" for backward compatibility
		var v1Supported bool
		if retryOnError(ctx, 10*time.Second,
			func() (err error) {
				v1Supported, err = checkNetworkingV1Supported(d.client)
				if err != nil {
					level.Error(d.logger).Log("msg", "Failed to check networking.k8s.io/v1 availability", "err", err)
				}
				return err
			},
		) {
			d.Unlock()
			return
		}

		for _, namespace := range namespaces {
			var informer cache.SharedInformer
			if v1Supported {
				i := d.client.NetworkingV1().Ingresses(namespace)
				ilw := &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						options.FieldSelector = d.selectors.ingress.field
						options.LabelSelector = d.selectors.ingress.label
						return i.List(ctx, options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						options.FieldSelector = d.selectors.ingress.field
						options.LabelSelector = d.selectors.ingress.label
						return i.Watch(ctx, options)
					},
				}
				informer = cache.NewSharedInformer(ilw, &networkv1.Ingress{}, resyncDisabled)
			} else {
				i := d.client.NetworkingV1beta1().Ingresses(namespace)
				ilw := &cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						options.FieldSelector = d.selectors.ingress.field
						options.LabelSelector = d.selectors.ingress.label
						return i.List(ctx, options)
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						options.FieldSelector = d.selectors.ingress.field
						options.LabelSelector = d.selectors.ingress.label
						return i.Watch(ctx, options)
					},
				}
				informer = cache.NewSharedInformer(ilw, &v1beta1.Ingress{}, resyncDisabled)
			}
			ingress := NewIngress(
				log.With(d.logger, "role", "ingress"),
				informer,
			)
			d.discoverers = append(d.discoverers, ingress)
			go ingress.informer.Run(ctx.Done())
		}
	case RoleNode:
		nodeInformer := d.newNodeInformer(ctx)
		node := NewNode(log.With(d.logger, "role", "node"), nodeInformer)
		d.discoverers = append(d.discoverers, node)
		go node.informer.Run(ctx.Done())
	default:
		level.Error(d.logger).Log("msg", "unknown Kubernetes discovery kind", "role", d.role)
	}

	var wg sync.WaitGroup
	for _, dd := range d.discoverers {
		wg.Add(1)
		go func(d discovery.Discoverer) {
			defer wg.Done()
			d.Run(ctx, ch)
		}(dd)
	}

	d.Unlock()

	wg.Wait()
	<-ctx.Done()
}