cmd/frontend/main.go (238 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// A proxy that forwards incoming requests to an HTTP endpoint, authenticating
// them with static service account credentials or the default service account
// on GCE instances.
// It is primarily intended to authenticate Prometheus queries coming from Grafana
// against GMP, as Grafana has no option to configure OAuth2 credentials.
package main
import (
	"context"
	"crypto/fips140"
	"crypto/subtle"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"os/signal"
	"path"
	"strings"
	"syscall"
	"time"

	"github.com/GoogleCloudPlatform/prometheus-engine/cmd/frontend/internal/rule"
	"github.com/GoogleCloudPlatform/prometheus-engine/internal/promapi"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/ui"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/collectors"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"google.golang.org/api/option"
	apihttp "google.golang.org/api/transport/http"
)
// projectIDVar is the placeholder token in --query.target-url that main
// replaces with the value of --query.project-id.
const projectIDVar = "PROJECT_ID"

// Names of the environment variables from which authenticate reads the
// expected basic-auth credentials.
var (
	authUsernameEnv = "AUTH_USERNAME"
	authPasswordEnv = "AUTH_PASSWORD"
)

// Command-line flags of the frontend binary.
var (
	// Query forwarding.
	projectID = flag.String(
		"query.project-id",
		"",
		"Project ID of the Google Cloud Monitoring workspace project to query.",
	)
	credentialsFile = flag.String(
		"query.credentials-file",
		"",
		"JSON-encoded credentials (service account or refresh token). Can be left empty if default credentials have sufficient permission.",
	)
	targetURLStr = flag.String(
		"query.target-url",
		"https://monitoring.googleapis.com/v1/projects/"+projectIDVar+"/location/global/prometheus",
		fmt.Sprintf("The URL to forward authenticated requests to. (%s is replaced with the --query.project-id flag.)", projectIDVar),
	)

	// Web server.
	listenAddress = flag.String(
		"web.listen-address",
		":19090",
		"Address on which to expose metrics and the query UI.",
	)
	externalURLStr = flag.String(
		"web.external-url",
		"",
		"The URL under which the frontend is externally reachable (for example, if it is served via a reverse proxy). Used for generating relative and absolute links back to the frontend itself. If the URL has a path portion, it will be used to prefix served HTTP endpoints. If omitted, relevant URL components will be derived automatically.",
	)

	// Rules and alerts proxying.
	ruleEndpointURLStrings = flag.String(
		"rules.target-urls",
		"http://rule-evaluator.gmp-system.svc.cluster.local:19092",
		"Comma separated lists of URLs that support HTTP Prometheus Alert and Rules APIs (/api/v1/alerts, /api/v1/rules), e.g. GMP rule-evaluator. NOTE: Results are merged as-is, no sorting and deduplication is done.",
	)

	// Logging.
	logLevel = flag.String(
		"log.level",
		"info",
		"The level of logging. Can be one of 'debug', 'info', 'warn', 'error'",
	)
)
// main parses the command-line flags, validates them, registers the HTTP
// handlers on the default mux and runs the web server until either a
// termination signal is received or the server stops with an error.
// Any configuration error is logged and terminates the process with exit
// code 1.
func main() {
	flag.Parse()

	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr))
	logger = log.With(logger, "ts", log.DefaultTimestampUTC)
	logger = log.With(logger, "caller", log.DefaultCaller)

	// Refuse to start when the crypto libraries are not in FIPS mode.
	if !fips140.Enabled() {
		_ = logger.Log("msg", "FIPS mode not enabled")
		os.Exit(1)
	}

	switch strings.ToLower(*logLevel) {
	case "debug":
		logger = level.NewFilter(logger, level.AllowDebug())
	case "warn":
		logger = level.NewFilter(logger, level.AllowWarn())
	case "error":
		logger = level.NewFilter(logger, level.AllowError())
	case "info":
		logger = level.NewFilter(logger, level.AllowInfo())
	default:
		//nolint:errcheck
		level.Error(logger).Log("msg",
			"--log.level can only be one of 'debug', 'info', 'warn', 'error'")
		os.Exit(1)
	}

	// Self-monitoring metrics exposed on /metrics.
	metrics := prometheus.NewRegistry()
	metrics.MustRegister(
		collectors.NewGoCollector(),
		collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
	)

	if *projectID == "" {
		//nolint:errcheck
		level.Error(logger).Log("msg", "--query.project-id must be set")
		os.Exit(1)
	}

	// Substitute the project ID placeholder before parsing the target URL.
	targetURL, err := url.Parse(strings.ReplaceAll(*targetURLStr, projectIDVar, *projectID))
	if err != nil {
		//nolint:errcheck
		level.Error(logger).Log("msg", "parsing target URL failed", "err", err)
		os.Exit(1)
	}

	externalURL, err := url.Parse(*externalURLStr)
	if err != nil {
		//nolint:errcheck
		level.Error(logger).Log("msg", "parsing external URL failed", "err", err)
		os.Exit(1)
	}

	// --rules.target-urls is a comma separated list; surrounding whitespace
	// around each entry is ignored.
	var ruleEndpointURLs []url.URL
	for _, ruleEndpointURLStr := range strings.Split(*ruleEndpointURLStrings, ",") {
		trimmed := strings.TrimSpace(ruleEndpointURLStr)
		ruleEndpointURL, err := url.Parse(trimmed)
		if err != nil || ruleEndpointURL == nil {
			_ = level.Error(logger).Log("msg", "parsing rule endpoint URL failed", "err", err, "url", trimmed)
			os.Exit(1)
		}
		ruleEndpointURLs = append(ruleEndpointURLs, *ruleEndpointURL)
	}

	version, err := export.Version()
	if err != nil {
		_ = level.Error(logger).Log("msg", "Unable to fetch module version", "err", err)
		os.Exit(1)
	}

	var g run.Group
	{
		// Actor that returns once an interrupt/SIGTERM arrives or the group is
		// being torn down for another reason.
		term := make(chan os.Signal, 1)
		cancel := make(chan struct{})
		signal.Notify(term, os.Interrupt, syscall.SIGTERM)

		g.Add(
			func() error {
				select {
				case <-term:
					//nolint:errcheck
					level.Info(logger).Log("msg", "received SIGTERM, exiting gracefully...")
				case <-cancel:
				}
				return nil
			},
			func(error) {
				close(cancel)
			},
		)
	}
	{
		// Actor that serves the HTTP endpoints.
		opts := []option.ClientOption{
			option.WithScopes("https://www.googleapis.com/auth/monitoring.read"),
			option.WithCredentialsFile(*credentialsFile),
		}
		ctx, cancel := context.WithCancel(context.Background())
		transport, err := apihttp.NewTransport(ctx, http.DefaultTransport, opts...)
		if err != nil {
			//nolint:errcheck
			level.Error(logger).Log("msg", "create proxy HTTP transport", "err", err)
			os.Exit(1)
		}

		ruleProxy := rule.NewProxy(
			log.With(logger, "component", "rule-proxy"),
			&http.Client{Timeout: 30 * time.Second},
			ruleEndpointURLs,
		)

		// The server has no explicit Handler, so it serves the default mux the
		// handlers below are registered on.
		server := &http.Server{Addr: *listenAddress}

		buildInfoHandler := http.HandlerFunc(promapi.BuildinfoHandlerFunc(log.With(logger, "component", "buildinfo-handler"), "frontend", version))
		http.Handle("/api/v1/status/buildinfo", buildInfoHandler)
		http.Handle("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{Registry: metrics}))
		http.Handle("/api/v1/rules", http.HandlerFunc(ruleProxy.RuleGroups))
		http.Handle("/api/v1/rules/", http.NotFoundHandler())
		http.Handle("/api/v1/alerts", http.HandlerFunc(ruleProxy.Alerts))
		// Every other /api/ path is forwarded to the target URL.
		http.Handle("/api/", authenticate(forward(logger, targetURL, transport)))

		http.HandleFunc("/-/healthy", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
			fmt.Fprintf(w, "Prometheus frontend is Healthy.\n")
		})
		http.HandleFunc("/-/ready", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
			fmt.Fprintf(w, "Prometheus frontend is Ready.\n")
		})

		http.Handle("/", authenticate(ui.Handler(externalURL)))

		g.Add(func() error {
			//nolint:errcheck
			level.Info(logger).Log("msg", "Starting web server for metrics", "listen", *listenAddress)
			return server.ListenAndServe()
		}, func(error) {
			// Use a dedicated, bounded context for the graceful shutdown rather
			// than overwriting the transport's context, so that the latter can
			// still be cancelled afterwards.
			shutdownCtx, shutdownCancel := context.WithTimeout(ctx, time.Minute)
			defer shutdownCancel()
			if err := server.Shutdown(shutdownCtx); err != nil {
				//nolint:errcheck
				level.Error(logger).Log("msg", "Server failed to shut down gracefully", "err", err)
			}
			// Release the context that was handed to the proxy transport.
			cancel()
		})
	}

	if err := g.Run(); err != nil {
		//nolint:errcheck
		level.Error(logger).Log("msg", "running frontend failed", "err", err)
		os.Exit(1)
	}
}
// authenticate wraps next with optional HTTP Basic authentication.
//
// The expected credentials are read from the environment variables named by
// authUsernameEnv and authPasswordEnv on every request. Authentication is
// enforced only when both are non-empty; otherwise the request is handed to
// next unchanged. A request without Basic credentials is answered with
// 401 Unauthorized and a "WWW-Authenticate: Basic" challenge; a request with
// non-matching credentials is answered with 403 Forbidden.
func authenticate(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		username := os.Getenv(authUsernameEnv)
		password := os.Getenv(authPasswordEnv)
		if username == "" || password == "" {
			// Authentication is not (fully) configured: pass through.
			next.ServeHTTP(w, req)
			return
		}

		reqUser, reqPass, ok := req.BasicAuth()
		if !ok {
			w.Header().Set("WWW-Authenticate", "Basic")
			w.WriteHeader(http.StatusUnauthorized)
			return
		}

		// Compare in constant time so that response latency does not reveal
		// how many leading bytes of a guess were correct. Both comparisons are
		// always evaluated (no short-circuit) and combined bitwise.
		userMatch := subtle.ConstantTimeCompare([]byte(reqUser), []byte(username))
		passMatch := subtle.ConstantTimeCompare([]byte(reqPass), []byte(password))
		if userMatch&passMatch != 1 {
			w.WriteHeader(http.StatusForbidden)
			return
		}

		next.ServeHTTP(w, req)
	})
}
// forward returns a handler that proxies every incoming request to target
// using an HTTP client built on transport.
//
// The incoming request path is joined onto the target's path and the request
// headers are copied to the outgoing request. The upstream response headers,
// status code and body are copied back to the caller. Failures are logged at
// warn level:
//   - a form that cannot be parsed (only for /api/v1/series) yields 400,
//   - failure to build the outgoing request yields 500,
//   - an upstream call aborted because the caller's context was cancelled
//     yields 400, any other upstream failure yields 500.
func forward(logger log.Logger, target *url.URL, transport http.RoundTripper) http.Handler {
	client := http.Client{Transport: transport}

	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// Work on a copy so the shared target URL is never modified.
		u := *target
		u.Path = path.Join(u.Path, req.URL.Path)

		method := req.Method
		// Write all params into the URL and make a GET request to work around
		// /api/v1/series currently not accepting match[] params on POST.
		if req.URL.Path == "/api/v1/series" {
			method = "GET"
			if err := req.ParseForm(); err != nil {
				// Stop here: forwarding a partially parsed form would send an
				// incomplete query upstream and write the status twice.
				//nolint:errcheck
				level.Warn(logger).Log("msg", "parsing request form failed", "err", err)
				w.WriteHeader(http.StatusBadRequest)
				return
			}
			u.RawQuery = req.Form.Encode()
		} else {
			u.RawQuery = req.URL.RawQuery
		}

		newReq, err := http.NewRequestWithContext(req.Context(), method, u.String(), req.Body)
		if err != nil {
			//nolint:errcheck
			level.Warn(logger).Log("msg", "creating request failed", "err", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		copyHeader(newReq.Header, req.Header)

		resp, err := client.Do(newReq)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				//nolint:errcheck
				level.Warn(logger).Log("msg", "request to GCM was canceled by the caller of frontend. If a program made the request, consider increasing the timeout", "err", err)
				w.WriteHeader(http.StatusBadRequest)
			} else {
				//nolint:errcheck
				level.Warn(logger).Log("msg", "requesting GCM failed", "err", err)
				w.WriteHeader(http.StatusInternalServerError)
			}
			return
		}
		defer resp.Body.Close()

		// Headers must be set before WriteHeader sends them.
		copyHeader(w.Header(), resp.Header)
		w.WriteHeader(resp.StatusCode)

		if _, err := io.Copy(w, resp.Body); err != nil {
			//nolint:errcheck
			level.Warn(logger).Log("msg", "copying response body failed", "err", err)
			return
		}
	})
}
// copyHeader appends every value of every header in src to dst. Header names
// are stored in dst under their canonical form, and values already present in
// dst are kept. A header with no values in src adds nothing to dst.
func copyHeader(dst, src http.Header) {
	for name, values := range src {
		if len(values) == 0 {
			// Nothing to add; do not create an empty entry in dst.
			continue
		}
		key := http.CanonicalHeaderKey(name)
		dst[key] = append(dst[key], values...)
	}
}