internal/manifests/collector/ports.go (274 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package collector
import (
"errors"
"fmt"
"regexp"
"sort"
"strconv"
"strings"
"github.com/go-logr/logr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/validation"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/naming"
)
// Names used to identify the receivers of the CloudWatch agent and to label
// the ports generated for them.
const (
	// CWA is the prefix prepended to some of the generated port names.
	CWA = "cwa-"

	// Receivers looked up in the metrics section of the agent configuration.
	StatsD   = "statsd"
	CollectD = "collectd"
	JmxHttp  = "jmx-http"

	// Receivers looked up in the traces section (X-Ray).
	XrayProxy  = "aws-proxy"
	XrayTraces = "aws-traces"

	// OTLP receivers; used for the metrics, logs and traces sections alike.
	OtlpGrpc = "otlp-grpc"
	OtlpHttp = "otlp-http"

	// Ports registered when Application Signals is enabled.
	AppSignalsGrpc  = CWA + "appsig-grpc"
	AppSignalsHttp  = CWA + "appsig-http"
	AppSignalsProxy = CWA + "appsig-xray"
	Server          = CWA + "server"

	// EMF keys the default port; EMFTcp and EMFUdp name the two ports
	// created on that number, one per protocol.
	EMF    = "emf"
	EMFTcp = "emf-tcp"
	EMFUdp = "emf-udp"
)
// receiverDefaultPortsMap holds, per receiver name, the port number used when
// no address is supplied for that receiver. EMFTcp and EMFUdp have no entry of
// their own; both are created on the EMF port.
var receiverDefaultPortsMap = map[string]int32{
	// Metrics receivers.
	StatsD:   8125,
	CollectD: 25826,
	JmxHttp:  4314,

	// X-Ray receivers.
	XrayProxy:  2000,
	XrayTraces: 2000,

	// OTLP receivers.
	OtlpGrpc: 4317,
	OtlpHttp: 4318,

	// Application Signals ports.
	AppSignalsGrpc:  4315,
	AppSignalsHttp:  4316,
	AppSignalsProxy: 2000,
	Server:          4311,

	// Embedded metric format.
	EMF: 25888,
}
// PortMapToServicePortList flattens a map of service ports keyed by port
// number into a single slice.
//
// The result is ordered by port name. Because map iteration order is random
// and sort.Slice is not stable, ports sharing a name are further ordered by
// port number and then by protocol so that the same input always yields the
// same output.
func PortMapToServicePortList(portMap map[int32][]corev1.ServicePort) []corev1.ServicePort {
	// len(portMap) is only a lower bound: each key may hold several ports.
	ports := make([]corev1.ServicePort, 0, len(portMap))
	for _, plist := range portMap {
		ports = append(ports, plist...)
	}
	sort.Slice(ports, func(i, j int) bool {
		a, b := ports[i], ports[j]
		if a.Name != b.Name {
			return a.Name < b.Name
		}
		if a.Port != b.Port {
			return a.Port < b.Port
		}
		return a.Protocol < b.Protocol
	})
	return ports
}
// getContainerPorts builds the container ports, keyed by port name, from the
// agent JSON configuration (cfg), the optional OTEL configuration (otelCfg)
// and the ports given explicitly in specPorts.
//
// If cfg cannot be parsed the error is logged and an empty map is returned;
// specPorts are not applied in that case. Ports derived from the
// configurations have their names truncated to maxPortLen, and are dropped
// when the name or number is invalid or when a port with the same number and
// protocol was already accepted. specPorts are written last, without any
// validation, and replace a derived port that has the same name.
func getContainerPorts(logger logr.Logger, cfg string, otelCfg string, specPorts []corev1.ServicePort) map[string]corev1.ContainerPort {
	containerPorts := make(map[string]corev1.ContainerPort)

	agentConfig, parseErr := adapters.ConfigStructFromJSONString(cfg)
	if parseErr != nil {
		logger.Error(parseErr, "error parsing cw agent config")
		return containerPorts
	}
	candidates := getServicePortsFromCWAgentConfig(logger, agentConfig)

	if otelCfg != "" {
		if otelConfig, otelErr := adapters.ConfigFromString(otelCfg); otelErr != nil {
			logger.Error(otelErr, "error parsing cw agent otel config")
		} else {
			otelPorts, otelPortsErr := adapters.GetServicePortsFromCWAgentOtelConfig(logger, otelConfig)
			if otelPortsErr != nil {
				logger.Error(otelPortsErr, "error parsing ports from cw agent otel config")
			}
			// Whatever was returned is still used, even alongside an error.
			candidates = append(candidates, otelPorts...)
		}
	}

	for _, candidate := range candidates {
		portName := naming.Truncate(candidate.Name, maxPortLen)
		if portName != candidate.Name {
			logger.Info("truncating container port name",
				zap.String("port.name.prev", candidate.Name), zap.String("port.name.new", portName))
		}

		nameErrs := validation.IsValidPortName(portName)
		numErrs := validation.IsValidPortNum(int(candidate.Port))

		switch {
		case len(nameErrs) > 0 || len(numErrs) > 0:
			logger.Info("dropping invalid container port", zap.String("port.name", portName), zap.Int32("port.num", candidate.Port),
				zap.Strings("port.name.errs", nameErrs), zap.Strings("num.errs", numErrs))
		case isDuplicatePort(containerPorts, candidate):
			// The first port to claim a number/protocol pair is kept.
			logger.Info("dropping duplicate container port", zap.String("port.name", portName), zap.Int32("port.num", candidate.Port))
		default:
			containerPorts[portName] = corev1.ContainerPort{
				Name:          portName,
				ContainerPort: candidate.Port,
				Protocol:      candidate.Protocol,
			}
		}
	}

	for _, specPort := range specPorts {
		containerPorts[specPort.Name] = corev1.ContainerPort{
			Name:          specPort.Name,
			ContainerPort: specPort.Port,
			Protocol:      specPort.Protocol,
		}
	}
	return containerPorts
}
// getServicePortsFromCWAgentConfig gathers the service ports for every
// receiver section of the agent configuration and returns them as a list
// produced by PortMapToServicePortList.
func getServicePortsFromCWAgentConfig(logger logr.Logger, config *adapters.CwaConfig) []corev1.ServicePort {
	portsByNumber := map[int32][]corev1.ServicePort{}

	// Call order matters: a section that registers a port/protocol pair first
	// keeps it, and later sections reporting the same pair are skipped.
	getApplicationSignalsReceiversServicePorts(logger, config, portsByNumber)
	getMetricsReceiversServicePorts(logger, config, portsByNumber)
	getLogsReceiversServicePorts(logger, config, portsByNumber)
	// The traces collector reports through the map; its return value is unused.
	_ = getTracesReceiversServicePorts(logger, config, portsByNumber)

	return PortMapToServicePortList(portsByNumber)
}
// isAppSignalEnabledMetrics reports whether
// config.GetApplicationSignalsMetricsConfig returns a non-nil value.
func isAppSignalEnabledMetrics(config *adapters.CwaConfig) bool {
	metricsConfig := config.GetApplicationSignalsMetricsConfig()
	return metricsConfig != nil
}
// isAppSignalEnabledTraces reports whether
// config.GetApplicationSignalsTracesConfig returns a non-nil value.
func isAppSignalEnabledTraces(config *adapters.CwaConfig) bool {
	tracesConfig := config.GetApplicationSignalsTracesConfig()
	return tracesConfig != nil
}
// getMetricsReceiversServicePorts records in servicePortsMap the ports of the
// receivers present under the metrics section of the agent configuration:
// StatsD and CollectD over UDP, OTLP (gRPC and HTTP) and JMX over TCP.
// Nothing is recorded when the section or its metrics_collected part is absent.
func getMetricsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
	if config.Metrics == nil {
		return
	}
	collected := config.Metrics.MetricsCollected
	if collected == nil {
		return
	}

	// StatsD - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-statsd.html
	if statsd := collected.StatsD; statsd != nil {
		getReceiverServicePort(logger, statsd.ServiceAddress, StatsD, corev1.ProtocolUDP, servicePortsMap)
	}

	// CollectD - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-custom-metrics-collectd.html
	if collectd := collected.CollectD; collectd != nil {
		getReceiverServicePort(logger, collectd.ServiceAddress, CollectD, corev1.ProtocolUDP, servicePortsMap)
	}

	// OTLP: the gRPC endpoint is registered before the HTTP one.
	if otlp := collected.OTLP; otlp != nil {
		getReceiverServicePort(logger, otlp.GRPCEndpoint, OtlpGrpc, corev1.ProtocolTCP, servicePortsMap)
		getReceiverServicePort(logger, otlp.HTTPEndpoint, OtlpHttp, corev1.ProtocolTCP, servicePortsMap)
	}

	// JMX is always registered on its default port (no address is passed).
	if collected.JMX != nil {
		getReceiverServicePort(logger, "", JmxHttp, corev1.ProtocolTCP, servicePortsMap)
	}
}
// getReceiverServicePort adds a service port for receiverName to
// servicePortsMap, which is keyed by port number.
//
// When serviceAddress is empty the port number comes from
// receiverDefaultPortsMap and the port is named receiverName. Otherwise the
// number is parsed from serviceAddress and the port is named
// "<receiverName>-<port>" for the OTLP receivers or CWA+receiverName for any
// other receiver; a parse failure is logged and nothing is added.
//
// If the map already holds a port with the same number and protocol, the
// duplicate is logged and nothing is added.
func getReceiverServicePort(logger logr.Logger, serviceAddress string, receiverName string, protocol corev1.Protocol, servicePortsMap map[int32][]corev1.ServicePort) {
	var (
		portNumber int32
		portName   string
	)

	if serviceAddress == "" {
		portNumber = receiverDefaultPortsMap[receiverName]
		portName = receiverName
	} else {
		parsed, err := portFromEndpoint(serviceAddress)
		if err != nil {
			logger.Error(err, "error parsing port from endpoint for receiver", zap.String("endpoint", serviceAddress), zap.String("receiver", receiverName))
			return
		}
		portNumber = parsed

		switch receiverName {
		case OtlpGrpc, OtlpHttp:
			portName = fmt.Sprintf("%s-%d", receiverName, portNumber)
		default:
			portName = CWA + receiverName
		}
	}

	// Indexing a missing key yields a nil slice, so the loop is simply skipped.
	for _, registered := range servicePortsMap[portNumber] {
		if registered.Protocol == protocol {
			logger.Info("Duplicate port and protocol combination configured", zap.Int32("port", portNumber), zap.String("protocol", string(protocol)))
			return
		}
	}

	servicePortsMap[portNumber] = append(servicePortsMap[portNumber], corev1.ServicePort{
		Name:     portName,
		Port:     portNumber,
		Protocol: protocol,
	})
}
// getLogsReceiversServicePorts records in servicePortsMap the ports of the
// receivers present under the logs section of the agent configuration: EMF
// (one TCP and one UDP port on the same number), OTLP (gRPC and HTTP) and,
// when enabled under the Kubernetes settings, JMX Container Insights.
//
// The EMF and JMX ports are only recorded when their port number is not yet
// present in the map, whatever the protocol of the existing entry.
func getLogsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
	if config.Logs == nil {
		return
	}
	collected := config.Logs.LogMetricsCollected
	if collected == nil {
		return
	}

	// EMF - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format_Generation_CloudWatch_Agent.html
	if collected.EMF != nil {
		emfPort := receiverDefaultPortsMap[EMF]
		if _, taken := servicePortsMap[emfPort]; taken {
			logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", emfPort))
		} else {
			servicePortsMap[emfPort] = []corev1.ServicePort{
				{Name: EMFTcp, Port: emfPort, Protocol: corev1.ProtocolTCP},
				{Name: EMFUdp, Port: emfPort, Protocol: corev1.ProtocolUDP},
			}
		}
	}

	// OTLP: the gRPC endpoint is registered before the HTTP one.
	if otlp := collected.OTLP; otlp != nil {
		getReceiverServicePort(logger, otlp.GRPCEndpoint, OtlpGrpc, corev1.ProtocolTCP, servicePortsMap)
		getReceiverServicePort(logger, otlp.HTTPEndpoint, OtlpHttp, corev1.ProtocolTCP, servicePortsMap)
	}

	// JMX Container Insights
	if kubernetes := collected.Kubernetes; kubernetes != nil && kubernetes.JMXContainerInsights {
		jmxPort := receiverDefaultPortsMap[JmxHttp]
		if _, taken := servicePortsMap[jmxPort]; taken {
			logger.Info("Duplicate port has been configured in Agent Config for port", zap.Int32("port", jmxPort))
		} else {
			servicePortsMap[jmxPort] = []corev1.ServicePort{
				{Name: JmxHttp, Port: jmxPort, Protocol: corev1.ProtocolTCP},
			}
		}
	}
}
// getTracesReceiversServicePorts records in servicePortsMap the ports of the
// receivers present under the traces section of the agent configuration:
// OTLP (gRPC and HTTP over TCP) and X-Ray (a UDP traces port plus a TCP proxy
// port).
//
// All results are delivered through servicePortsMap; the returned slice is
// always nil.
//
// Traces - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-Agent-Configuration-File-Details.html#CloudWatch-Agent-Configuration-File-Tracessection
func getTracesReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) []corev1.ServicePort {
	if config.Traces == nil {
		return nil
	}
	collected := config.Traces.TracesCollected
	if collected == nil {
		return nil
	}

	// OTLP: the gRPC endpoint is registered before the HTTP one.
	if otlp := collected.OTLP; otlp != nil {
		getReceiverServicePort(logger, otlp.GRPCEndpoint, OtlpGrpc, corev1.ProtocolTCP, servicePortsMap)
		getReceiverServicePort(logger, otlp.HTTPEndpoint, OtlpHttp, corev1.ProtocolTCP, servicePortsMap)
	}

	// X-Ray
	if xray := collected.XRay; xray != nil {
		getReceiverServicePort(logger, xray.BindAddress, XrayTraces, corev1.ProtocolUDP, servicePortsMap)

		// Without a TCP proxy section the proxy falls back to its default port.
		proxyAddress := ""
		if xray.TCPProxy != nil {
			proxyAddress = xray.TCPProxy.BindAddress
		}
		getReceiverServicePort(logger, proxyAddress, XrayProxy, corev1.ProtocolTCP, servicePortsMap)
	}

	return nil
}
// getApplicationSignalsReceiversServicePorts records the Application Signals
// ports in servicePortsMap, all over TCP and on their default port numbers.
// The gRPC, HTTP and server ports are added when either the metrics or the
// traces check passes; the proxy port is added only when the traces check
// passes.
func getApplicationSignalsReceiversServicePorts(logger logr.Logger, config *adapters.CwaConfig, servicePortsMap map[int32][]corev1.ServicePort) {
	tracesEnabled := isAppSignalEnabledTraces(config)
	if !tracesEnabled && !isAppSignalEnabledMetrics(config) {
		return
	}

	for _, receiver := range []string{AppSignalsGrpc, AppSignalsHttp, Server} {
		getReceiverServicePort(logger, "", receiver, corev1.ProtocolTCP, servicePortsMap)
	}

	if tracesEnabled {
		getReceiverServicePort(logger, "", AppSignalsProxy, corev1.ProtocolTCP, servicePortsMap)
	}
}
// endpointPortPattern matches a colon followed by one or more decimal digits.
// It is compiled once at package initialization instead of on every call.
var endpointPortPattern = regexp.MustCompile(":[0-9]+")

