otelcollector/otel-allocator/internal/config/config.go (324 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package config
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/fs"
"os"
"reflect"
"time"
"github.com/go-logr/logr"
"github.com/go-viper/mapstructure/v2"
"github.com/prometheus/common/model"
promconfig "github.com/prometheus/prometheus/config"
_ "github.com/prometheus/prometheus/discovery/install"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)
const (
DefaultResyncTime = 5 * time.Minute
DefaultConfigFilePath string = "/conf/targetallocator.yaml"
DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30)
DefaultAllocationStrategy = "consistent-hashing"
DefaultFilterStrategy = "relabel-config"
DefaultCollectorNotReadyGracePeriod = 0 * time.Second
)
type Config struct {
ListenAddr string `yaml:"listen_addr,omitempty"`
KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"`
ClusterConfig *rest.Config `yaml:"-"`
RootLogger logr.Logger `yaml:"-"`
CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"`
CollectorNamespace string `yaml:"collector_namespace,omitempty"`
PromConfig *promconfig.Config `yaml:"config"`
AllocationStrategy string `yaml:"allocation_strategy,omitempty"`
AllocationFallbackStrategy string `yaml:"allocation_fallback_strategy,omitempty"`
FilterStrategy string `yaml:"filter_strategy,omitempty"`
PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"`
HTTPS HTTPSServerConfig `yaml:"https,omitempty"`
CollectorNotReadyGracePeriod time.Duration `yaml:"collector_not_ready_grace_period,omitempty"`
}
type PrometheusCRConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
AllowNamespaces []string `yaml:"allow_namespaces,omitempty"`
DenyNamespaces []string `yaml:"deny_namespaces,omitempty"`
PodMonitorSelector *metav1.LabelSelector `yaml:"pod_monitor_selector,omitempty"`
PodMonitorNamespaceSelector *metav1.LabelSelector `yaml:"pod_monitor_namespace_selector,omitempty"`
ServiceMonitorSelector *metav1.LabelSelector `yaml:"service_monitor_selector,omitempty"`
ServiceMonitorNamespaceSelector *metav1.LabelSelector `yaml:"service_monitor_namespace_selector,omitempty"`
ScrapeConfigSelector *metav1.LabelSelector `yaml:"scrape_config_selector,omitempty"`
ScrapeConfigNamespaceSelector *metav1.LabelSelector `yaml:"scrape_config_namespace_selector,omitempty"`
ProbeSelector *metav1.LabelSelector `yaml:"probe_selector,omitempty"`
ProbeNamespaceSelector *metav1.LabelSelector `yaml:"probe_namespace_selector,omitempty"`
ScrapeInterval model.Duration `yaml:"scrape_interval,omitempty"`
}
type HTTPSServerConfig struct {
Enabled bool `yaml:"enabled,omitempty"`
ListenAddr string `yaml:"listen_addr,omitempty"`
CAFilePath string `yaml:"ca_file_path,omitempty"`
TLSCertFilePath string `yaml:"tls_cert_file_path,omitempty"`
TLSKeyFilePath string `yaml:"tls_key_file_path,omitempty"`
}
// StringToModelDurationHookFunc returns a DecodeHookFuncType
// that converts string to time.Duration, which can be used
// as model.Duration.
func StringToModelDurationHookFunc() mapstructure.DecodeHookFuncType {
return func(
f reflect.Type,
t reflect.Type,
data interface{},
) (interface{}, error) {
if f.Kind() != reflect.String {
return data, nil
}
if t != reflect.TypeOf(model.Duration(5)) {
return data, nil
}
return time.ParseDuration(data.(string))
}
}
// MapToPromConfig returns a DecodeHookFuncType that provides a mechanism
// for decoding promconfig.Config involving its own unmarshal logic.
func MapToPromConfig() mapstructure.DecodeHookFuncType {
return func(
f reflect.Type,
t reflect.Type,
data interface{},
) (interface{}, error) {
if f.Kind() != reflect.Map {
return data, nil
}
if t != reflect.TypeOf(&promconfig.Config{}) {
return data, nil
}
pConfig := &promconfig.Config{}
mb, err := yaml.Marshal(data.(map[any]any))
if err != nil {
return nil, err
}
err = yaml.Unmarshal(mb, pConfig)
if err != nil {
return nil, err
}
return pConfig, nil
}
}
// MapToLabelSelector returns a DecodeHookFuncType that
// provides a mechanism for decoding both matchLabels and matchExpressions from camelcase to lowercase
// because we use yaml unmarshaling that supports lowercase field names if no `yaml` tag is defined
// and metav1.LabelSelector uses `json` tags.
// If both the camelcase and lowercase version is present, then the camelcase version takes precedence.
func MapToLabelSelector() mapstructure.DecodeHookFuncType {
return func(
f reflect.Type,
t reflect.Type,
data interface{},
) (interface{}, error) {
if f.Kind() != reflect.Map {
return data, nil
}
if t != reflect.TypeOf(&metav1.LabelSelector{}) {
return data, nil
}
result := &metav1.LabelSelector{}
fMap := data.(map[any]any)
if matchLabels, ok := fMap["matchLabels"]; ok {
fMap["matchlabels"] = matchLabels
delete(fMap, "matchLabels")
}
if matchExpressions, ok := fMap["matchExpressions"]; ok {
fMap["matchexpressions"] = matchExpressions
delete(fMap, "matchExpressions")
}
b, err := yaml.Marshal(fMap)
if err != nil {
return nil, err
}
err = yaml.Unmarshal(b, result)
if err != nil {
return nil, err
}
return result, nil
}
}
func LoadFromFile(file string, target *Config) error {
return unmarshal(target, file)
}
func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error {
var err error
// set the rest of the config attributes based on command-line flag values
target.RootLogger = zap.New(zap.UseFlagOptions(&zapCmdLineOpts))
klog.SetLogger(target.RootLogger)
ctrl.SetLogger(target.RootLogger)
target.KubeConfigFilePath, err = getKubeConfigFilePath(flagSet)
if err != nil {
return err
}
clusterConfig, err := clientcmd.BuildConfigFromFlags("", target.KubeConfigFilePath)
if err != nil {
pathError := &fs.PathError{}
if ok := errors.As(err, &pathError); !ok {
return err
}
clusterConfig, err = rest.InClusterConfig()
if err != nil {
return err
}
target.KubeConfigFilePath = ""
}
target.ClusterConfig = clusterConfig
target.ListenAddr, err = getListenAddr(flagSet)
if err != nil {
return err
}
if prometheusCREnabled, changed, flagErr := getPrometheusCREnabled(flagSet); flagErr != nil {
return flagErr
} else if changed {
target.PrometheusCR.Enabled = prometheusCREnabled
}
if httpsEnabled, changed, err := getHttpsEnabled(flagSet); err != nil {
return err
} else if changed {
target.HTTPS.Enabled = httpsEnabled
}
if listenAddrHttps, changed, err := getHttpsListenAddr(flagSet); err != nil {
return err
} else if changed {
target.HTTPS.ListenAddr = listenAddrHttps
}
if caFilePath, changed, err := getHttpsCAFilePath(flagSet); err != nil {
return err
} else if changed {
target.HTTPS.CAFilePath = caFilePath
}
if tlsCertFilePath, changed, err := getHttpsTLSCertFilePath(flagSet); err != nil {
return err
} else if changed {
target.HTTPS.TLSCertFilePath = tlsCertFilePath
}
if tlsKeyFilePath, changed, err := getHttpsTLSKeyFilePath(flagSet); err != nil {
return err
} else if changed {
target.HTTPS.TLSKeyFilePath = tlsKeyFilePath
}
return nil
}
// LoadFromEnv loads configuration from environment variables.
func LoadFromEnv(target *Config) error {
target.CollectorNamespace = os.Getenv("OTELCOL_NAMESPACE")
return nil
}
// unmarshal decodes the contents of the configFile into the cfg argument, using a
// mapstructure decoder with the following notable behaviors.
// Decodes time.Duration from strings (see StringToModelDurationHookFunc).
// Allows custom unmarshaling for promconfig.Config struct that implements yaml.Unmarshaler (see MapToPromConfig).
// Allows custom unmarshaling for metav1.LabelSelector struct using both camelcase and lowercase field names (see MapToLabelSelector).
func unmarshal(cfg *Config, configFile string) error {
yamlFile, err := os.ReadFile(configFile)
if err != nil {
return err
}
m := make(map[string]interface{})
err = yaml.Unmarshal(yamlFile, &m)
if err != nil {
return fmt.Errorf("error unmarshaling YAML: %w", err)
}
dc := mapstructure.DecoderConfig{
TagName: "yaml",
Result: cfg,
DecodeHook: mapstructure.ComposeDecodeHookFunc(
StringToModelDurationHookFunc(),
MapToPromConfig(),
MapToLabelSelector(),
),
}
decoder, err := mapstructure.NewDecoder(&dc)
if err != nil {
return err
}
if err := decoder.Decode(m); err != nil {
return err
}
return nil
}
func CreateDefaultConfig() Config {
return Config{
AllocationStrategy: DefaultAllocationStrategy,
AllocationFallbackStrategy: "",
FilterStrategy: DefaultFilterStrategy,
PrometheusCR: PrometheusCRConfig{
ScrapeInterval: DefaultCRScrapeInterval,
ServiceMonitorNamespaceSelector: &metav1.LabelSelector{},
PodMonitorNamespaceSelector: &metav1.LabelSelector{},
ScrapeConfigNamespaceSelector: &metav1.LabelSelector{},
ProbeNamespaceSelector: &metav1.LabelSelector{},
},
CollectorNotReadyGracePeriod: DefaultCollectorNotReadyGracePeriod,
}
}
func Load() (*Config, error) {
var err error
flagSet := getFlagSet(pflag.ExitOnError)
err = flagSet.Parse(os.Args)
if err != nil {
return nil, err
}
config := CreateDefaultConfig()
// load the config from the config file
configFilePath, err := getConfigFilePath(flagSet)
if err != nil {
return nil, err
}
err = LoadFromFile(configFilePath, &config)
if err != nil {
return nil, err
}
err = LoadFromEnv(&config)
if err != nil {
return nil, err
}
err = LoadFromCLI(&config, flagSet)
if err != nil {
return nil, err
}
return &config, nil
}
// ValidateConfig validates the cli and file configs together.
func ValidateConfig(config *Config) error {
scrapeConfigsPresent := (config.PromConfig != nil && len(config.PromConfig.ScrapeConfigs) > 0)
if !(config.PrometheusCR.Enabled || scrapeConfigsPresent) {
return fmt.Errorf("at least one scrape config must be defined, or Prometheus CR watching must be enabled")
}
if config.CollectorNamespace == "" {
return fmt.Errorf("collector namespace must be set")
}
if len(config.PrometheusCR.AllowNamespaces) != 0 && len(config.PrometheusCR.DenyNamespaces) != 0 {
return fmt.Errorf("only one of allowNamespaces or denyNamespaces can be set")
}
return nil
}
func (c HTTPSServerConfig) NewTLSConfig() (*tls.Config, error) {
cert, err := tls.LoadX509KeyPair(c.TLSCertFilePath, c.TLSKeyFilePath)
if err != nil {
return nil, err
}
caCert, err := os.ReadFile(c.CAFilePath)
if err != nil {
return nil, err
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: caCertPool,
MinVersion: tls.VersionTLS12,
}
return tlsConfig, nil
}
// GetAllowDenyLists returns the allow and deny lists as maps. If the allow list is empty, it defaults to all namespaces.
// If the deny list is empty, it defaults to an empty map.
func (c PrometheusCRConfig) GetAllowDenyLists() (map[string]struct{}, map[string]struct{}) {
allowList := map[string]struct{}{}
if len(c.AllowNamespaces) != 0 {
for _, ns := range c.AllowNamespaces {
allowList[ns] = struct{}{}
}
} else {
allowList = map[string]struct{}{v1.NamespaceAll: {}}
}
denyList := map[string]struct{}{}
if len(c.DenyNamespaces) != 0 {
for _, ns := range c.DenyNamespaces {
denyList[ns] = struct{}{}
}
}
return allowList, denyList
}