in cmd/frontend/internal/rule/proxy.go [78:147]
func fanoutForward[T *promapiv1.Alert | *promapiv1.RuleGroup](
ctx context.Context,
logger log.Logger,
ruleEndpoints []url.URL,
rawQuery string,
retrieveFn func(context.Context, url.URL, string) ([]T, error),
) ([]T, error) {
if len(ruleEndpoints) == 0 {
_ = level.Warn(logger).Log("msg", "tried to fetch rules/alerts, no endpoints (--rules.target-urls) configured")
return []T{}, nil
}
var (
wg = sync.WaitGroup{}
resultChan, errChan = make(chan []T), make(chan error)
results []T
errs []error
)
// Parallel call to all endpoints.
for _, baseURL := range ruleEndpoints {
wg.Add(1)
go func(baseURL url.URL) {
defer wg.Done()
result, err := retrieveFn(ctx, baseURL, rawQuery)
if err != nil {
errChan <- fmt.Errorf("retrieving alerts from %s failed: %w", baseURL.String(), err)
return
}
resultChan <- result
}(baseURL)
}
go func() {
// Wait for all rule evaluators to finish and close the channels.
wg.Wait()
close(resultChan)
close(errChan)
}()
// Collect results and errors from the channels.
for resultChan != nil || errChan != nil {
select {
case result, ok := <-resultChan:
if !ok {
resultChan = nil
continue
}
results = append(results, result...)
case err, ok := <-errChan:
if !ok {
errChan = nil
continue
}
errs = append(errs, err)
}
}
if len(errs) != 0 {
if len(errs) == len(ruleEndpoints) {
_ = level.Error(logger).Log("msg", "all endpoints failed", "errors", errs)
return nil, errAllEndpointsFailed
}
_ = level.Warn(logger).Log("msg", "some endpoints failed; potentially partial result", "errors", errs)
}
// TODO(bwplotka): Sort?
return results, nil
}