cmd/amazon-cloudwatch-agent-target-allocator/server/server.go (257 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package server
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"sync"
"time"
yaml2 "github.com/ghodss/yaml"
"github.com/gin-gonic/gin"
"github.com/go-logr/logr"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
promcommconfig "github.com/prometheus/common/config"
promconfig "github.com/prometheus/prometheus/config"
"gopkg.in/yaml.v2"
"github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/allocation"
"github.com/aws/amazon-cloudwatch-agent-operator/cmd/amazon-cloudwatch-agent-target-allocator/target"
)
var (
	// httpDuration records how long each HTTP request took, labelled by the matched
	// route pattern (see PrometheusMiddleware). promauto registers it with the default
	// Prometheus registerer when the package is initialised.
	httpDuration = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "amazon_cloudwatch_agent_allocator_http_duration_seconds",
			Help: "Duration of received HTTP requests.",
		},
		[]string{"path"},
	)

	// jsonConfig is the frozen jsoniter configuration installed as every Server's
	// jsonMarshaller: HTML characters are not escaped, floats are written with at most
	// six digits, and object field names are treated as simple (escape-free) strings.
	jsonConfig = jsoniter.Config{
		EscapeHTML:                    false,
		MarshalFloatWith6Digits:       true,
		ObjectFieldMustBeSimpleString: true,
	}.Froze()
)
// collectorJSON is the per-collector entry produced by GetAllTargetsByJob: a link to the
// collector's targets endpoint for the job, plus the targets themselves.
type collectorJSON struct {
	Link string         `json:"_link"`
	Jobs []*target.Item `json:"targets"`
}

// Server serves the target allocator's HTTP API (scrape configs, jobs, targets, metrics
// and health probes). When built with WithTLSConfig it serves that API over HTTPS instead.
type Server struct {
	logger    logr.Logger
	allocator allocation.Allocator
	// server is the http.Server driven by Start/Shutdown. WithTLSConfig replaces it
	// with httpsServer.
	server *http.Server
	// httpsServer is only set by WithTLSConfig and is driven by StartHTTPS/ShutdownHTTPS.
	httpsServer    *http.Server
	jsonMarshaller jsoniter.API
	// Use RWMutex to protect scrapeConfigResponse and ScrapeConfigMarshalledSecretResponse,
	// since they will be predominantly read and only written when config
	// is applied.
	mtx sync.RWMutex
	// scrapeConfigResponse is the JSON scrape config marshalled with
	// MarshalSecretValue=false. It is served to non-TLS requests, and readiness
	// is reported once it is non-nil.
	scrapeConfigResponse []byte
	// ScrapeConfigMarshalledSecretResponse is the JSON scrape config marshalled with
	// MarshalSecretValue=true. It is served only to requests that arrived over TLS.
	ScrapeConfigMarshalledSecretResponse []byte
}