// portFromEndpoint extracts the port number from an endpoint such as
// "0.0.0.0:4317", "udp://127.0.0.1:8125" or "[::1]:4317".
//
// The port is the first colon-prefixed run of digits found outside a
// bracketed IPv6 host. An error is returned when no such run exists, when the
// number is zero, or when it does not fit in an int32.
func portFromEndpoint(endpoint string) (int32, error) {
	// Drop a bracketed IPv6 literal before searching: its colon-separated
	// groups (e.g. the ":1" in "[::1]") would otherwise be taken for the port.
	if open := strings.Index(endpoint, "["); open >= 0 {
		if length := strings.Index(endpoint[open:], "]"); length >= 0 {
			endpoint = endpoint[:open] + endpoint[open+length+1:]
		}
	}

	match := endpointPortPattern.FindString(endpoint)
	if match == "" {
		return 0, errors.New("port should not be empty")
	}

	port, err := strconv.ParseInt(strings.TrimPrefix(match, ":"), 10, 32)
	if err != nil {
		return 0, err
	}
	if port == 0 {
		return 0, errors.New("port should not be empty")
	}
	return int32(port), nil
}
// isDuplicatePort reports whether portsMap already contains a container port
// with the same port number and protocol as servicePort. Names are not
// compared.
func isDuplicatePort(portsMap map[string]corev1.ContainerPort, servicePort corev1.ServicePort) bool {
	for name := range portsMap {
		existing := portsMap[name]
		if existing.ContainerPort != servicePort.Port {
			continue
		}
		if existing.Protocol == servicePort.Protocol {
			return true
		}
	}
	return false
}