internal/manifests/targetallocator/adapters/config_to_prom_config.go (207 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package adapters
import (
"errors"
"fmt"
"net/url"
"regexp"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/naming"
)
func errorNoComponent(component string) error {
return fmt.Errorf("no %s available as part of the configuration", component)
}
func errorNotAMapAtIndex(component string, index int) error {
return fmt.Errorf("index %d: %s property in the configuration doesn't contain a valid map: %s", index, component, component)
}
func errorNotAMap(component string) error {
return fmt.Errorf("%s property in the configuration doesn't contain valid %s", component, component)
}
func errorNotAList(component string) error {
return fmt.Errorf("%s must be a list in the config", component)
}
func errorNotAListAtIndex(component string, index int) error {
return fmt.Errorf("index %d: %s property in the configuration doesn't contain a valid index: %s", index, component, component)
}
func errorNotAStringAtIndex(component string, index int) error {
return fmt.Errorf("index %d: %s property in the configuration doesn't contain a valid string: %s", index, component, component)
}
// getScrapeConfigsFromPromConfig extracts the scrapeConfig array from prometheus config.
func getScrapeConfigsFromPromConfig(promConfig map[interface{}]interface{}) ([]interface{}, error) {
prometheusConfigProperty, ok := promConfig["config"]
if !ok {
return nil, errorNoComponent("prometheusConfig")
}
prometheusConfig, ok := prometheusConfigProperty.(map[interface{}]interface{})
if !ok {
return nil, errorNotAMap("prometheusConfig")
}
scrapeConfigsProperty, ok := prometheusConfig["scrape_configs"]
if !ok {
return nil, errorNoComponent("scrape_configs")
}
scrapeConfigs, ok := scrapeConfigsProperty.([]interface{})
if !ok {
return nil, errorNotAList("scrape_configs")
}
return scrapeConfigs, nil
}
// GetPromConfig returns a Prometheus configuration file.
func GetPromConfig(cfg string) (map[interface{}]interface{}, error) {
prometheus, err := adapters.ConfigFromString(cfg)
if err != nil {
return nil, err
}
scrapeConfigs, err := getScrapeConfigsFromPromConfig(prometheus)
if err != nil {
return nil, err
}
for i, config := range scrapeConfigs {
scrapeConfig, ok := config.(map[interface{}]interface{})
if !ok {
return nil, errorNotAMapAtIndex("scrape_config", i)
}
relabelConfigsProperty, ok := scrapeConfig["relabel_configs"]
if !ok {
continue
}
relabelConfigs, ok := relabelConfigsProperty.([]interface{})
if !ok {
return nil, errorNotAListAtIndex("relabel_configs", i)
}
for i, rc := range relabelConfigs {
relabelConfig, rcErr := rc.(map[interface{}]interface{})
if !rcErr {
return nil, errorNotAMapAtIndex("relabel_config", i)
}
replacementProperty, rcErr := relabelConfig["replacement"]
if !rcErr {
continue
}
_, rcErr = replacementProperty.(string)
if !rcErr {
return nil, errorNotAStringAtIndex("replacement", i)
}
}
metricRelabelConfigsProperty, ok := scrapeConfig["metric_relabel_configs"]
if !ok {
continue
}
metricRelabelConfigs, ok := metricRelabelConfigsProperty.([]interface{})
if !ok {
return nil, errorNotAListAtIndex("metric_relabel_configs", i)
}
for i, rc := range metricRelabelConfigs {
relabelConfig, ok := rc.(map[interface{}]interface{})
if !ok {
return nil, errorNotAMapAtIndex("metric_relabel_config", i)
}
replacementProperty, ok := relabelConfig["replacement"]
if !ok {
continue
}
_, ok = replacementProperty.(string)
if !ok {
return nil, errorNotAStringAtIndex("replacement", i)
}
}
}
return prometheus, nil
}
// AddHTTPSDConfigToPromConfig adds HTTP SD (Service Discovery) configuration to the Prometheus configuration.
// This function removes any existing service discovery configurations (e.g., `sd_configs`, `dns_sd_configs`, `file_sd_configs`, etc.)
// from the `scrape_configs` section and adds a single `http_sd_configs` configuration.
// The `http_sd_configs` points to the TA (Target Allocator) endpoint that provides the list of targets for the given job.
func AddHTTPSDConfigToPromConfig(prometheus map[interface{}]interface{}, taServiceName string) (map[interface{}]interface{}, error) {
prometheusConfigProperty, ok := prometheus["config"]
if !ok {
return nil, errorNoComponent("prometheusConfig")
}
prometheusConfig, ok := prometheusConfigProperty.(map[interface{}]interface{})
if !ok {
return nil, errorNotAMap("prometheusConfig")
}
scrapeConfigsProperty, ok := prometheusConfig["scrape_configs"]
if !ok {
return nil, errorNoComponent("scrape_configs")
}
scrapeConfigs, ok := scrapeConfigsProperty.([]interface{})
if !ok {
return nil, errorNotAList("scrape_configs")
}
sdRegex := regexp.MustCompile(`^.*(sd|static)_configs$`)
for i, config := range scrapeConfigs {
scrapeConfig, ok := config.(map[interface{}]interface{})
if !ok {
return nil, errorNotAMapAtIndex("scrape_config", i)
}
// Check for other types of service discovery configs (e.g. dns_sd_configs, file_sd_configs, etc.)
for key := range scrapeConfig {
keyStr, keyErr := key.(string)
if !keyErr {
continue
}
if sdRegex.MatchString(keyStr) {
delete(scrapeConfig, key)
}
}
jobNameProperty, ok := scrapeConfig["job_name"]
if !ok {
return nil, errorNotAStringAtIndex("job_name", i)
}
jobName, ok := jobNameProperty.(string)
if !ok {
return nil, errorNotAStringAtIndex("job_name is not a string", i)
}
escapedJob := url.QueryEscape(jobName)
scrapeConfig["http_sd_configs"] = []interface{}{
map[string]interface{}{
"url": fmt.Sprintf("https://%s:%d/jobs/%s/targets", taServiceName, naming.TargetAllocatorServicePort, escapedJob),
},
}
}
return prometheus, nil
}
// AddTAConfigToPromConfig adds or updates the target_allocator configuration in the Prometheus configuration.
// If the `EnableTargetAllocatorRewrite` feature flag for the target allocator is enabled, this function
// removes the existing scrape_configs from the collector's Prometheus configuration as it's not required.
func AddTAConfigToPromConfig(prometheus map[interface{}]interface{}, taServiceName string) (map[interface{}]interface{}, error) {
prometheusConfigProperty, ok := prometheus["config"]
if !ok {
return nil, errorNoComponent("prometheusConfig")
}
prometheusCfg, ok := prometheusConfigProperty.(map[interface{}]interface{})
if !ok {
return nil, errorNotAMap("prometheusConfig")
}
// Create the TargetAllocConfig dynamically if it doesn't exist
if prometheus["target_allocator"] == nil {
prometheus["target_allocator"] = make(map[interface{}]interface{})
}
targetAllocatorCfg, ok := prometheus["target_allocator"].(map[interface{}]interface{})
if !ok {
return nil, errorNotAMap("target_allocator")
}
targetAllocatorCfg["endpoint"] = fmt.Sprintf("https://%s:%d", taServiceName, naming.TargetAllocatorServicePort)
targetAllocatorCfg["interval"] = "30s"
// Remove the scrape_configs key from the map
delete(prometheusCfg, "scrape_configs")
return prometheus, nil
}
// ValidatePromConfig checks if the prometheus config is valid given other collector-level settings.
func ValidatePromConfig(config map[interface{}]interface{}, targetAllocatorEnabled bool, targetAllocatorRewriteEnabled bool) error {
_, promConfigExists := config["config"]
if targetAllocatorEnabled {
if targetAllocatorRewriteEnabled { // if rewrite is enabled, we will add a target_allocator section during rewrite
return nil
}
_, targetAllocatorExists := config["target_allocator"]
// otherwise, either the target_allocator or config section needs to be here
if !(promConfigExists || targetAllocatorExists) {
return errors.New("either target allocator or prometheus config needs to be present")
}
return nil
}
// if target allocator isn't enabled, we need a config section
if !promConfigExists {
return errorNoComponent("prometheusConfig")
}
return nil
}
// ValidateTargetAllocatorConfig checks if the Target Allocator config is valid
// In order for Target Allocator to do anything useful, at least one of the following has to be true:
// - at least one scrape config has to be defined in Prometheus configuration
// - PrometheusCR has to be enabled in target allocator settings
func ValidateTargetAllocatorConfig(targetAllocatorPrometheusCR bool, promConfig map[interface{}]interface{}) error {
if targetAllocatorPrometheusCR {
return nil
}
// if PrometheusCR isn't enabled, we need at least one scrape config
scrapeConfigs, err := getScrapeConfigsFromPromConfig(promConfig)
if err != nil {
return err
}
if len(scrapeConfigs) == 0 {
return fmt.Errorf("either at least one scrape config needs to be defined or PrometheusCR needs to be enabled")
}
return nil
}