pkg/instrumentation/sdk.go (463 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package instrumentation import ( "context" "fmt" "sort" "strings" "time" "unsafe" "github.com/go-logr/logr" "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.7.0" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1" "github.com/aws/amazon-cloudwatch-agent-operator/pkg/constants" ) const ( volumeName = "opentelemetry-auto-instrumentation" initContainerName = "opentelemetry-auto-instrumentation" sideCarName = "opentelemetry-auto-instrumentation" ) // inject a new sidecar container to the given pod, based on the given AmazonCloudWatchAgent. type sdkInjector struct { client client.Client logger logr.Logger } func (i *sdkInjector) inject(ctx context.Context, insts languageInstrumentations, ns corev1.Namespace, pod corev1.Pod) corev1.Pod { if len(pod.Spec.Containers) < 1 { return pod } if insts.Java.Instrumentation != nil { otelinst := *insts.Java.Instrumentation var err error i.logger.V(1).Info("injecting Java instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) javaContainers := insts.Java.Containers for _, container := range strings.Split(javaContainers, ",") { index := getContainerIndex(container, pod) pod, err = injectJavaagent(otelinst.Spec.Java, pod, index) if err != nil { i.logger.Info("Skipping javaagent injection", "reason", err.Error(), "container", pod.Spec.Containers[index].Name) } else { pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) //disable setting security context in init container due to issue with runAsNonRoot conflict //https://github.com/open-telemetry/opentelemetry-operator/issues/2272 //pod = i.setInitContainerSecurityContext(pod, pod.Spec.Containers[index].SecurityContext, javaInitContainerName) } } } if insts.NodeJS.Instrumentation != nil { otelinst := *insts.NodeJS.Instrumentation var err error i.logger.V(1).Info("injecting NodeJS instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) nodejsContainers := insts.NodeJS.Containers for _, container := range strings.Split(nodejsContainers, ",") { index := getContainerIndex(container, pod) pod, err = injectNodeJSSDK(otelinst.Spec.NodeJS, pod, index) if err != nil { i.logger.Info("Skipping NodeJS SDK injection", "reason", err.Error(), "container", pod.Spec.Containers[index].Name) } else { pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) pod = i.setInitContainerSecurityContext(pod, pod.Spec.Containers[index].SecurityContext, nodejsInitContainerName) } } } if insts.Python.Instrumentation != nil { otelinst := *insts.Python.Instrumentation var err error i.logger.V(1).Info("injecting Python instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) pythonContainers := insts.Python.Containers for _, container := range strings.Split(pythonContainers, ",") { index := getContainerIndex(container, pod) pod, err = injectPythonSDK(otelinst.Spec.Python, pod, index) if err != nil { i.logger.Info("Skipping Python SDK injection", "reason", err.Error(), "container", pod.Spec.Containers[index].Name) } else { pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) pod = i.setInitContainerSecurityContext(pod, pod.Spec.Containers[index].SecurityContext, pythonInitContainerName) } } } if insts.DotNet.Instrumentation != nil { otelinst := *insts.DotNet.Instrumentation var err error i.logger.V(1).Info("injecting DotNet instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) dotnetContainers := insts.DotNet.Containers for _, container := range strings.Split(dotnetContainers, ",") { index := getContainerIndex(container, pod) pod, err = injectDotNetSDK(otelinst.Spec.DotNet, pod, index, insts.DotNet.AdditionalAnnotations[annotationDotNetRuntime]) if err != nil { i.logger.Info("Skipping DotNet SDK injection", "reason", err.Error(), "container", pod.Spec.Containers[index].Name) } else { pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) pod = i.setInitContainerSecurityContext(pod, pod.Spec.Containers[index].SecurityContext, dotnetInitContainerName) } } } if insts.Go.Instrumentation != nil { origPod := pod otelinst := *insts.Go.Instrumentation var err error i.logger.V(1).Info("injecting Go instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) goContainers := insts.Go.Containers // Go instrumentation supports only single container instrumentation. index := getContainerIndex(goContainers, pod) pod, err = injectGoSDK(otelinst.Spec.Go, pod) if err != nil { i.logger.Info("Skipping Go SDK injection", "reason", err.Error(), "container", pod.Spec.Containers[index].Name) } else { // Common env vars and config need to be applied to the agent contain. pod = i.injectCommonEnvVar(otelinst, pod, len(pod.Spec.Containers)-1) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, len(pod.Spec.Containers)-1, 0) // Ensure that after all the env var coalescing we have a value for OTEL_GO_AUTO_TARGET_EXE idx := getIndexOfEnv(pod.Spec.Containers[len(pod.Spec.Containers)-1].Env, envOtelTargetExe) if idx == -1 { i.logger.Info("Skipping Go SDK injection", "reason", "OTEL_GO_AUTO_TARGET_EXE not set", "container", pod.Spec.Containers[index].Name) pod = origPod } } } if insts.ApacheHttpd.Instrumentation != nil { otelinst := *insts.ApacheHttpd.Instrumentation i.logger.V(1).Info("injecting Apache Httpd instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) apacheHttpdContainers := insts.ApacheHttpd.Containers for _, container := range strings.Split(apacheHttpdContainers, ",") { index := getContainerIndex(container, pod) // Apache agent is configured via config files rather than env vars. // Therefore, service name, otlp endpoint and other attributes are passed to the agent injection method resMap, _ := i.createResourceMap(ctx, otelinst, ns, pod, index) pod = injectApacheHttpdagent(i.logger, otelinst.Spec.ApacheHttpd, pod, index, otelinst.Spec.Endpoint, resMap) pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) pod = i.setInitContainerSecurityContext(pod, pod.Spec.Containers[index].SecurityContext, apacheAgentInitContainerName) pod = i.setInitContainerSecurityContext(pod, pod.Spec.Containers[index].SecurityContext, apacheAgentCloneContainerName) } } if insts.Nginx.Instrumentation != nil { otelinst := *insts.Nginx.Instrumentation i.logger.V(1).Info("injecting Nginx instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) nginxContainers := insts.Nginx.Containers for _, container := range strings.Split(nginxContainers, ",") { index := getContainerIndex(container, pod) // Nginx agent is configured via config files rather than env vars. // Therefore, service name, otlp endpoint and other attributes are passed to the agent injection method resMap, _ := i.createResourceMap(ctx, otelinst, ns, pod, index) pod = injectNginxSDK(i.logger, otelinst.Spec.Nginx, pod, index, otelinst.Spec.Endpoint, resMap) pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) } } if insts.Sdk.Instrumentation != nil { otelinst := *insts.Sdk.Instrumentation i.logger.V(1).Info("injecting sdk-only instrumentation into pod", "otelinst-namespace", otelinst.Namespace, "otelinst-name", otelinst.Name) sdkContainers := insts.Sdk.Containers for _, container := range strings.Split(sdkContainers, ",") { index := getContainerIndex(container, pod) pod = i.injectCommonEnvVar(otelinst, pod, index) pod = i.injectCommonSDKConfig(ctx, otelinst, ns, pod, index, index) } } return pod } func (i *sdkInjector) setInitContainerSecurityContext(pod corev1.Pod, securityContext *corev1.SecurityContext, instrInitContainerName string) corev1.Pod { for i, initContainer := range pod.Spec.InitContainers { if initContainer.Name == instrInitContainerName { pod.Spec.InitContainers[i].SecurityContext = securityContext } } return pod } func getContainerIndex(containerName string, pod corev1.Pod) int { // We search for specific container to inject variables and if no one is found // We fallback to first container var index = 0 for idx, ctnair := range pod.Spec.Containers { if ctnair.Name == containerName { index = idx } } return index } func (i *sdkInjector) injectCommonEnvVar(otelinst v1alpha1.Instrumentation, pod corev1.Pod, index int) corev1.Pod { container := &pod.Spec.Containers[index] for _, env := range otelinst.Spec.Env { idx := getIndexOfEnv(container.Env, env.Name) if idx == -1 { container.Env = append(container.Env, env) } } return pod } // injectCommonSDKConfig adds common SDK configuration environment variables to the necessary pod // agentIndex represents the index of the pod the needs the env vars to instrument the application. // appIndex represents the index of the pod the will produce the telemetry. // When the pod handling the instrumentation is the same as the pod producing the telemetry agentIndex // and appIndex should be the same value. This is true for dotnet, java, nodejs, and python instrumentations. // Go requires the agent to be a different container in the pod, so the agentIndex should represent this new sidecar // and appIndex should represent the application being instrumented. func (i *sdkInjector) injectCommonSDKConfig(ctx context.Context, otelinst v1alpha1.Instrumentation, ns corev1.Namespace, pod corev1.Pod, agentIndex int, appIndex int) corev1.Pod { container := &pod.Spec.Containers[agentIndex] resourceMap, existingRes := i.createResourceMap(ctx, otelinst, ns, pod, appIndex) idx := getIndexOfEnv(container.Env, constants.EnvOTELServiceName) serviceNameSource := constants.SourceInstrumentation if idx == -1 { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvOTELServiceName, Value: chooseServiceName(pod, resourceMap, appIndex), }) serviceNameSource = constants.SourceK8sWorkload } if otelinst.Spec.Exporter.Endpoint != "" { idx = getIndexOfEnv(container.Env, constants.EnvOTELExporterOTLPEndpoint) if idx == -1 { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvOTELExporterOTLPEndpoint, Value: otelinst.Spec.Endpoint, }) } } // Some attributes might be empty, we should get them via k8s downward API if !existingRes[string(semconv.K8SPodNameKey)] && resourceMap[string(semconv.K8SPodNameKey)] == "" { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvPodName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: "metadata.name", }, }, }) resourceMap[string(semconv.K8SPodNameKey)] = fmt.Sprintf("$(%s)", constants.EnvPodName) } if otelinst.Spec.Resource.AddK8sUIDAttributes { if resourceMap[string(semconv.K8SPodUIDKey)] == "" { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvPodUID, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: "metadata.uid", }, }, }) resourceMap[string(semconv.K8SPodUIDKey)] = fmt.Sprintf("$(%s)", constants.EnvPodUID) } } idx = getIndexOfEnv(container.Env, constants.EnvOTELResourceAttrs) if idx == -1 || !strings.Contains(container.Env[idx].Value, string(semconv.ServiceVersionKey)) { vsn := chooseServiceVersion(pod, appIndex) if vsn != "" { resourceMap[string(semconv.ServiceVersionKey)] = vsn } } if !existingRes[string(semconv.K8SNodeNameKey)] && resourceMap[string(semconv.K8SNodeNameKey)] == "" { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvNodeName, ValueFrom: &corev1.EnvVarSource{ FieldRef: &corev1.ObjectFieldSelector{ FieldPath: "spec.nodeName", }, }, }) resourceMap[string(semconv.K8SNodeNameKey)] = fmt.Sprintf("$(%s)", constants.EnvNodeName) } if !existingRes[constants.ServiceNameSource] && resourceMap[constants.ServiceNameSource] == "" { resourceMap[constants.ServiceNameSource] = serviceNameSource } idx = getIndexOfEnv(container.Env, constants.EnvOTELResourceAttrs) resStr := resourceMapToStr(resourceMap) if idx == -1 { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvOTELResourceAttrs, Value: resStr, }) } else { if !strings.HasSuffix(container.Env[idx].Value, ",") && resStr != "" { resStr = "," + resStr } container.Env[idx].Value += resStr } idx = getIndexOfEnv(container.Env, constants.EnvOTELPropagators) if idx == -1 && len(otelinst.Spec.Propagators) > 0 { propagators := *(*[]string)((unsafe.Pointer(&otelinst.Spec.Propagators))) container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvOTELPropagators, Value: strings.Join(propagators, ","), }) } idx = getIndexOfEnv(container.Env, constants.EnvOTELTracesSampler) // configure sampler only if it is configured in the CR if idx == -1 && otelinst.Spec.Sampler.Type != "" { idxSamplerArg := getIndexOfEnv(container.Env, constants.EnvOTELTracesSamplerArg) if idxSamplerArg == -1 { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvOTELTracesSampler, Value: string(otelinst.Spec.Sampler.Type), }) if otelinst.Spec.Sampler.Argument != "" { container.Env = append(container.Env, corev1.EnvVar{ Name: constants.EnvOTELTracesSamplerArg, Value: otelinst.Spec.Sampler.Argument, }) } } } // Move OTEL_RESOURCE_ATTRIBUTES to last position on env list. // When OTEL_RESOURCE_ATTRIBUTES environment variable uses other env vars // as attributes value they have to be configured before. // It is mandatory to set right order to avoid attributes with value // pointing to the name of used environment variable instead of its value. idx = getIndexOfEnv(container.Env, constants.EnvOTELResourceAttrs) envs := moveEnvToListEnd(container.Env, idx) container.Env = envs return pod } func chooseServiceName(pod corev1.Pod, resources map[string]string, index int) string { if name := resources[string(semconv.K8SDeploymentNameKey)]; name != "" { return name } if name := resources[string(semconv.K8SReplicaSetNameKey)]; name != "" { return name } if name := resources[string(semconv.K8SStatefulSetNameKey)]; name != "" { return name } if name := resources[string(semconv.K8SDaemonSetNameKey)]; name != "" { return name } if name := resources[string(semconv.K8SCronJobNameKey)]; name != "" { return name } if name := resources[string(semconv.K8SJobNameKey)]; name != "" { return name } if name := resources[string(semconv.K8SPodNameKey)]; name != "" { return name } return pod.Spec.Containers[index].Name } // obtains version by splitting image string on ":" and extracting final element from resulting array. func chooseServiceVersion(pod corev1.Pod, index int) string { parts := strings.Split(pod.Spec.Containers[index].Image, ":") tag := parts[len(parts)-1] //guard statement to handle case where image name has a port number if strings.Contains(tag, "/") { return "" } return tag } // creates the service.instance.id following the semantic defined by // https://github.com/open-telemetry/semantic-conventions/pull/312. func createServiceInstanceId(namespaceName, podName, containerName string) string { var serviceInstanceId string if namespaceName != "" && podName != "" && containerName != "" { resNames := []string{namespaceName, podName, containerName} serviceInstanceId = strings.Join(resNames, ".") } return serviceInstanceId } // createResourceMap creates resource attribute map. // User defined attributes (in explicitly set env var) have higher precedence. func (i *sdkInjector) createResourceMap(ctx context.Context, otelinst v1alpha1.Instrumentation, ns corev1.Namespace, pod corev1.Pod, index int) (map[string]string, map[string]bool) { // get existing resources env var and parse it into a map existingRes := map[string]bool{} existingResourceEnvIdx := getIndexOfEnv(pod.Spec.Containers[index].Env, constants.EnvOTELResourceAttrs) if existingResourceEnvIdx > -1 { existingResArr := strings.Split(pod.Spec.Containers[index].Env[existingResourceEnvIdx].Value, ",") for _, kv := range existingResArr { keyValueArr := strings.Split(strings.TrimSpace(kv), "=") if len(keyValueArr) != 2 { continue } existingRes[keyValueArr[0]] = true } } res := map[string]string{} for k, v := range otelinst.Spec.Resource.Attributes { if !existingRes[k] { res[k] = v } } k8sResources := map[attribute.Key]string{} k8sResources[semconv.K8SNamespaceNameKey] = ns.Name k8sResources[semconv.K8SContainerNameKey] = pod.Spec.Containers[index].Name // Some fields might be empty - node name, pod name // The pod name might be empty if the pod is created form deployment template k8sResources[semconv.K8SPodNameKey] = pod.Name k8sResources[semconv.K8SPodUIDKey] = string(pod.UID) k8sResources[semconv.K8SNodeNameKey] = pod.Spec.NodeName k8sResources[semconv.ServiceInstanceIDKey] = createServiceInstanceId(ns.Name, pod.Name, pod.Spec.Containers[index].Name) i.addParentResourceLabels(ctx, otelinst.Spec.Resource.AddK8sUIDAttributes, ns, pod.ObjectMeta, k8sResources) for k, v := range k8sResources { if !existingRes[string(k)] && v != "" { res[string(k)] = v } } return res, existingRes } func (i *sdkInjector) addParentResourceLabels(ctx context.Context, uid bool, ns corev1.Namespace, objectMeta metav1.ObjectMeta, resources map[attribute.Key]string) { for _, owner := range objectMeta.OwnerReferences { switch strings.ToLower(owner.Kind) { case "replicaset": resources[semconv.K8SReplicaSetNameKey] = owner.Name if uid { resources[semconv.K8SReplicaSetUIDKey] = string(owner.UID) } // parent of ReplicaSet is e.g. Deployment which we are interested to know rs := appsv1.ReplicaSet{} nsn := types.NamespacedName{Namespace: ns.Name, Name: owner.Name} backOff := wait.Backoff{Duration: 10 * time.Millisecond, Factor: 1.5, Jitter: 0.1, Steps: 20, Cap: 2 * time.Second} checkError := func(err error) bool { return apierrors.IsNotFound(err) } getReplicaSet := func() error { return i.client.Get(ctx, nsn, &rs) } // use a retry loop to get the Deployment. A single call to client.get fails occasionally err := retry.OnError(backOff, checkError, getReplicaSet) if err != nil { i.logger.Error(err, "failed to get replicaset", "replicaset", nsn.Name, "namespace", nsn.Namespace) } i.addParentResourceLabels(ctx, uid, ns, rs.ObjectMeta, resources) case "deployment": resources[semconv.K8SDeploymentNameKey] = owner.Name if uid { resources[semconv.K8SDeploymentUIDKey] = string(owner.UID) } case "statefulset": resources[semconv.K8SStatefulSetNameKey] = owner.Name if uid { resources[semconv.K8SStatefulSetUIDKey] = string(owner.UID) } case "daemonset": resources[semconv.K8SDaemonSetNameKey] = owner.Name if uid { resources[semconv.K8SDaemonSetUIDKey] = string(owner.UID) } case "job": resources[semconv.K8SJobNameKey] = owner.Name if uid { resources[semconv.K8SJobUIDKey] = string(owner.UID) } case "cronjob": resources[semconv.K8SCronJobNameKey] = owner.Name if uid { resources[semconv.K8SCronJobUIDKey] = string(owner.UID) } } } } func resourceMapToStr(res map[string]string) string { keys := make([]string, 0, len(res)) for k := range res { keys = append(keys, k) } sort.Strings(keys) var str = "" for _, k := range keys { if str != "" { str += "," } str += fmt.Sprintf("%s=%s", k, res[k]) } return str } func getIndexOfEnv(envs []corev1.EnvVar, name string) int { for i := range envs { if envs[i].Name == name { return i } } return -1 } func moveEnvToListEnd(envs []corev1.EnvVar, idx int) []corev1.EnvVar { if idx >= 0 && idx < len(envs) { envToMove := envs[idx] envs = append(envs[:idx], envs[idx+1:]...) envs = append(envs, envToMove) } return envs } func validateContainerEnv(envs []corev1.EnvVar, envsToBeValidated ...string) error { for _, envToBeValidated := range envsToBeValidated { for _, containerEnv := range envs { if containerEnv.Name == envToBeValidated { if containerEnv.ValueFrom != nil { return fmt.Errorf("the container defines env var value via ValueFrom, envVar: %s", containerEnv.Name) } break } } } return nil }