func fetchTargets()

in pkg/operator/target_status.go [224:304]


func fetchTargets(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) ([]*prometheusv1.TargetsResult, error) {
	namespace := opts.OperatorNamespace
	var ds appsv1.DaemonSet
	if err := kubeClient.Get(ctx, client.ObjectKey{
		Name:      NameCollector,
		Namespace: namespace,
	}, &ds); err != nil {
		return nil, err
	}

	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
	if err != nil {
		return nil, err
	}

	var port *int32
	for _, container := range ds.Spec.Template.Spec.Containers {
		if isPrometheusContainer(&container) {
			port = getPrometheusPort(&container)
			if port != nil {
				break
			}
		}
	}
	if port == nil {
		return nil, errors.New("unable to detect Prometheus port")
	}

	pods, err := getPrometheusPods(ctx, kubeClient, opts, selector)
	if err != nil {
		return nil, err
	}

	// Set up pod job queue and jobs
	podDiscoveryCh := make(chan prometheusPod)
	wg := sync.WaitGroup{}
	wg.Add(int(opts.TargetPollConcurrency))

	// Must be unbounded or else we deadlock.
	targetCh := make(chan *prometheusv1.TargetsResult)

	for range opts.TargetPollConcurrency {
		// Wrapper function so we can defer in this scope.
		go func() {
			defer wg.Done()
			for prometheusPod := range podDiscoveryCh {
				// Fetch operation is blocking.
				target, err := getTarget(ctx, logger, httpClient, prometheusPod.port, prometheusPod.pod)
				if err != nil {
					logger.Error(err, "failed to fetch target", "pod", prometheusPod.pod.GetName())
				}
				// nil represents being unable to reach a target.
				targetCh <- target
			}
		}()
	}

	// Unbuffered channels are blocking so make sure we end the goroutine processing them.
	go func() {
		for _, pod := range pods {
			podDiscoveryCh <- prometheusPod{
				port: *port,
				pod:  pod,
			}
		}

		// Must close so jobs aren't waiting on the channel indefinitely.
		close(podDiscoveryCh)

		// Close target after we're sure all targets are queued.
		wg.Wait()
		close(targetCh)
	}()

	results := make([]*prometheusv1.TargetsResult, 0)
	for target := range targetCh {
		results = append(results, target)
	}

	return results, nil
}