internal/manifests/collector/service.go (127 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package collector
import (
	"fmt"
	"sort"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1"
	"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests"
	"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters"
	"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/manifestutils"
	"github.com/aws/amazon-cloudwatch-agent-operator/internal/naming"
)
// The headless label marks the headless Service so that it can be told apart
// from the regular ClusterIP Service, which is built from the same base object.
const (
// headlessLabel is the label key that HeadlessService adds to the Service.
headlessLabel = "operator.opentelemetry.io/collector-headless-service"
// headlessExists is the value stored under headlessLabel.
headlessExists = "Exists"
)
// HeadlessService builds the headless variant of the collector Service.
//
// It starts from the object returned by Service, then renames it, tags it with
// the headless label, adds the OpenShift serving-cert annotation and sets
// ClusterIP to "None". If Service returns an error or no object at all, that
// result is handed back unchanged.
func HeadlessService(params manifests.Params) (*corev1.Service, error) {
	svc, err := Service(params)
	if err != nil || svc == nil {
		return svc, err
	}

	svc.Name = naming.HeadlessService(params.OtelCol.Name)
	svc.Labels[headlessLabel] = headlessExists
	svc.Spec.ClusterIP = "None"

	// Service hands out params.OtelCol.Annotations itself, so the extra
	// annotation goes into a fresh map rather than into that shared one.
	// Entries already present on the Service win over the default below.
	merged := map[string]string{}
	merged["service.beta.openshift.io/serving-cert-secret-name"] = fmt.Sprintf("%s-tls", svc.Name)
	for key, value := range svc.Annotations {
		merged[key] = value
	}
	svc.Annotations = merged

	return svc, nil
}
// MonitoringService builds the Service that exposes a single port named
// "monitoring".
//
// The port number is obtained by parsing Spec.Config and passing the result to
// adapters.ConfigToMetricsPort. A failure in either step is returned together
// with a nil Service; a config parsing failure is also logged.
func MonitoringService(params manifests.Params) (*corev1.Service, error) {
	svcName := naming.MonitoringService(params.OtelCol.Name)
	svcLabels := manifestutils.Labels(params.OtelCol.ObjectMeta, svcName, params.OtelCol.Spec.Image, ComponentAmazonCloudWatchAgent, []string{})

	cfg, err := adapters.ConfigFromString(params.OtelCol.Spec.Config)
	if err != nil {
		params.Log.Error(err, "couldn't extract the configuration")
		return nil, err
	}

	port, err := adapters.ConfigToMetricsPort(params.Log, cfg)
	if err != nil {
		return nil, err
	}

	meta := metav1.ObjectMeta{
		Name:        svcName,
		Namespace:   params.OtelCol.Namespace,
		Labels:      svcLabels,
		Annotations: params.OtelCol.Annotations,
	}
	spec := corev1.ServiceSpec{
		Selector:  manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, ComponentAmazonCloudWatchAgent),
		ClusterIP: "",
		Ports: []corev1.ServicePort{
			{Name: "monitoring", Port: port},
		},
	}

	return &corev1.Service{ObjectMeta: meta, Spec: spec}, nil
}
// Service builds the regular Service for the collector instance.
//
// The exposed ports are whatever getContainerPorts derives from the instance's
// config and explicitly listed ports. When that yields nothing, no Service is
// needed and (nil, nil) is returned. Instances running in DaemonSet mode get
// the Local internal traffic policy; every other mode gets Cluster.
func Service(params manifests.Params) (*corev1.Service, error) {
	svcName := naming.Service(params.OtelCol.Name)
	svcLabels := manifestutils.Labels(params.OtelCol.ObjectMeta, svcName, params.OtelCol.Spec.Image, ComponentAmazonCloudWatchAgent, []string{})

	containerPorts := getContainerPorts(params.Log, params.OtelCol.Spec.Config, params.OtelCol.Spec.OtelConfig, params.OtelCol.Spec.Ports)
	if len(containerPorts) == 0 {
		// nothing to expose, so there is no point in creating a Service
		params.Log.V(1).Info("the instance's configuration didn't yield any ports to open, skipping service", "instance.name", params.OtelCol.Name, "instance.namespace", params.OtelCol.Namespace)
		return nil, nil
	}

	policy := corev1.ServiceInternalTrafficPolicyLocal
	if params.OtelCol.Spec.Mode != v1alpha1.ModeDaemonSet {
		policy = corev1.ServiceInternalTrafficPolicyCluster
	}

	meta := metav1.ObjectMeta{
		Name:        svcName,
		Namespace:   params.OtelCol.Namespace,
		Labels:      svcLabels,
		Annotations: params.OtelCol.Annotations,
	}
	spec := corev1.ServiceSpec{
		InternalTrafficPolicy: &policy,
		Selector:              manifestutils.SelectorLabels(params.OtelCol.ObjectMeta, ComponentAmazonCloudWatchAgent),
		ClusterIP:             "",
		Ports:                 containerPortsToServicePortList(containerPorts),
	}

	return &corev1.Service{ObjectMeta: meta, Spec: spec}, nil
}
// containerPortsToServicePortList converts the given container ports into
// Service ports, copying each port's name, number and protocol.
//
// Go randomises map iteration order, so the result is sorted by port name and
// then by port number. Without this the same input could produce differently
// ordered Service specs on successive calls. A nil slice is returned when
// portMap is empty.
func containerPortsToServicePortList(portMap map[string]corev1.ContainerPort) []corev1.ServicePort {
	if len(portMap) == 0 {
		return nil
	}

	ports := make([]corev1.ServicePort, 0, len(portMap))
	for _, p := range portMap {
		ports = append(ports, corev1.ServicePort{
			Name:     p.Name,
			Port:     p.ContainerPort,
			Protocol: p.Protocol,
		})
	}

	// The port number breaks ties between entries that share a name.
	sort.Slice(ports, func(i, j int) bool {
		if ports[i].Name != ports[j].Name {
			return ports[i].Name < ports[j].Name
		}
		return ports[i].Port < ports[j].Port
	})

	return ports
}
// filterPort decides whether candidate can be added next to the ports that are
// already described by portNumbers and portNames.
//
// It returns nil when the port number is already taken. When only the name is
// taken, the candidate is renamed to "port-<number>"; if that fallback name is
// taken as well, the port is skipped (nil) and the clash is logged. Otherwise
// a pointer to the (possibly renamed) copy of candidate is returned.
func filterPort(logger logr.Logger, candidate corev1.ServicePort, portNumbers map[int32]bool, portNames map[string]bool) *corev1.ServicePort {
	switch {
	case portNumbers[candidate.Port]:
		// the number is already in use
		return nil
	case !portNames[candidate.Name]:
		// neither number nor name is in use: keep the candidate as it is
		return &candidate
	}

	// The name is taken; try the generated fallback name instead.
	renamed := fmt.Sprintf("port-%d", candidate.Port)
	if !portNames[renamed] {
		candidate.Name = renamed
		return &candidate
	}

	// Even the fallback clashes, so the port is dropped.
	logger.V(2).Info("a port name specified in the CR clashes with an inferred port name, and the fallback port name clashes with another port name! Skipping this port.",
		"inferred-port-name", candidate.Name,
		"fallback-port-name", renamed,
	)
	return nil
}
// extractPortNumbersAndNames returns two lookup sets built from ports: the
// first is keyed by port number, the second by port name. Every number and
// name that occurs in ports maps to true. Both maps are non-nil even when
// ports is empty.
func extractPortNumbersAndNames(ports []corev1.ServicePort) (map[int32]bool, map[string]bool) {
	seenNumbers := make(map[int32]bool, len(ports))
	seenNames := make(map[string]bool, len(ports))

	for i := range ports {
		seenNumbers[ports[i].Port] = true
		seenNames[ports[i].Name] = true
	}

	return seenNumbers, seenNames
}