collector/receiver/prometheusreceiver/config.go (228 lines of code) (raw):

// Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package prometheusreceiver // import "github.com/GoogleCloudPlatform/run-gmp-sidecar/collector/receiver/prometheusreceiver" import ( "encoding/json" "errors" "fmt" "net/url" "os" "path/filepath" "sort" "strings" "time" commonconfig "github.com/prometheus/common/config" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery/file" promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/discovery/kubernetes" "github.com/prometheus/prometheus/discovery/targetgroup" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" ) const ( // The key for Prometheus scraping configs. prometheusConfigKey = "config" // keys to access the http_sd_config from config root targetAllocatorConfigKey = "target_allocator" targetAllocatorHTTPSDConfigKey = "http_sd_config" ) type MetricAdjusterOpts struct { // UseStartTimeMetric enables retrieving the start time of all counter // metrics from the process_start_time_seconds metric. This is only correct // if all counters on that endpoint started after the process start time, // and the process is the only actor exporting the metric after the process // started. It should not be used in "exporters" which export counters that // may have started before the process itself. Use only if you know what you // are doing, as this may result in incorrect rate calculations. UseStartTimeMetric bool `mapstructure:"use_start_time_metric"` StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"` // UseCollectorStartTimeFallback enables using a fallback start time if a // start time is otherwise unavailable when adjusting metrics. This would // happen if the UseStartTimeMetric is used but the application doesn't emit // a process_start_time_seconds metric or a metric that matches the // StartTimeMetricRegex provided. // // If enabled, the fallback start time used for adjusted metrics is an // approximation of the collector start time. // // This option should only be used when we can guarantee that the scraped // processes that emit metrics that started after the collector has started. UseCollectorStartTimeFallback bool `mapstructure:"use_collector_start_time_fallback"` // AllowCumulativeResets enables preserving resets of cumulative points when // the metric adjuster is used. Should be enabled if we expect cumulative // point resets AND we want to use the StartTimeMetricAdjuster. Note that // this will require that we cache the previous point for every timeseries, // and so can increase memory used by the collector. AllowCumulativeResets bool `mapstructure:"allow_cumulative_resets"` } // Config defines configuration for Prometheus receiver. type Config struct { PrometheusConfig *promconfig.Config `mapstructure:"-"` TrimMetricSuffixes bool `mapstructure:"trim_metric_suffixes"` BufferPeriod time.Duration `mapstructure:"buffer_period"` BufferCount int `mapstructure:"buffer_count"` TargetAllocator *targetAllocator `mapstructure:"target_allocator"` // ConfigPlaceholder is just an entry to make the configuration pass a check // that requires that all keys present in the config actually exist on the // structure, ie.: it will error if an unknown key is present. ConfigPlaceholder interface{} `mapstructure:"config"` // Settings for adjusting metrics. Will default to using an InitialPointAdjuster // which will use the first scraped point to define the start time for the timeseries. AdjusterOpts MetricAdjusterOpts `mapstructure:",squash"` } type targetAllocator struct { Endpoint string `mapstructure:"endpoint"` Interval time.Duration `mapstructure:"interval"` CollectorID string `mapstructure:"collector_id"` // ConfigPlaceholder is just an entry to make the configuration pass a check // that requires that all keys present in the config actually exist on the // structure, ie.: it will error if an unknown key is present. ConfigPlaceholder interface{} `mapstructure:"http_sd_config"` HTTPSDConfig *promHTTP.SDConfig `mapstructure:"-"` } var _ component.Config = (*Config)(nil) var _ confmap.Unmarshaler = (*Config)(nil) func checkFile(fn string) error { // Nothing set, nothing to error on. if fn == "" { return nil } _, err := os.Stat(fn) return err } func checkTLSConfig(tlsConfig commonconfig.TLSConfig) error { if err := checkFile(tlsConfig.CertFile); err != nil { return fmt.Errorf("error checking client cert file %q: %w", tlsConfig.CertFile, err) } if err := checkFile(tlsConfig.KeyFile); err != nil { return fmt.Errorf("error checking client key file %q: %w", tlsConfig.KeyFile, err) } return nil } // Method to exercise the prometheus file discovery behavior to ensure there are no errors // - reference https://github.com/prometheus/prometheus/blob/c0c22ed04200a8d24d1d5719f605c85710f0d008/discovery/file/file.go#L372 func checkSDFile(filename string) error { content, err := os.ReadFile(filepath.Clean(filename)) if err != nil { return err } var targetGroups []*targetgroup.Group switch ext := filepath.Ext(filename); strings.ToLower(ext) { case ".json": if err := json.Unmarshal(content, &targetGroups); err != nil { return fmt.Errorf("error in unmarshaling json file extension: %w", err) } case ".yml", ".yaml": if err := yaml.UnmarshalStrict(content, &targetGroups); err != nil { return fmt.Errorf("error in unmarshaling yaml file extension: %w", err) } default: return fmt.Errorf("invalid file extension: %q", ext) } for i, tg := range targetGroups { if tg == nil { return fmt.Errorf("nil target group item found (index %d)", i) } } return nil } // Validate checks the receiver configuration is valid. func (cfg *Config) Validate() error { promConfig := cfg.PrometheusConfig if promConfig != nil { err := cfg.validatePromConfig(promConfig) if err != nil { return err } } if cfg.TargetAllocator != nil { err := cfg.validateTargetAllocatorConfig() if err != nil { return err } } return nil } func (cfg *Config) validatePromConfig(promConfig *promconfig.Config) error { if len(promConfig.ScrapeConfigs) == 0 && cfg.TargetAllocator == nil { return errors.New("no Prometheus scrape_configs or target_allocator set") } // Reject features that Prometheus supports but that the receiver doesn't support: // See: // * https://github.com/open-telemetry/opentelemetry-collector/issues/3863 // * https://github.com/open-telemetry/wg-prometheus/issues/3 unsupportedFeatures := make([]string, 0, 4) if len(promConfig.RemoteWriteConfigs) != 0 { unsupportedFeatures = append(unsupportedFeatures, "remote_write") } if len(promConfig.RemoteReadConfigs) != 0 { unsupportedFeatures = append(unsupportedFeatures, "remote_read") } if len(promConfig.RuleFiles) != 0 { unsupportedFeatures = append(unsupportedFeatures, "rule_files") } if len(promConfig.AlertingConfig.AlertRelabelConfigs) != 0 { unsupportedFeatures = append(unsupportedFeatures, "alert_config.relabel_configs") } if len(promConfig.AlertingConfig.AlertmanagerConfigs) != 0 { unsupportedFeatures = append(unsupportedFeatures, "alert_config.alertmanagers") } if len(unsupportedFeatures) != 0 { // Sort the values for deterministic error messages. sort.Strings(unsupportedFeatures) return fmt.Errorf("unsupported features:\n\t%s", strings.Join(unsupportedFeatures, "\n\t")) } for _, sc := range cfg.PrometheusConfig.ScrapeConfigs { for _, rc := range sc.MetricRelabelConfigs { if rc.TargetLabel == "__name__" { // TODO(#2297): Remove validation after renaming is fixed return fmt.Errorf("error validating scrapeconfig for job %v: %w", sc.JobName, errRenamingDisallowed) } } if sc.HTTPClientConfig.Authorization != nil { if err := checkFile(sc.HTTPClientConfig.Authorization.CredentialsFile); err != nil { return fmt.Errorf("error checking authorization credentials file %q: %w", sc.HTTPClientConfig.Authorization.CredentialsFile, err) } } if err := checkTLSConfig(sc.HTTPClientConfig.TLSConfig); err != nil { return err } for _, c := range sc.ServiceDiscoveryConfigs { switch c := c.(type) { case *kubernetes.SDConfig: if err := checkTLSConfig(c.HTTPClientConfig.TLSConfig); err != nil { return err } case *file.SDConfig: for _, file := range c.Files { files, err := filepath.Glob(file) if err != nil { return err } if len(files) != 0 { for _, f := range files { err = checkSDFile(f) if err != nil { return fmt.Errorf("checking SD file %q: %w", file, err) } } continue } return fmt.Errorf("file %q for file_sd in scrape job %q does not exist", file, sc.JobName) } } } } return nil } func (cfg *Config) validateTargetAllocatorConfig() error { // validate targetAllocator targetAllocatorConfig := cfg.TargetAllocator if targetAllocatorConfig == nil { return nil } // ensure valid endpoint if _, err := url.ParseRequestURI(targetAllocatorConfig.Endpoint); err != nil { return fmt.Errorf("TargetAllocator endpoint is not valid: %s", targetAllocatorConfig.Endpoint) } // ensure valid collectorID without variables if targetAllocatorConfig.CollectorID == "" || strings.Contains(targetAllocatorConfig.CollectorID, "${") { return fmt.Errorf("CollectorID is not a valid ID") } return nil } // Unmarshal a config.Parser into the config struct. func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { if componentParser == nil { return nil } // We need custom unmarshaling because prometheus "config" subkey defines its own // YAML unmarshaling routines so we need to do it explicitly. err := componentParser.Unmarshal(cfg) if err != nil { return fmt.Errorf("prometheus receiver failed to parse config: %w", err) } // Unmarshal prometheus's config values. Since prometheus uses `yaml` tags, so use `yaml`. promCfg, err := componentParser.Sub(prometheusConfigKey) if err != nil || len(promCfg.ToStringMap()) == 0 { return err } out, err := yaml.Marshal(promCfg.ToStringMap()) if err != nil { return fmt.Errorf("prometheus receiver failed to marshal config to yaml: %w", err) } err = yaml.UnmarshalStrict(out, &cfg.PrometheusConfig) if err != nil { return fmt.Errorf("prometheus receiver failed to unmarshal yaml to prometheus config: %w", err) } // Unmarshal targetAllocator configs targetAllocatorCfg, err := componentParser.Sub(targetAllocatorConfigKey) if err != nil { return err } targetAllocatorHTTPSDCfg, err := targetAllocatorCfg.Sub(targetAllocatorHTTPSDConfigKey) if err != nil { return err } targetAllocatorHTTPSDMap := targetAllocatorHTTPSDCfg.ToStringMap() if len(targetAllocatorHTTPSDMap) != 0 { targetAllocatorHTTPSDMap["url"] = "http://placeholder" // we have to set it as else the marshal will fail httpSDConf, err := yaml.Marshal(targetAllocatorHTTPSDMap) if err != nil { return fmt.Errorf("prometheus receiver failed to marshal config to yaml: %w", err) } err = yaml.UnmarshalStrict(httpSDConf, &cfg.TargetAllocator.HTTPSDConfig) if err != nil { return fmt.Errorf("prometheus receiver failed to unmarshal yaml to prometheus config: %w", err) } } return nil }