receiver/jaegerreceiver/config.go (131 lines of code) (raw):

// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 package jaegerreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver" import ( "errors" "fmt" "net" "strconv" "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" ) const ( // The config field id to load the protocol map from protocolsFieldName = "protocols" // Default UDP server options defaultQueueSize = 1_000 defaultMaxPacketSize = 65_000 defaultServerWorkers = 10 defaultSocketBufferSize = 0 ) // RemoteSamplingConfig defines config key for remote sampling fetch endpoint type RemoteSamplingConfig struct { HostEndpoint string `mapstructure:"host_endpoint"` StrategyFile string `mapstructure:"strategy_file"` StrategyFileReloadInterval time.Duration `mapstructure:"strategy_file_reload_interval"` configgrpc.ClientConfig `mapstructure:",squash"` } // Protocols is the configuration for the supported protocols. type Protocols struct { GRPC *configgrpc.ServerConfig `mapstructure:"grpc"` ThriftHTTP *confighttp.ServerConfig `mapstructure:"thrift_http"` ThriftBinaryUDP *ProtocolUDP `mapstructure:"thrift_binary"` ThriftCompactUDP *ProtocolUDP `mapstructure:"thrift_compact"` } // ProtocolUDP is the configuration for a UDP protocol. type ProtocolUDP struct { Endpoint string `mapstructure:"endpoint"` ServerConfigUDP `mapstructure:",squash"` } // ServerConfigUDP is the server configuration for a UDP protocol. type ServerConfigUDP struct { QueueSize int `mapstructure:"queue_size"` MaxPacketSize int `mapstructure:"max_packet_size"` Workers int `mapstructure:"workers"` SocketBufferSize int `mapstructure:"socket_buffer_size"` } // defaultServerConfigUDP creates the default ServerConfigUDP. func defaultServerConfigUDP() ServerConfigUDP { return ServerConfigUDP{ QueueSize: defaultQueueSize, MaxPacketSize: defaultMaxPacketSize, Workers: defaultServerWorkers, SocketBufferSize: defaultSocketBufferSize, } } // Config defines configuration for Jaeger receiver. type Config struct { Protocols `mapstructure:"protocols"` RemoteSampling *RemoteSamplingConfig `mapstructure:"remote_sampling"` } var ( _ component.Config = (*Config)(nil) _ confmap.Unmarshaler = (*Config)(nil) ) // Validate checks the receiver configuration is valid func (cfg *Config) Validate() error { if cfg.GRPC == nil && cfg.ThriftHTTP == nil && cfg.ThriftBinaryUDP == nil && cfg.ThriftCompactUDP == nil { return errors.New("must specify at least one protocol when using the Jaeger receiver") } if cfg.GRPC != nil { if err := checkPortFromEndpoint(cfg.GRPC.NetAddr.Endpoint); err != nil { return fmt.Errorf("invalid port number for the gRPC endpoint: %w", err) } } if cfg.ThriftHTTP != nil { if err := checkPortFromEndpoint(cfg.ThriftHTTP.Endpoint); err != nil { return fmt.Errorf("invalid port number for the Thrift HTTP endpoint: %w", err) } } if cfg.ThriftBinaryUDP != nil { if err := checkPortFromEndpoint(cfg.ThriftBinaryUDP.Endpoint); err != nil { return fmt.Errorf("invalid port number for the Thrift UDP Binary endpoint: %w", err) } } if cfg.ThriftCompactUDP != nil { if err := checkPortFromEndpoint(cfg.ThriftCompactUDP.Endpoint); err != nil { return fmt.Errorf("invalid port number for the Thrift UDP Compact endpoint: %w", err) } } if cfg.RemoteSampling != nil { if disableJaegerReceiverRemoteSampling.IsEnabled() { return errors.New("remote sampling config detected in the Jaeger receiver; use the `jaegerremotesampling` extension instead") } } return nil } // Unmarshal a config.Parser into the config struct. func (cfg *Config) Unmarshal(componentParser *confmap.Conf) error { if componentParser == nil || len(componentParser.AllKeys()) == 0 { return errors.New("empty config for Jaeger receiver") } // UnmarshalExact will not set struct properties to nil even if no key is provided, // so set the protocol structs to nil where the keys were omitted. err := componentParser.Unmarshal(cfg) if err != nil { return err } protocols, err := componentParser.Sub(protocolsFieldName) if err != nil { return err } if !protocols.IsSet(protoGRPC) { cfg.GRPC = nil } if !protocols.IsSet(protoThriftHTTP) { cfg.ThriftHTTP = nil } if !protocols.IsSet(protoThriftBinary) { cfg.ThriftBinaryUDP = nil } if !protocols.IsSet(protoThriftCompact) { cfg.ThriftCompactUDP = nil } return nil } // checkPortFromEndpoint checks that the endpoint string contains a port in the format "address:port". If the // port number cannot be parsed, returns an error. func checkPortFromEndpoint(endpoint string) error { _, portStr, err := net.SplitHostPort(endpoint) if err != nil { return fmt.Errorf("endpoint is not formatted correctly: %w", err) } port, err := strconv.ParseInt(portStr, 10, 0) if err != nil { return fmt.Errorf("endpoint port is not a number: %w", err) } if port < 1 || port > 65535 { return errors.New("port number must be between 1 and 65535") } return nil }