internal/manifests/collector/parser/receiver/receiver_otlp.go (95 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package receiver import ( "fmt" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" "github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/parser" "github.com/aws/amazon-cloudwatch-agent-operator/internal/naming" ) var _ parser.ComponentPortParser = &OTLPReceiverParser{} const ( parserNameOTLP = "__otlp" defaultOTLPGRPCPort int32 = 4317 defaultOTLPHTTPPort int32 = 4318 ) var ( grpc = "grpc" http = "http" ) // OTLPReceiverParser parses the configuration for OTLP receivers. type OTLPReceiverParser struct { config map[interface{}]interface{} logger logr.Logger name string } // NewOTLPReceiverParser builds a new parser for OTLP receivers. func NewOTLPReceiverParser(logger logr.Logger, name string, config map[interface{}]interface{}) parser.ComponentPortParser { if protocols, ok := config["protocols"].(map[interface{}]interface{}); ok { return &OTLPReceiverParser{ logger: logger, name: name, config: protocols, } } return &OTLPReceiverParser{ name: name, config: map[interface{}]interface{}{}, } } // Ports returns all the service ports for all protocols in this parser. func (o *OTLPReceiverParser) Ports() ([]corev1.ServicePort, error) { ports := []corev1.ServicePort{} for _, protocol := range []struct { name string defaultPorts []corev1.ServicePort }{ { name: grpc, defaultPorts: []corev1.ServicePort{ { Name: naming.PortName(fmt.Sprintf("%s-grpc", o.name), defaultOTLPGRPCPort), Port: defaultOTLPGRPCPort, TargetPort: intstr.FromInt(int(defaultOTLPGRPCPort)), AppProtocol: &grpc, }, }, }, { name: http, defaultPorts: []corev1.ServicePort{ { Name: naming.PortName(fmt.Sprintf("%s-http", o.name), defaultOTLPHTTPPort), Port: defaultOTLPHTTPPort, TargetPort: intstr.FromInt(int(defaultOTLPHTTPPort)), AppProtocol: &http, }, }, }, } { // do we have the protocol specified at all? if receiverProtocol, ok := o.config[protocol.name]; ok { // we have the specified protocol, we definitely need a service port nameWithProtocol := fmt.Sprintf("%s-%s", o.name, protocol.name) var protocolPort *corev1.ServicePort // do we have a configuration block for the protocol? settings, ok := receiverProtocol.(map[interface{}]interface{}) if ok { protocolPort = singlePortFromConfigEndpoint(o.logger, nameWithProtocol, settings) } // have we parsed a port based on the configuration block? // if not, we use the default port if protocolPort == nil { ports = append(ports, protocol.defaultPorts...) } else { // infer protocol and appProtocol from protocol.name if protocol.name == grpc { protocolPort.Protocol = corev1.ProtocolTCP protocolPort.AppProtocol = &grpc } else if protocol.name == http { protocolPort.Protocol = corev1.ProtocolTCP protocolPort.AppProtocol = &http } ports = append(ports, *protocolPort) } } } return ports, nil } // ParserName returns the name of this parser. func (o *OTLPReceiverParser) ParserName() string { return parserNameOTLP } func init() { Register("otlp", NewOTLPReceiverParser) }