// Option customises a Server. NewServer applies options after the default HTTP
// server has been created.
type Option func(*Server)
// WithTLSConfig returns an Option that creates an additional HTTPS server with the given
// mTLS configuration, listening on httpsListenAddr and serving the same routes. It is used
// for getting the scrape config with real secret values.
//
// The option also shuts down the plain HTTP server created by NewServer and points
// s.server at the HTTPS server, so Start and Shutdown subsequently operate on it too.
func WithTLSConfig(tlsConfig *tls.Config, httpsListenAddr string) Option {
	return func(s *Server) {
		httpsRouter := gin.New()
		s.setRouter(httpsRouter)
		s.httpsServer = &http.Server{Addr: httpsListenAddr, Handler: httpsRouter, ReadHeaderTimeout: 90 * time.Second, TLSConfig: tlsConfig}
		if s.server != nil {
			// Classify the shutdown error once: an already-closed server is not a failure
			// and must not also be reported through the error log.
			if err := s.server.Shutdown(context.Background()); err != nil {
				if errors.Is(err, http.ErrServerClosed) {
					s.logger.Info("Http server is already closed")
				} else {
					s.logger.Error(err, "Failed to shutdown http server")
				}
			}
		}
		s.server = s.httpsServer
	}
}
// setRouter installs the shared middleware (panic recovery first, then request-duration
// metrics) and registers every GET endpoint on router. Routes are matched against the raw,
// still-escaped URL path, and path parameters are not unescaped by gin. Handlers that need
// decoded values (e.g. TargetsHandler's job_id) decode them themselves.
func (s *Server) setRouter(router *gin.Engine) {
	router.UseRawPath = true
	router.UnescapePathValues = false
	router.Use(gin.Recovery(), s.PrometheusMiddleware)

	routes := []struct {
		path    string
		handler gin.HandlerFunc
	}{
		{"/scrape_configs", s.ScrapeConfigsHandler},
		{"/jobs", s.JobHandler},
		{"/jobs/:job_id/targets", s.TargetsHandler},
		{"/metrics", gin.WrapH(promhttp.Handler())},
		{"/livez", s.LivenessProbeHandler},
		{"/readyz", s.ReadinessProbeHandler},
	}
	for _, route := range routes {
		router.GET(route.path, route.handler)
	}
}
// NewServer builds a Server that serves the allocator's HTTP API on listenAddr, then
// applies options in order. It switches gin into release mode process-wide before the
// router is created. The server is not started here.
func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr string, options ...Option) *Server {
	gin.SetMode(gin.ReleaseMode)

	s := &Server{
		logger:         log,
		allocator:      allocator,
		jsonMarshaller: jsonConfig,
	}

	httpRouter := gin.New()
	s.setRouter(httpRouter)
	s.server = &http.Server{
		Addr:              listenAddr,
		Handler:           httpRouter,
		ReadHeaderTimeout: 90 * time.Second,
	}

	// Options run last: WithTLSConfig, for instance, relies on s.server and s.logger.
	for _, apply := range options {
		apply(s)
	}
	return s
}
// Start runs ListenAndServe on s.server (the HTTPS server when WithTLSConfig was applied)
// and returns its error once it stops.
func (s *Server) Start() error {
	srv := s.server
	s.logger.Info("Starting server...")
	return srv.ListenAndServe()
}
// Shutdown gracefully stops s.server, bounded by ctx, and returns the result of
// http.Server.Shutdown.
func (s *Server) Shutdown(ctx context.Context) error {
	srv := s.server
	s.logger.Info("Shutting down server...")
	return srv.Shutdown(ctx)
}
// StartHTTPS runs the HTTPS server created by WithTLSConfig and returns its error once it
// stops. Empty certificate/key file names are passed, so the certificate must come from
// the tls.Config given to WithTLSConfig. If WithTLSConfig was never applied, an error is
// returned instead of dereferencing a nil server.
func (s *Server) StartHTTPS() error {
	if s.httpsServer == nil {
		return errors.New("https server is not configured; use WithTLSConfig")
	}
	s.logger.Info("Starting HTTPS server...")
	return s.httpsServer.ListenAndServeTLS("", "")
}
// ShutdownHTTPS gracefully stops the HTTPS server created by WithTLSConfig, bounded by ctx.
// When no HTTPS server was configured there is nothing to stop and nil is returned.
func (s *Server) ShutdownHTTPS(ctx context.Context) error {
	if s.httpsServer == nil {
		return nil
	}
	s.logger.Info("Shutting down HTTPS server...")
	return s.httpsServer.Shutdown(ctx)
}
// RemoveRegexFromRelabelAction is needed specifically for keepequal/dropequal actions. Even
// though the user doesn't specify the regex field for these actions, the unmarshalling
// implementations of prometheus add back the default regex fields. That in turn causes the
// receiver to error out, since unmarshalling the json response doesn't expect anything in
// the regex fields for these actions. This is a fix until the original issue with prometheus
// unmarshaling is fixed - https://github.com/prometheus/prometheus/issues/12534
//
// jsonConfig is expected to be a JSON object mapping job names to scrape configs. The
// "regex" key is dropped from every keepequal/dropequal rule in both relabel_configs and
// metric_relabel_configs. Values that are not of the expected JSON shape (for example a
// job whose config is null, or a non-array rule list) are left untouched rather than
// causing a panic. The result is re-marshalled, so object keys come out sorted.
func RemoveRegexFromRelabelAction(jsonConfig []byte) ([]byte, error) {
	var jobToScrapeConfig map[string]interface{}
	if err := json.Unmarshal(jsonConfig, &jobToScrapeConfig); err != nil {
		return nil, err
	}
	for _, rawScrapeConfig := range jobToScrapeConfig {
		scrapeConfig, ok := rawScrapeConfig.(map[string]interface{})
		if !ok {
			continue
		}
		removeRegexFromEqualActions(scrapeConfig["relabel_configs"])
		removeRegexFromEqualActions(scrapeConfig["metric_relabel_configs"])
	}
	return json.Marshal(jobToScrapeConfig)
}

