cmd/config-reloader/main.go (171 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package main import ( "context" "crypto/fips140" "flag" "net/http" "net/url" "os" "os/signal" "strings" "syscall" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/thanos-io/thanos/pkg/reloader" ) func main() { var ( watchedDirs stringSlice configFile = flag.String("config-file", "", "config file to watch for changes") configFileOutput = flag.String("config-file-output", "", "config file to write with interpolated environment variables") configDir = flag.String("config-dir", "", "config directory to watch for changes") configDirOutput = flag.String("config-dir-output", "", "config directory to write with interpolated environment variables") // Ready and reload endpoints should be compatible with Prometheus-style // management APIs, e.g. // https://prometheus.io/docs/prometheus/latest/management_api/ // https://prometheus.io/docs/alerting/latest/management_api/ reloadURLStr = flag.String("reload-url", "http://127.0.0.1:19090/-/reload", "reload endpoint of the configuration target that triggers a reload of the configuration file") readyURLStr = flag.String("ready-url", "http://127.0.0.1:19090/-/ready", "ready endpoint of the configuration target that returns a 200 when ready to serve traffic. If set, the config-reloader will probe it on startup") readyProbingInterval = flag.Duration("ready-startup-probing-interval", 1*time.Second, "how often to poll ready endpoint during startup") readyProbingNoConnectionThreshold = flag.Int("ready-startup-probing-no-conn-threshold", 5, "how many times ready endpoint can fail due to no connection failure. This can happen if the config-reloader starts faster than the config target endpoint readiness server.") listenAddress = flag.String("listen-address", ":19091", "address on which to expose metrics") ) flag.Var(&watchedDirs, "watched-dir", "directory to watch for file changes (for rule and secret files, may be repeated)") flag.Parse() logger := log.NewJSONLogger(log.NewSyncWriter(os.Stderr)) logger = log.With(logger, "ts", log.DefaultTimestampUTC) logger = log.With(logger, "caller", log.DefaultCaller) if !fips140.Enabled() { _ = logger.Log("msg", "FIPS mode not enabled") os.Exit(1) } if *configDirOutput != "" && *configDir == "" { //nolint:errcheck level.Error(logger).Log("msg", "config-dir-output specified without config-dir") os.Exit(1) } metrics := prometheus.NewRegistry() metrics.MustRegister( collectors.NewGoCollector(), collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}), ) reloadURL, err := url.Parse(*reloadURLStr) if err != nil { //nolint:errcheck level.Error(logger).Log("msg", "parsing reloader URL failed", "err", err) os.Exit(1) } // Set up interrupt signal handler. term := make(chan os.Signal, 1) signal.Notify(term, os.Interrupt, syscall.SIGTERM) // Poll ready endpoint indefinitely until it's up and running. req, err := http.NewRequest(http.MethodGet, *readyURLStr, nil) if err != nil { //nolint:errcheck level.Error(logger).Log("msg", "creating request", "err", err) os.Exit(1) } var ( ticker = time.NewTicker(*readyProbingInterval) acceptableNoConnectionErrors = *readyProbingNoConnectionThreshold done = make(chan bool) ) go func() { //nolint:errcheck level.Info(logger).Log("msg", "ensure ready-url is healthy") for { select { case <-term: //nolint:errcheck level.Info(logger).Log("msg", "received SIGTERM, exiting gracefully...") os.Exit(0) case <-ticker.C: resp, err := http.DefaultClient.Do(req) if err != nil { if acceptableNoConnectionErrors <= 0 { //nolint:errcheck level.Error(logger).Log("msg", "polling ready-url", "err", err, "no-connection-threshold", *readyProbingNoConnectionThreshold) os.Exit(1) } acceptableNoConnectionErrors-- continue } if err := resp.Body.Close(); err != nil { //nolint:errcheck level.Warn(logger).Log("msg", "unable to close response body", "err", err) } if resp.StatusCode == http.StatusOK { //nolint:errcheck level.Info(logger).Log("msg", "ready-url is healthy") ticker.Stop() done <- true return } } } }() <-done var cfgDirs []reloader.CfgDirOption if *configDir != "" { cfgDirs = append(cfgDirs, reloader.CfgDirOption{ Dir: *configDir, OutputDir: *configDirOutput, }) } rel := reloader.New( logger, metrics, &reloader.Options{ ReloadURL: reloadURL, CfgFile: *configFile, CfgOutputFile: *configFileOutput, CfgDirs: cfgDirs, WatchedDirs: watchedDirs, // There are some reliability issues with fsnotify picking up file changes. // Configure a very aggress refresh for now. The reloader will only send reload signals // to Prometheus if the contents actually changed. So this should not have any practical // drawbacks. WatchInterval: 10 * time.Second, RetryInterval: 5 * time.Second, DelayInterval: 3 * time.Second, }, ) var g run.Group { ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { return rel.Watch(ctx) }, func(error) { cancel() }) } { cancel := make(chan struct{}) g.Add( func() error { select { case <-term: //nolint:errcheck level.Info(logger).Log("msg", "received SIGTERM, exiting gracefully...") case <-cancel: } return nil }, func(error) { close(cancel) }, ) } { server := &http.Server{Addr: *listenAddress} http.Handle("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{Registry: metrics})) g.Add(func() error { //nolint:errcheck level.Info(logger).Log("msg", "Starting web server for metrics", "listen", *listenAddress) return server.ListenAndServe() }, func(error) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) if err := server.Shutdown(ctx); err != nil { //nolint:errcheck level.Error(logger).Log("msg", "Server failed to shut down gracefully.") } cancel() }) } if err := g.Run(); err != nil { //nolint:errcheck level.Error(logger).Log("msg", "running reloader failed", "err", err) os.Exit(1) } } type stringSlice []string func (ss *stringSlice) String() string { return strings.Join(*ss, ", ") } func (ss *stringSlice) Set(value string) error { *ss = append(*ss, value) return nil }