cmd/frontend/internal/rule/proxy.go (116 lines of code) (raw):

// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rule

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"sync"

	"github.com/GoogleCloudPlatform/prometheus-engine/internal/promapi"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	promapiv1 "github.com/prometheus/prometheus/web/api/v1"
)

// errAllEndpointsFailed is returned by fanoutForward when every configured
// endpoint returned an error, i.e. there is not even a partial result.
var errAllEndpointsFailed = errors.New("all endpoint failed")

// retriever is an interface for fetching rules and alerts from a single
// endpoint. Both methods have the signature fanoutForward expects for its
// retrieveFn argument; the proxy calls them once per configured endpoint,
// passing the raw query string of the incoming request.
type retriever interface {
	// RuleGroups returns the rule groups served at baseURL.
	RuleGroups(ctx context.Context, baseURL url.URL, queryString string) ([]*promapiv1.RuleGroup, error)
	// Alerts returns the alerts served at baseURL.
	Alerts(ctx context.Context, baseURL url.URL, queryString string) ([]*promapiv1.Alert, error)
}

// Proxy fans out requests to multiple endpoints serving rules and alerts.
// Results are concatenated as-is, without sorting. If only some endpoints
// fail, a warning is logged and the partial result is returned; if all of
// them fail, an error response is written instead.
type Proxy struct {
	// logger receives warnings and errors about failing endpoints.
	logger log.Logger
	// endpoints are the base URLs every request is forwarded to.
	endpoints []url.URL
	// client performs the per-endpoint retrieval.
	client retriever
}

// NewProxy creates a new proxy.
func NewProxy(logger log.Logger, c httpClient, ruleEndpoints []url.URL) *Proxy { return &Proxy{ logger: logger, endpoints: ruleEndpoints, client: newClient(c), } } func (p *Proxy) RuleGroups(w http.ResponseWriter, req *http.Request) { rules, err := fanoutForward[*promapiv1.RuleGroup](req.Context(), p.logger, p.endpoints, req.URL.RawQuery, p.client.RuleGroups) if err != nil { p.handleError(w, req, err) return } promapi.WriteSuccessResponse(p.logger, w, http.StatusOK, req.URL.Path, promapi.RulesResponseData{Groups: rules}) } func (p *Proxy) Alerts(w http.ResponseWriter, req *http.Request) { alerts, err := fanoutForward[*promapiv1.Alert](req.Context(), p.logger, p.endpoints, req.URL.RawQuery, p.client.Alerts) if err != nil { p.handleError(w, req, err) return } promapi.WriteSuccessResponse(p.logger, w, http.StatusOK, req.URL.Path, promapi.AlertsResponseData{Alerts: alerts}) } // fanoutForward calls the endpoints in parallel and returns the combined results. func fanoutForward[T *promapiv1.Alert | *promapiv1.RuleGroup]( ctx context.Context, logger log.Logger, ruleEndpoints []url.URL, rawQuery string, retrieveFn func(context.Context, url.URL, string) ([]T, error), ) ([]T, error) { if len(ruleEndpoints) == 0 { _ = level.Warn(logger).Log("msg", "tried to fetch rules/alerts, no endpoints (--rules.target-urls) configured") return []T{}, nil } var ( wg = sync.WaitGroup{} resultChan, errChan = make(chan []T), make(chan error) results []T errs []error ) // Parallel call to all endpoints. for _, baseURL := range ruleEndpoints { wg.Add(1) go func(baseURL url.URL) { defer wg.Done() result, err := retrieveFn(ctx, baseURL, rawQuery) if err != nil { errChan <- fmt.Errorf("retrieving alerts from %s failed: %w", baseURL.String(), err) return } resultChan <- result }(baseURL) } go func() { // Wait for all rule evaluators to finish and close the channels. wg.Wait() close(resultChan) close(errChan) }() // Collect results and errors from the channels. 
for resultChan != nil || errChan != nil { select { case result, ok := <-resultChan: if !ok { resultChan = nil continue } results = append(results, result...) case err, ok := <-errChan: if !ok { errChan = nil continue } errs = append(errs, err) } } if len(errs) != 0 { if len(errs) == len(ruleEndpoints) { _ = level.Error(logger).Log("msg", "all endpoints failed", "errors", errs) return nil, errAllEndpointsFailed } _ = level.Warn(logger).Log("msg", "some endpoints failed; potentially partial result", "errors", errs) } // TODO(bwplotka): Sort? return results, nil } // handleError writes an error response to the client based on the error. func (p *Proxy) handleError(w http.ResponseWriter, req *http.Request, err error) { if errors.Is(err, context.Canceled) { promapi.WriteError(p.logger, w, promapi.ErrorCanceled, err.Error(), http.StatusGatewayTimeout, req.URL.Path) return } if errors.Is(err, context.DeadlineExceeded) { promapi.WriteError(p.logger, w, promapi.ErrorTimeout, err.Error(), http.StatusGatewayTimeout, req.URL.Path) return } promapi.WriteError(p.logger, w, promapi.ErrorInternal, err.Error(), http.StatusInternalServerError, req.URL.Path) }