func fanoutForward[T *promapiv1.Alert | *promapiv1.RuleGroup]()

in cmd/frontend/internal/rule/proxy.go [78:147]


// fanoutForward calls retrieveFn against every rule endpoint concurrently and
// merges the returned items into a single slice.
//
// Parameters:
//   - ctx is passed unchanged to every retrieveFn call.
//   - logger receives warnings/errors about missing or failing endpoints.
//   - ruleEndpoints are the base URLs to query.
//   - rawQuery is passed unchanged to every retrieveFn call.
//   - retrieveFn fetches the items from one endpoint.
//
// Returns:
//   - An empty, non-nil slice and a nil error when no endpoints are configured
//     (a warning is logged).
//   - nil and errAllEndpointsFailed when every endpoint returned an error
//     (the individual errors are logged).
//   - Otherwise the merged items and a nil error. If only some endpoints
//     failed, a warning is logged and the result may be partial.
//
// Items are concatenated in the order of ruleEndpoints, so the output is
// stable across calls regardless of which endpoint answers first. On success
// the returned slice is never nil.
func fanoutForward[T *promapiv1.Alert | *promapiv1.RuleGroup](
	ctx context.Context,
	logger log.Logger,
	ruleEndpoints []url.URL,
	rawQuery string,
	retrieveFn func(context.Context, url.URL, string) ([]T, error),
) ([]T, error) {
	if len(ruleEndpoints) == 0 {
		_ = level.Warn(logger).Log("msg", "tried to fetch rules/alerts, no endpoints (--rules.target-urls) configured")
		return []T{}, nil
	}

	// One slot per endpoint. Every goroutine writes only to its own index, so
	// no locking is required; wg.Wait() orders those writes before the reads
	// in the merge loop below.
	var (
		wg      sync.WaitGroup
		fetched = make([][]T, len(ruleEndpoints))
		failed  = make([]error, len(ruleEndpoints))
	)

	// Parallel call to all endpoints.
	for i, baseURL := range ruleEndpoints {
		wg.Add(1)
		go func(i int, baseURL url.URL) {
			defer wg.Done()

			result, err := retrieveFn(ctx, baseURL, rawQuery)
			if err != nil {
				failed[i] = fmt.Errorf("retrieving rules/alerts from %s failed: %w", baseURL.String(), err)
				return
			}
			fetched[i] = result
		}(i, baseURL)
	}

	// Wait for all endpoints to answer (or fail) before merging.
	wg.Wait()

	// Merge in endpoint order. results starts as an empty, non-nil slice so a
	// successful fan-out with zero items matches the no-endpoints case above.
	var (
		results = []T{}
		errs    []error
	)
	for i := range ruleEndpoints {
		if failed[i] != nil {
			errs = append(errs, failed[i])
			continue
		}
		results = append(results, fetched[i]...)
	}

	if len(errs) != 0 {
		if len(errs) == len(ruleEndpoints) {
			_ = level.Error(logger).Log("msg", "all endpoints failed", "errors", errs)
			return nil, errAllEndpointsFailed
		}
		_ = level.Warn(logger).Log("msg", "some endpoints failed; potentially partial result", "errors", errs)
	}
	return results, nil
}