in pkg/operator/target_status.go [224:304]
// fetchTargets collects the target state reported by each collector pod.
//
// It reads the collector DaemonSet (NameCollector) from opts.OperatorNamespace,
// derives the pod label selector and the Prometheus port from its spec, and
// then calls getTarget once for every pod returned by getPrometheusPods. The
// calls are spread over opts.TargetPollConcurrency workers; a value below one
// is treated as one so that the pods are always drained.
//
// The returned slice holds one entry per pod, in no particular order. A failed
// getTarget call is logged rather than returned, and whatever it produced for
// that pod is still included (nil represents a target that could not be
// reached). An error is returned only when the DaemonSet cannot be read, its
// selector cannot be converted, no Prometheus port is detected, or
// getPrometheusPods fails.
func fetchTargets(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) ([]*prometheusv1.TargetsResult, error) {
	var ds appsv1.DaemonSet
	key := client.ObjectKey{
		Name:      NameCollector,
		Namespace: opts.OperatorNamespace,
	}
	if err := kubeClient.Get(ctx, key, &ds); err != nil {
		return nil, err
	}

	selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
	if err != nil {
		return nil, err
	}

	// Use the port of the first Prometheus container that exposes one. Index
	// into the slice so the helpers receive a pointer to the spec's own element
	// rather than to a per-iteration copy.
	var port *int32
	containers := ds.Spec.Template.Spec.Containers
	for i := range containers {
		c := &containers[i]
		if !isPrometheusContainer(c) {
			continue
		}
		if port = getPrometheusPort(c); port != nil {
			break
		}
	}
	if port == nil {
		return nil, errors.New("unable to detect Prometheus port")
	}

	pods, err := getPrometheusPods(ctx, kubeClient, opts, selector)
	if err != nil {
		return nil, err
	}

	// Always run at least one worker. With zero workers nothing would receive
	// from podDiscoveryCh, the feeder below would block on its first send, and
	// targetCh would never be closed, hanging the caller forever.
	workers := max(int(opts.TargetPollConcurrency), 1)

	// Both channels are unbuffered: workers pull pods from podDiscoveryCh and
	// push their results to targetCh, which this function drains at the end.
	podDiscoveryCh := make(chan prometheusPod)
	targetCh := make(chan *prometheusv1.TargetsResult)

	var wg sync.WaitGroup
	wg.Add(workers)
	for range workers {
		go func() {
			defer wg.Done()
			for job := range podDiscoveryCh {
				// Fetch operation is blocking.
				target, err := getTarget(ctx, logger, httpClient, job.port, job.pod)
				if err != nil {
					logger.Error(err, "failed to fetch target", "pod", job.pod.GetName())
				}
				// Forwarded even on error; nil represents being unable to
				// reach a target.
				targetCh <- target
			}
		}()
	}

	// Feed the queue from a separate goroutine: sends on podDiscoveryCh only
	// complete while workers can hand results to targetCh, so this function
	// must be free to drain targetCh concurrently.
	go func() {
		for _, pod := range pods {
			podDiscoveryCh <- prometheusPod{
				port: *port,
				pod:  pod,
			}
		}
		// Closing the queue lets the workers' range loops terminate.
		close(podDiscoveryCh)
		// Only close targetCh once every worker has sent its last result.
		wg.Wait()
		close(targetCh)
	}()

	// Exactly one result is produced per pod, so size the slice up front.
	results := make([]*prometheusv1.TargetsResult, 0, len(pods))
	for target := range targetCh {
		results = append(results, target)
	}
	return results, nil
}