internal/manifests/collector/parser/receiver/receiver.go (100 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package receiver
import (
"errors"
"fmt"
"regexp"
"strconv"
"strings"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/parser"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/naming"
)
// registry holds a record of all known receiver parsers.
var registry = make(map[string]parser.Builder)
// BuilderFor returns a parser builder for the given receiver name.
func BuilderFor(name string) parser.Builder {
builder := registry[receiverType(name)]
if builder == nil {
builder = NewGenericReceiverParser
}
return builder
}
// For returns a new parser for the given receiver name + config.
func For(logger logr.Logger, name string, config map[interface{}]interface{}) (parser.ComponentPortParser, error) {
builder := BuilderFor(name)
return builder(logger, name, config), nil
}
// Register adds a new parser builder to the list of known builders.
func Register(name string, builder parser.Builder) {
registry[name] = builder
}
// IsRegistered checks whether a parser is registered with the given name.
func IsRegistered(name string) bool {
_, ok := registry[name]
return ok
}
var (
endpointKey = "endpoint"
listenAddressKey = "listen_address"
scraperReceivers = map[string]struct{}{
"prometheus": {},
}
)
func isScraperReceiver(name string) bool {
_, exists := scraperReceivers[name]
return exists
}
func singlePortFromConfigEndpoint(logger logr.Logger, name string, config map[interface{}]interface{}) *v1.ServicePort {
var endpoint interface{}
switch {
// tcplog and udplog receivers hold the endpoint
// value in `listen_address` field
case name == "tcplog" || name == "udplog":
endpoint = getAddressFromConfig(logger, name, listenAddressKey, config)
// ignore the receiver as it holds the field key endpoint, and it
// is a scraper, we only expose endpoint through k8s service objects for
// receivers that aren't scrapers.
case isScraperReceiver(name):
return nil
default:
endpoint = getAddressFromConfig(logger, name, endpointKey, config)
}
switch e := endpoint.(type) {
case nil:
break
case string:
port, err := portFromEndpoint(e)
if err != nil {
logger.WithValues(endpointKey, e).Error(err, "couldn't parse the endpoint's port")
return nil
}
return &corev1.ServicePort{
Name: naming.PortName(name, port),
Port: port,
}
default:
logger.WithValues(endpointKey, endpoint).Error(fmt.Errorf("unrecognized type %T", endpoint), "receiver's endpoint isn't a string")
}
return nil
}
func getAddressFromConfig(logger logr.Logger, name, key string, config map[interface{}]interface{}) interface{} {
endpoint, ok := config[key]
if !ok {
logger.V(2).Info("%s receiver doesn't have an %s", name, key)
return nil
}
return endpoint
}
func portFromEndpoint(endpoint string) (int32, error) {
var err error
var port int64
r := regexp.MustCompile(":[0-9]+")
if r.MatchString(endpoint) {
port, err = strconv.ParseInt(strings.Replace(r.FindString(endpoint), ":", "", -1), 10, 32)
if err != nil {
return 0, err
}
}
if port == 0 {
return 0, errors.New("port should not be empty")
}
return int32(port), err
}
func receiverType(name string) string {
// receivers have a name like:
// - myreceiver/custom
// - myreceiver
// we extract the "myreceiver" part and see if we have a parser for the receiver
if strings.Contains(name, "/") {
return name[:strings.Index(name, "/")]
}
return name
}