cmd/collector/config/export.go (233 lines of code) (raw):

package config import ( "fmt" "net" "regexp" "strconv" "strings" "github.com/Azure/adx-mon/collector/export" "github.com/Azure/adx-mon/collector/logs/types" "github.com/Azure/adx-mon/pkg/remote" "github.com/Azure/adx-mon/transform" ) type Exporters struct { OtlpMetricExport []*OtlpMetricExport `toml:"otlp-metric-export,omitempty" comment:"Configuration for exporting metrics to an OTLP/HTTP endpoint."` FluentForwardLogExport []*FluentForwardLogExport `toml:"fluent-forward-log-export,omitempty" comment:"Configuration for exporting logs to a Fluentd/Fluent Bit endpoint."` } func (e *Exporters) Validate() error { exporterNames := make(map[string]struct{}) for i, exporter := range e.OtlpMetricExport { if err := exporter.Validate(); err != nil { return fmt.Errorf("exporter.otlp-metric-export[%d].%w", i, err) } if _, ok := exporterNames[exporter.Name]; ok { return fmt.Errorf("exporter.otlp-metric-export[%d].name %q is not unique", i, exporter.Name) } exporterNames[exporter.Name] = struct{}{} } for i, exporter := range e.FluentForwardLogExport { if err := exporter.Validate(); err != nil { return fmt.Errorf("exporter.fluent-forward-log-export[%d].%w", i, err) } if _, ok := exporterNames[exporter.Name]; ok { return fmt.Errorf("exporter.fluent-forward-log-export[%d].name %q is not unique", i, exporter.Name) } exporterNames[exporter.Name] = struct{}{} } return nil } func GetMetricsExporter(name string, e *Exporters) (remote.RemoteWriteClient, error) { if e == nil { return nil, fmt.Errorf("exporters config not set") } for _, exporter := range e.OtlpMetricExport { if exporter.Name == name { return exporter.GetWriteClient() } } return nil, fmt.Errorf("exporter %s not found", name) } func HasMetricsExporter(name string, e *Exporters) bool { if e == nil { return false } for _, exporter := range e.OtlpMetricExport { if exporter.Name == name { return true } } return false } func GetLogsExporter(name string, e *Exporters) (types.Sink, error) { if e == nil { return nil, fmt.Errorf("exporters config not set") } for _, exporter := range e.FluentForwardLogExport { if exporter.Name == name { opts := export.LogToFluentExporterOpts{ Destination: exporter.Destination, TagAttribute: exporter.TagAttribute, } return export.NewLogToFluentExporter(opts) } } return nil, fmt.Errorf("exporter %s not found", name) } func HasLogsExporter(name string, e *Exporters) bool { if e == nil { return false } for _, exporter := range e.FluentForwardLogExport { if exporter.Name == name { return true } } return false } // OtlpMetricExport exports metric telemetry to OTLP/HTTP metrics endpoints. type OtlpMetricExport struct { Name string `toml:"name" comment:"Name of the exporter."` Destination string `toml:"destination" comment:"OTLP/HTTP endpoint to send metrics to."` DefaultDropMetrics *bool `toml:"default-drop-metrics" comment:"Default to dropping all metrics. Only metrics matching a keep rule will be kept."` AddLabels map[string]string `toml:"add-labels,omitempty" comment:"Key/value pairs of labels to add to all metrics."` DropLabels map[string]string `toml:"drop-labels" comment:"Labels to drop if they match a metrics regex in the format <metrics regex>=<label name>."` DropMetrics []string `toml:"drop-metrics" comment:"Regexes of metrics to drop."` KeepMetrics []string `toml:"keep-metrics" comment:"Regexes of metrics to keep."` KeepMetricsWithLabelValue []LabelMatcher `toml:"keep-metrics-with-label-value" comment:"Regexes of metrics to keep if they have the given label and value."` AddResourceAttributes map[string]string `toml:"add-resource-attributes" comment:"Key/value pairs of resource attributes to add to all metrics."` } func (o *OtlpMetricExport) Validate() error { if o.Name == "" { return fmt.Errorf("name must be set") } if o.Destination == "" { return fmt.Errorf("destination must be set") } _, err := o.GetWriteClient() if err != nil { return fmt.Errorf("invalid exporter config: %w", err) } return nil } func (o *OtlpMetricExport) GetWriteClient() (remote.RemoteWriteClient, error) { transformer, err := NewTransformer(o.DefaultDropMetrics, o.AddLabels, o.DropLabels, o.DropMetrics, o.KeepMetrics, o.KeepMetricsWithLabelValue) if err != nil { return nil, err } opts := export.PromToOtlpExporterOpts{ Transformer: transformer, Destination: o.Destination, AddResourceAttributes: o.AddResourceAttributes, } return export.NewPromToOtlpExporter(opts), nil } func NewTransformer(defaultDropMetrics *bool, addLabels map[string]string, dropLabels map[string]string, dropMetrics, keepMetrics []string, keepMetricsWithLabelValue []LabelMatcher) (*transform.RequestTransformer, error) { dropLabelsMap, err := getRegexMappings(dropLabels) if err != nil { return nil, err } dropMetricsRegex, err := getRegexList(dropMetrics) if err != nil { return nil, err } keepMetricsRegex, err := getRegexList(keepMetrics) if err != nil { return nil, err } keepMetricsWithLabelValueMap, err := getLabelMappings(keepMetricsWithLabelValue) if err != nil { return nil, err } return &transform.RequestTransformer{ DefaultDropMetrics: getDefaultDropMetrics(defaultDropMetrics), AddLabels: addLabels, DropLabels: dropLabelsMap, DropMetrics: dropMetricsRegex, KeepMetrics: keepMetricsRegex, KeepMetricsWithLabelValue: keepMetricsWithLabelValueMap, }, nil } func getDefaultDropMetrics(defaultDropMetrics *bool) bool { if defaultDropMetrics == nil { return false } return *defaultDropMetrics } func getRegexList(regexList []string) ([]*regexp.Regexp, error) { regexes := make([]*regexp.Regexp, 0, len(regexList)) for _, r := range regexList { regex, err := regexp.Compile(r) if err != nil { return nil, fmt.Errorf("invalid metric regex %s: %w", r, err) } regexes = append(regexes, regex) } return regexes, nil } func getRegexMappings(mappings map[string]string) (map[*regexp.Regexp]*regexp.Regexp, error) { regexMappings := make(map[*regexp.Regexp]*regexp.Regexp) for k, v := range mappings { kRegex, err := regexp.Compile(k) if err != nil { return nil, fmt.Errorf("invalid metric regex %s: %w", k, err) } vRegex, err := regexp.Compile(v) if err != nil { return nil, fmt.Errorf("invalid label regex %s: %w", v, err) } regexMappings[kRegex] = vRegex } return regexMappings, nil } func getLabelMappings(mappings []LabelMatcher) (map[*regexp.Regexp]*regexp.Regexp, error) { regexMappings := make(map[*regexp.Regexp]*regexp.Regexp) for _, mapping := range mappings { kRegex, err := regexp.Compile(mapping.LabelRegex) if err != nil { return nil, fmt.Errorf("invalid metric regex %s: %w", mapping.LabelRegex, err) } vRegex, err := regexp.Compile(mapping.ValueRegex) if err != nil { return nil, fmt.Errorf("invalid label regex %s: %w", mapping.ValueRegex, err) } regexMappings[kRegex] = vRegex } return regexMappings, nil } type FluentForwardLogExport struct { Name string `toml:"name" comment:"Name of the exporter."` Destination string `toml:"destination" comment:"Fluentd/Fluent Bit endpoint to send logs to. Must be in the form tcp://<host>:<port> or unix:///path/to/socket."` TagAttribute string `toml:"tag-attribute" comment:"Attribute key to use as the tag for the log. If the attribute is not set, the log will be ignored by this exporter."` } func (f *FluentForwardLogExport) Validate() error { if f.Name == "" { return fmt.Errorf("name must be set") } if f.Destination == "" { return fmt.Errorf("destination must be set") } if strings.HasPrefix(f.Destination, "unix://") { socketPath := strings.TrimPrefix(f.Destination, "unix://") if len(socketPath) == 0 { return fmt.Errorf("invalid destination %s: unix socket path is empty", f.Destination) } } else if strings.HasPrefix(f.Destination, "tcp://") { destinationHostPort := strings.TrimPrefix(f.Destination, "tcp://") _, port, err := net.SplitHostPort(destinationHostPort) if err != nil { return fmt.Errorf("invalid destination %s: %w", f.Destination, err) } numericPort, err := strconv.ParseInt(port, 10, 0) if err != nil { return fmt.Errorf("invalid destination %s: unable to parse %s as integer", f.Destination, port) } if numericPort < 1 || numericPort > 65535 { return fmt.Errorf("invalid destination %s: port %d not between 1 and 65535", f.Destination, numericPort) } } else { return fmt.Errorf("invalid destination %s: must be in the form tcp://<host>:<port> or unix:///path/to/socket", f.Destination) } if f.TagAttribute == "" { return fmt.Errorf("tag-attribute must be set") } return nil }