pkg/instrumentation/podmutator.go (365 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package instrumentation
import (
"context"
"errors"
"fmt"
"strings"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/manifests/collector/adapters"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/webhook/podmutation"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/featuregate"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation/jmx"
)
const (
amazonCloudWatchNamespace = "amazon-cloudwatch"
amazonCloudWatchAgentName = "cloudwatch-agent"
)
var (
errMultipleInstancesPossible = errors.New("multiple OpenTelemetry Instrumentation instances available, cannot determine which one to select")
)
type instPodMutator struct {
Client client.Client
sdkInjector *sdkInjector
Logger logr.Logger
Recorder record.EventRecorder
}
type instrumentationWithContainers struct {
Instrumentation *v1alpha1.Instrumentation
Containers string
AdditionalAnnotations map[string]string
}
type languageInstrumentations struct {
Java instrumentationWithContainers
NodeJS instrumentationWithContainers
Python instrumentationWithContainers
DotNet instrumentationWithContainers
ApacheHttpd instrumentationWithContainers
Nginx instrumentationWithContainers
Go instrumentationWithContainers
Sdk instrumentationWithContainers
}
// Check if single instrumentation is configured for Pod and return which is configured.
func (langInsts languageInstrumentations) isSingleInstrumentationEnabled() bool {
count := 0
if langInsts.Java.Instrumentation != nil {
count++
}
if langInsts.NodeJS.Instrumentation != nil {
count++
}
if langInsts.Python.Instrumentation != nil {
count++
}
if langInsts.DotNet.Instrumentation != nil {
count++
}
if langInsts.ApacheHttpd.Instrumentation != nil {
count++
}
if langInsts.Nginx.Instrumentation != nil {
count++
}
if langInsts.Go.Instrumentation != nil {
count++
}
if langInsts.Sdk.Instrumentation != nil {
count++
}
return count == 1
}
// Check if specific containers are provided for configured instrumentation.
func (langInsts languageInstrumentations) areContainerNamesConfiguredForMultipleInstrumentations() (bool, error) {
var instrWithoutContainers int
var instrWithContainers int
var allContainers []string
if featuregate.SkipMultiInstrumentationContainerValidation.IsEnabled() {
return true, nil
}
// Check for instrumentations with and without containers.
if langInsts.Java.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.Java)
instrWithoutContainers += isInstrWithoutContainers(langInsts.Java)
allContainers = append(allContainers, langInsts.Java.Containers)
}
if langInsts.NodeJS.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.NodeJS)
instrWithoutContainers += isInstrWithoutContainers(langInsts.NodeJS)
allContainers = append(allContainers, langInsts.NodeJS.Containers)
}
if langInsts.Python.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.Python)
instrWithoutContainers += isInstrWithoutContainers(langInsts.Python)
allContainers = append(allContainers, langInsts.Python.Containers)
}
if langInsts.DotNet.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.DotNet)
instrWithoutContainers += isInstrWithoutContainers(langInsts.DotNet)
allContainers = append(allContainers, langInsts.DotNet.Containers)
}
if langInsts.ApacheHttpd.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.ApacheHttpd)
instrWithoutContainers += isInstrWithoutContainers(langInsts.ApacheHttpd)
allContainers = append(allContainers, langInsts.ApacheHttpd.Containers)
}
if langInsts.Nginx.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.Nginx)
instrWithoutContainers += isInstrWithoutContainers(langInsts.Nginx)
allContainers = append(allContainers, langInsts.Nginx.Containers)
}
if langInsts.Go.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.Go)
instrWithoutContainers += isInstrWithoutContainers(langInsts.Go)
allContainers = append(allContainers, langInsts.Go.Containers)
}
if langInsts.Sdk.Instrumentation != nil {
instrWithContainers += isInstrWithContainers(langInsts.Sdk)
instrWithoutContainers += isInstrWithoutContainers(langInsts.Sdk)
allContainers = append(allContainers, langInsts.Sdk.Containers)
}
// Look for duplicated containers.
containerDuplicates := findDuplicatedContainers(allContainers)
if containerDuplicates != nil {
return false, containerDuplicates
}
// Look for mixed multiple instrumentations with and without container names.
if instrWithoutContainers > 0 && instrWithContainers > 0 {
return false, fmt.Errorf("incorrect instrumentation configuration - please provide container names for all instrumentations")
}
// Look for multiple instrumentations without container names.
if instrWithoutContainers > 1 && instrWithContainers == 0 {
return false, fmt.Errorf("incorrect instrumentation configuration - please provide container names for all instrumentations")
}
if instrWithoutContainers == 0 && instrWithContainers == 0 {
return false, fmt.Errorf("instrumentation configuration not provided")
}
return true, nil
}
// Set containers for configured instrumentation.
func (langInsts *languageInstrumentations) setInstrumentationLanguageContainers(containers string) {
if langInsts.Java.Instrumentation != nil {
langInsts.Java.Containers = containers
}
if langInsts.NodeJS.Instrumentation != nil {
langInsts.NodeJS.Containers = containers
}
if langInsts.Python.Instrumentation != nil {
langInsts.Python.Containers = containers
}
if langInsts.DotNet.Instrumentation != nil {
langInsts.DotNet.Containers = containers
}
if langInsts.ApacheHttpd.Instrumentation != nil {
langInsts.ApacheHttpd.Containers = containers
}
if langInsts.Nginx.Instrumentation != nil {
langInsts.Nginx.Containers = containers
}
if langInsts.Go.Instrumentation != nil {
langInsts.Go.Containers = containers
}
if langInsts.Sdk.Instrumentation != nil {
langInsts.Sdk.Containers = containers
}
}
var _ podmutation.PodMutator = (*instPodMutator)(nil)
func NewMutator(logger logr.Logger, client client.Client, recorder record.EventRecorder) *instPodMutator {
return &instPodMutator{
Logger: logger,
Client: client,
sdkInjector: &sdkInjector{
logger: logger,
client: client,
},
Recorder: recorder,
}
}
func (pm *instPodMutator) Mutate(ctx context.Context, ns corev1.Namespace, pod corev1.Pod) (corev1.Pod, error) {
logger := pm.Logger.WithValues("namespace", pod.Namespace, "name", pod.Name)
// We check if Pod is already instrumented.
if isAutoInstrumentationInjected(pod) {
logger.Info("Skipping pod instrumentation - already instrumented")
return pod, nil
}
var inst *v1alpha1.Instrumentation
var err error
insts := languageInstrumentations{}
// We bail out if any annotation fails to process.
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectJava); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnableJavaAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.Java.Instrumentation = inst
} else {
logger.Error(nil, "support for Java auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for Java auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectNodeJS); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnableNodeJSAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.NodeJS.Instrumentation = inst
} else {
logger.Error(nil, "support for NodeJS auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for NodeJS auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectPython); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnablePythonAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.Python.Instrumentation = inst
} else {
logger.Error(nil, "support for Python auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for Python auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectDotNet); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnableDotnetAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.DotNet.Instrumentation = inst
insts.DotNet.AdditionalAnnotations = map[string]string{annotationDotNetRuntime: annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationDotNetRuntime)}
} else {
logger.Error(nil, "support for .NET auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for .NET auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectGo); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnableGoAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.Go.Instrumentation = inst
} else {
logger.Error(err, "support for Go auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for Go auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectApacheHttpd); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnableApacheHTTPAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.ApacheHttpd.Instrumentation = inst
} else {
logger.Error(nil, "support for Apache HTTPD auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for Apache HTTPD auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectNginx); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
if featuregate.EnableNginxAutoInstrumentationSupport.IsEnabled() || inst == nil {
insts.Nginx.Instrumentation = inst
} else {
logger.Error(nil, "support for Nginx auto instrumentation is not enabled")
pm.Recorder.Event(pod.DeepCopy(), "Warning", "InstrumentationRequestRejected", "support for Nginx auto instrumentation is not enabled")
}
if inst, err = pm.getInstrumentationInstance(ctx, ns, pod, annotationInjectSdk); err != nil {
// we still allow the pod to be created, but we log a message to the operator's logs
logger.Error(err, "failed to select an OpenTelemetry Instrumentation instance for this pod")
return pod, err
}
insts.Sdk.Instrumentation = inst
if insts.Java.Instrumentation == nil && insts.NodeJS.Instrumentation == nil && insts.Python.Instrumentation == nil &&
insts.DotNet.Instrumentation == nil && insts.Go.Instrumentation == nil && insts.ApacheHttpd.Instrumentation == nil &&
insts.Nginx.Instrumentation == nil &&
insts.Sdk.Instrumentation == nil {
logger.V(1).Info("annotation not present in deployment, skipping instrumentation injection")
return pod, nil
}
// We retrieve the annotation for podname
if featuregate.EnableMultiInstrumentationSupport.IsEnabled() {
// We use annotations specific for instrumentation language
insts.Java.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectJavaContainersName)
insts.NodeJS.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectNodeJSContainersName)
insts.Python.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectPythonContainersName)
insts.DotNet.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectDotnetContainersName)
insts.Go.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectGoContainersName)
insts.ApacheHttpd.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectApacheHttpdContainersName)
insts.Nginx.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectNginxContainersName)
insts.Sdk.Containers = annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectSdkContainersName)
// We check if provided annotations and instrumentations are valid
ok, msg := insts.areContainerNamesConfiguredForMultipleInstrumentations()
if !ok {
logger.V(1).Error(msg, "skipping instrumentation injection")
return pod, nil
}
} else {
// We use general annotation for container names
// only when multi instrumentation is disabled
singleInstrEnabled := insts.isSingleInstrumentationEnabled()
if singleInstrEnabled {
generalContainerNames := annotationValue(ns.ObjectMeta, pod.ObjectMeta, annotationInjectContainerName)
insts.setInstrumentationLanguageContainers(generalContainerNames)
} else {
logger.V(1).Error(fmt.Errorf("multiple injection annotations present"), "skipping instrumentation injection")
return pod, nil
}
}
// once it's been determined that instrumentation is desired, none exists yet, and we know which instance it should talk to,
// we should inject the instrumentation.
modifiedPod := pod
modifiedPod = pm.sdkInjector.inject(ctx, insts, ns, modifiedPod)
return modifiedPod, nil
}
func (pm *instPodMutator) getInstrumentationInstance(ctx context.Context, ns corev1.Namespace, pod corev1.Pod, instAnnotation string) (*v1alpha1.Instrumentation, error) {
instValue := annotationValue(ns.ObjectMeta, pod.ObjectMeta, instAnnotation)
if len(instValue) == 0 || strings.EqualFold(instValue, "false") {
return nil, nil
}
var additionalEnvs map[Type]map[string]string
if instAnnotation == annotationInjectJava {
additionalEnvs = map[Type]map[string]string{}
targetSystems := getJmxTargetSystems(ns, pod)
if len(targetSystems) != 0 {
additionalEnvs[TypeJava] = map[string]string{
jmx.EnvTargetSystem: strings.Join(targetSystems, ","),
}
}
}
if strings.EqualFold(instValue, "true") {
return pm.selectInstrumentationInstanceFromNamespace(ctx, ns, additionalEnvs, isWindowsPod(pod))
}
var instNamespacedName types.NamespacedName
if instNamespace, instName, namespaced := strings.Cut(instValue, "/"); namespaced {
instNamespacedName = types.NamespacedName{Name: instName, Namespace: instNamespace}
} else {
instNamespacedName = types.NamespacedName{Name: instValue, Namespace: ns.Name}
}
otelInst := &v1alpha1.Instrumentation{}
err := pm.Client.Get(ctx, instNamespacedName, otelInst)
if err != nil {
return nil, err
}
return otelInst, nil
}
func (pm *instPodMutator) selectInstrumentationInstanceFromNamespace(ctx context.Context, ns corev1.Namespace, additionalEnvs map[Type]map[string]string, isWindowsPod bool) (*v1alpha1.Instrumentation, error) {
var otelInsts v1alpha1.InstrumentationList
if err := pm.Client.List(ctx, &otelInsts, client.InNamespace(ns.Name)); err != nil {
return nil, err
}
switch s := len(otelInsts.Items); {
case s == 0:
pm.Logger.Info("no OpenTelemetry Instrumentation instances available. Using default Instrumentation instance")
cr := GetAmazonCloudWatchAgentResource(ctx, pm.Client, amazonCloudWatchAgentName)
config, err := adapters.ConfigStructFromJSONString(cr.Spec.Config)
if err != nil {
pm.Logger.Error(err, "unable to retrieve cloudwatch agent config for instrumentation")
}
return getDefaultInstrumentation(config, additionalEnvs, isWindowsPod)
case s > 1:
return nil, errMultipleInstancesPossible
default:
return &otelInsts.Items[0], nil
}
}
func GetAmazonCloudWatchAgentResource(ctx context.Context, c client.Client, name string) v1alpha1.AmazonCloudWatchAgent {
cr := &v1alpha1.AmazonCloudWatchAgent{}
_ = c.Get(ctx, client.ObjectKey{
Namespace: amazonCloudWatchNamespace,
Name: name,
}, cr)
return *cr
}
func isWindowsPod(pod corev1.Pod) bool {
return pod.Spec.NodeSelector["kubernetes.io/os"] == "windows"
}
func getJmxTargetSystems(ns corev1.Namespace, pod corev1.Pod) []string {
var targetSystems []string
for _, target := range jmx.SupportedTargets {
value := annotationValue(ns.ObjectMeta, pod.ObjectMeta, jmx.AnnotationKey(target))
if strings.EqualFold(value, "true") {
targetSystems = append(targetSystems, target)
}
}
return targetSystems
}