confgenerator/otel/modular.go (175 lines of code) (raw):

// Copyright 2021 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package otel provides data structures to represent and generate otel configuration. package otel import ( "context" "fmt" "github.com/GoogleCloudPlatform/ops-agent/internal/platform" yaml "github.com/goccy/go-yaml" "github.com/mitchellh/mapstructure" commonconfig "github.com/prometheus/common/config" ) const MetricsPort = 20201 type ExporterType int type ResourceDetectionMode int const ( // N.B. Every ExporterType increases the QPS and thus quota // consumption in consumer projects; think hard before adding // another exporter type. OTel ExporterType = iota System GMP ) const ( Override ResourceDetectionMode = iota SetIfMissing None ) func (t ExporterType) Name() string { if t == System || t == GMP { // The collector's OTel and GMP exporters have different types so can share the empty string. return "" } else if t == OTel { return "otel" } else { panic("unknown ExporterType") } } // ReceiverPipeline represents a single OT receiver and zero or more processors that must be chained after that receiver. type ReceiverPipeline struct { Receiver Component // Processors is a map with processors for each pipeline type ("metrics" or "traces"). // If a key is not in the map, the receiver pipeline will not be used for that pipeline type. Processors map[string][]Component // ExporterTypes indicates if the pipeline outputs special data (either Prometheus or system metrics) that need to be handled with a special exporter. ExporterTypes map[string]ExporterType // ResourceDetectionModes indicates whether the resource should be forcibly set, set only if not already present, or never set. // If a data type is not present, it will assume the zero value (Override). ResourceDetectionModes map[string]ResourceDetectionMode } // Pipeline represents one (of potentially many) pipelines consuming data from a ReceiverPipeline. type Pipeline struct { // Type is "metrics" or "traces". Type string ReceiverPipelineName string Processors []Component } // Component represents a single OT component (receiver, processor, exporter, etc.) type Component struct { // Type is the string type needed to instantiate the OT component (e.g. "windowsperfcounters") Type string // Config is an object which can be serialized by mapstructure into the configuration for the component. // This can either be a map[string]interface{} or a Config struct from OT. Config interface{} } func (c Component) name(suffix string) string { if suffix != "" { return fmt.Sprintf("%s/%s", c.Type, suffix) } return c.Type } // configToYaml converts a tree of structs into a YAML file. // To match OT's built-in config parsing, we use mapstructure to convert the tree of structs into a tree of maps. // This allows the direct use of OT's config types at any level of the hierarchy. func configToYaml(config interface{}) ([]byte, error) { outMap := make(map[string]interface{}) if err := mapstructure.Decode(config, &outMap); err != nil { return nil, err } return yaml.MarshalWithOptions( outMap, yaml.CustomMarshaler[commonconfig.Secret](func(s commonconfig.Secret) ([]byte, error) { return []byte(s), nil }), ) } type ModularConfig struct { LogLevel string ReceiverPipelines map[string]ReceiverPipeline Pipelines map[string]Pipeline Exporters map[ExporterType]Component // Test-only options: // Don't generate any self-metrics DisableMetrics bool // Emit collector logs as JSON JSONLogs bool } // Generate an OT YAML config file for c. // Each pipeline gets generated as a receiver, per-pipeline processors, global processors, and then global exporter. // For example: // metrics/mypipe: // // receivers: [hostmetrics/mypipe] // processors: [filter/mypipe_1, metrics_filter/mypipe_2, resourcedetection/_global_0] // exporters: [googlecloud] func (c ModularConfig) Generate(ctx context.Context) (string, error) { pl := platform.FromContext(ctx) receivers := map[string]interface{}{} processors := map[string]interface{}{} exporters := map[string]interface{}{} exporterNames := map[ExporterType]string{} pipelines := map[string]interface{}{} service := map[string]map[string]interface{}{ "pipelines": pipelines, "telemetry": map[string]interface{}{ "metrics": map[string]interface{}{ // TODO: switch to metrics.readers so we can stop binding a port "address": fmt.Sprintf("0.0.0.0:%d", MetricsPort), }, }, } if c.DisableMetrics { service["telemetry"]["metrics"] = map[string]interface{}{ "level": "none", } } logs := map[string]any{} if c.LogLevel != "info" { logs["level"] = "debug" } if c.JSONLogs { logs["encoding"] = "json" } if len(logs) > 0 { service["telemetry"]["logs"] = logs } configMap := map[string]interface{}{ "receivers": receivers, "processors": processors, "exporters": exporters, "service": service, } resourceDetectionProcessors := map[ResourceDetectionMode]Component{ Override: GCPResourceDetector(true), SetIfMissing: GCPResourceDetector(false), } if pl.ResourceOverride != nil { resourceDetectionProcessors = map[ResourceDetectionMode]Component{ Override: ResourceTransform(pl.ResourceOverride.OTelResourceAttributes(), true), SetIfMissing: ResourceTransform(pl.ResourceOverride.OTelResourceAttributes(), false), } } resourceDetectionProcessorNames := map[ResourceDetectionMode]string{ Override: resourceDetectionProcessors[Override].name("_global_0"), SetIfMissing: resourceDetectionProcessors[SetIfMissing].name("_global_1"), } for prefix, pipeline := range c.Pipelines { // Receiver pipelines need to be instantiated once, since they might have more than one type. // We do this work more than once if it's in more than one pipeline, but it should just overwrite the same names. receiverPipeline := c.ReceiverPipelines[pipeline.ReceiverPipelineName] receiverName := receiverPipeline.Receiver.name(pipeline.ReceiverPipelineName) var receiverProcessorNames []string p, ok := receiverPipeline.Processors[pipeline.Type] if !ok { // This receiver pipeline isn't for this data type. continue } for i, processor := range p { name := processor.name(fmt.Sprintf("%s_%d", pipeline.ReceiverPipelineName, i)) receiverProcessorNames = append(receiverProcessorNames, name) processors[name] = processor.Config } receivers[receiverName] = receiverPipeline.Receiver.Config // Everything else in the pipeline is specific to this Type. var processorNames []string processorNames = append(processorNames, receiverProcessorNames...) for i, processor := range pipeline.Processors { name := processor.name(fmt.Sprintf("%s_%d", prefix, i)) processorNames = append(processorNames, name) processors[name] = processor.Config } rdm := receiverPipeline.ResourceDetectionModes[pipeline.Type] if name, ok := resourceDetectionProcessorNames[rdm]; ok { processorNames = append(processorNames, name) processors[name] = resourceDetectionProcessors[rdm].Config } exporterType := receiverPipeline.ExporterTypes[pipeline.Type] if _, ok := exporterNames[exporterType]; !ok { exporter := c.Exporters[exporterType] name := exporter.name(exporterType.Name()) exporterNames[exporterType] = name exporters[name] = exporter.Config } pipelines[pipeline.Type+"/"+prefix] = map[string]interface{}{ "receivers": []string{receiverName}, "processors": processorNames, "exporters": []string{exporterNames[exporterType]}, } } out, err := configToYaml(configMap) // TODO: Return []byte if err != nil { return "", err } return string(out), nil } func contains(s []string, str string) bool { for _, v := range s { if v == str { return true } } return false }