// removeRegexFromEqualActions deletes the "regex" key, in place, from every keepequal or
// dropequal rule in rules (a decoded JSON array of rule objects); any other shape is ignored.
func removeRegexFromEqualActions(rules interface{}) {
	ruleList, ok := rules.([]interface{})
	if !ok {
		return
	}
	for _, rawRule := range ruleList {
		rule, ok := rawRule.(map[string]interface{})
		if !ok {
			continue
		}
		switch rule["action"] {
		case "keepequal", "dropequal":
			delete(rule, "regex")
		}
	}
}
// MarshalScrapeConfig marshals configs to YAML using the Prometheus marshalling, converts
// the YAML to JSON, strips regex from keepequal/dropequal rules, and stores the result.
// With marshalSecretValue=false it becomes the response for plain requests; with true it
// becomes the response served to TLS requests.
//
// promcommconfig.MarshalSecretValue is a process-wide switch. It is set only for the
// duration of the YAML marshal and then restored, so secret values are not left enabled
// for unrelated marshalling elsewhere in the process.
// NOTE(review): because the switch is global, concurrent callers could still race on it.
func (s *Server) MarshalScrapeConfig(configs map[string]*promconfig.ScrapeConfig, marshalSecretValue bool) error {
	configBytes, err := func() ([]byte, error) {
		previous := promcommconfig.MarshalSecretValue
		promcommconfig.MarshalSecretValue = marshalSecretValue
		defer func() { promcommconfig.MarshalSecretValue = previous }()
		return yaml.Marshal(configs)
	}()
	if err != nil {
		return err
	}
	jsonBytes, err := yaml2.YAMLToJSON(configBytes)
	if err != nil {
		return err
	}
	jsonConfigNew, err := RemoveRegexFromRelabelAction(jsonBytes)
	if err != nil {
		return err
	}
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if marshalSecretValue {
		s.ScrapeConfigMarshalledSecretResponse = jsonConfigNew
	} else {
		s.scrapeConfigResponse = jsonConfigNew
	}
	return nil
}
// UpdateScrapeConfigResponse refreshes both cached scrape config responses via
// MarshalScrapeConfig. The first is marshalled with marshalSecretValue=false (served to
// plain requests), then the second with marshalSecretValue=true (served to TLS requests).
// Each is produced with the underlying prometheus marshaling and then converted from YAML
// into JSON for consumers to use. The first error aborts the update and is returned.
func (s *Server) UpdateScrapeConfigResponse(configs map[string]*promconfig.ScrapeConfig) error {
	for _, withSecrets := range []bool{false, true} {
		if err := s.MarshalScrapeConfig(configs, withSecrets); err != nil {
			return err
		}
	}
	return nil
}
// ScrapeConfigsHandler returns the available scrape configuration discovered by the target
// allocator. Requests that arrived over TLS receive the variant marshalled with secret
// values; all other requests receive the variant marshalled without them.
func (s *Server) ScrapeConfigsHandler(c *gin.Context) {
	s.mtx.RLock()
	result := s.scrapeConfigResponse
	if c.Request.TLS != nil {
		result = s.ScrapeConfigMarshalledSecretResponse
	}
	s.mtx.RUnlock()
	// We don't use the jsonHandler method because we don't want our bytes to be re-encoded
	c.Writer.Header().Set("Content-Type", "application/json")
	if _, err := c.Writer.Write(result); err != nil {
		// The 200 status (and possibly part of the body) is already committed, so an
		// error response can no longer be delivered; record the failure instead.
		s.logger.Error(err, "failed to write scrape config response")
	}
}
// ReadinessProbeHandler answers 200 OK once the non-secret scrape config response has been
// stored (MarshalScrapeConfig succeeded with marshalSecretValue=false), and 503 Service
// Unavailable before that.
func (s *Server) ReadinessProbeHandler(c *gin.Context) {
	s.mtx.RLock()
	ready := s.scrapeConfigResponse != nil
	s.mtx.RUnlock()

	status := http.StatusServiceUnavailable
	if ready {
		status = http.StatusOK
	}
	c.Status(status)
}
// JobHandler responds with a JSON object mapping each job name known to the allocator
// to the link of that job's targets endpoint.
func (s *Server) JobHandler(c *gin.Context) {
	jobLinks := map[string]target.LinkJSON{}
	for _, item := range s.allocator.TargetItems() {
		jobLinks[item.JobName] = target.LinkJSON{Link: item.Link.Link}
	}
	s.jsonHandler(c.Writer, jobLinks)
}
// LivenessProbeHandler unconditionally answers 200 OK: being able to serve the request is
// the liveness signal.
func (s *Server) LivenessProbeHandler(c *gin.Context) {
	c.Status(http.StatusOK)
}
// PrometheusMiddleware times each request and records the duration in httpDuration,
// labelled with the matched route pattern (c.FullPath(), which is "" for unmatched routes).
// The observation is made in a defer so that requests whose handlers panic are still
// recorded before the panic propagates to an outer recovery middleware.
func (s *Server) PrometheusMiddleware(c *gin.Context) {
	timer := prometheus.NewTimer(httpDuration.WithLabelValues(c.FullPath()))
	defer timer.ObserveDuration()
	c.Next()
}
// TargetsHandler serves /jobs/:job_id/targets. The job_id path value is query-unescaped
// first; a malformed escape is reported via errorHandler. Without a collector_id query
// parameter, the job's targets are returned grouped per collector (GetAllTargetsByJob).
// Otherwise only the targets assigned to the first collector_id are returned, as an
// empty JSON array when nothing matches.
func (s *Server) TargetsHandler(c *gin.Context) {
	jobID, err := url.QueryUnescape(c.Params.ByName("job_id"))
	if err != nil {
		s.errorHandler(c.Writer, err)
		return
	}

	collectorIDs := c.Request.URL.Query()["collector_id"]
	if len(collectorIDs) == 0 {
		s.jsonHandler(c.Writer, GetAllTargetsByJob(s.allocator, jobID))
		return
	}

	targets := s.allocator.GetTargetsForCollectorAndJob(collectorIDs[0], jobID)
	if len(targets) == 0 {
		// Encode an explicit empty list instead of null.
		s.jsonHandler(c.Writer, []interface{}{})
		return
	}
	s.jsonHandler(c.Writer, targets)
}
// errorHandler writes a 500 Internal Server Error whose JSON body is the error message.
// Content-Type is set before WriteHeader because a plain net/http ResponseWriter ignores
// header changes made after the status line is written.
func (s *Server) errorHandler(w http.ResponseWriter, err error) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusInternalServerError)
	// Encode the message rather than the error value: most error implementations have no
	// exported fields and would serialize as an empty object.
	var body interface{}
	if err != nil {
		body = err.Error()
	}
	s.jsonHandler(w, body)
}
// jsonHandler sets an application/json Content-Type and streams data to w using the
// server's jsonMarshaller. Encoding failures are logged; nothing is returned to the caller.
func (s *Server) jsonHandler(w http.ResponseWriter, data interface{}) {
	w.Header().Set("Content-Type", "application/json")
	enc := s.jsonMarshaller.NewEncoder(w)
	if encodeErr := enc.Encode(data); encodeErr != nil {
		s.logger.Error(encodeErr, "failed to encode data for http response")
	}
}
// GetAllTargetsByJob returns, for every collector known to the allocator, the targets of
// job assigned to it together with a link to that collector's targets endpoint for the job.
// It is a relatively expensive call that is usually only used for debugging purposes.
func GetAllTargetsByJob(allocator allocation.Allocator, job string) map[string]collectorJSON {
	collectors := allocator.Collectors()
	displayData := make(map[string]collectorJSON, len(collectors))
	escapedJob := url.QueryEscape(job)
	for _, col := range collectors {
		items := allocator.GetTargetsForCollectorAndJob(col.Name, job)
		displayData[col.Name] = collectorJSON{
			// Escape the collector name as well so names containing reserved characters
			// (&, +, =, spaces, ...) still produce a link that round-trips through
			// the collector_id query parameter.
			Link: fmt.Sprintf("/jobs/%s/targets?collector_id=%s", escapedJob, url.QueryEscape(col.Name)),
			Jobs: items,
		}
	}
	return displayData
}