processor/k8sattributesprocessor/options.go (336 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package k8sattributesprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor"
import (
"fmt"
"os"
"regexp"
"time"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
"k8s.io/apimachinery/pkg/selection"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata"
)
const (
filterOPEquals = "equals"
filterOPNotEquals = "not-equals"
filterOPExists = "exists"
filterOPDoesNotExist = "does-not-exist"
metadataPodIP = "k8s.pod.ip"
metadataPodStartTime = "k8s.pod.start_time"
specPodHostName = "k8s.pod.hostname"
// TODO: use k8s.cluster.uid, container.image.repo_digests
// from semconv when available,
// replace clusterUID with conventions.AttributeK8SClusterUID
// replace containerRepoDigests with conventions.AttributeContainerImageRepoDigests
clusterUID = "k8s.cluster.uid"
containerImageRepoDigests = "container.image.repo_digests"
)
// option represents a configuration option that can be passes.
// to the k8s-tagger
type option func(*kubernetesprocessor) error
// withAPIConfig provides k8s API related configuration to the processor.
// It defaults the authentication method to in-cluster auth using service accounts.
func withAPIConfig(cfg k8sconfig.APIConfig) option {
return func(p *kubernetesprocessor) error {
p.apiConfig = cfg
return p.apiConfig.Validate()
}
}
// withPassthrough enables passthrough mode. In passthrough mode, the processor
// only detects and tags the pod IP and does not invoke any k8s APIs.
func withPassthrough() option {
return func(p *kubernetesprocessor) error {
p.passthroughMode = true
return nil
}
}
// enabledAttributes returns the list of resource attributes enabled by default.
func enabledAttributes() (attributes []string) {
defaultConfig := metadata.DefaultResourceAttributesConfig()
if defaultConfig.K8sClusterUID.Enabled {
attributes = append(attributes, clusterUID)
}
if defaultConfig.ContainerID.Enabled {
attributes = append(attributes, conventions.AttributeContainerID)
}
if defaultConfig.ContainerImageName.Enabled {
attributes = append(attributes, conventions.AttributeContainerImageName)
}
if defaultConfig.ContainerImageRepoDigests.Enabled {
attributes = append(attributes, containerImageRepoDigests)
}
if defaultConfig.ContainerImageTag.Enabled {
attributes = append(attributes, conventions.AttributeContainerImageTag)
}
if defaultConfig.K8sContainerName.Enabled {
attributes = append(attributes, conventions.AttributeK8SContainerName)
}
if defaultConfig.K8sCronjobName.Enabled {
attributes = append(attributes, conventions.AttributeK8SCronJobName)
}
if defaultConfig.K8sDaemonsetName.Enabled {
attributes = append(attributes, conventions.AttributeK8SDaemonSetName)
}
if defaultConfig.K8sDaemonsetUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SDaemonSetUID)
}
if defaultConfig.K8sDeploymentName.Enabled {
attributes = append(attributes, conventions.AttributeK8SDeploymentName)
}
if defaultConfig.K8sDeploymentUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SDeploymentUID)
}
if defaultConfig.K8sJobName.Enabled {
attributes = append(attributes, conventions.AttributeK8SJobName)
}
if defaultConfig.K8sJobUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SJobUID)
}
if defaultConfig.K8sNamespaceName.Enabled {
attributes = append(attributes, conventions.AttributeK8SNamespaceName)
}
if defaultConfig.K8sNodeName.Enabled {
attributes = append(attributes, conventions.AttributeK8SNodeName)
}
if defaultConfig.K8sNodeUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SNodeUID)
}
if defaultConfig.K8sPodHostname.Enabled {
attributes = append(attributes, specPodHostName)
}
if defaultConfig.K8sPodName.Enabled {
attributes = append(attributes, conventions.AttributeK8SPodName)
}
if defaultConfig.K8sPodStartTime.Enabled {
attributes = append(attributes, metadataPodStartTime)
}
if defaultConfig.K8sPodUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SPodUID)
}
if defaultConfig.K8sPodIP.Enabled {
attributes = append(attributes, metadataPodIP)
}
if defaultConfig.K8sReplicasetName.Enabled {
attributes = append(attributes, conventions.AttributeK8SReplicaSetName)
}
if defaultConfig.K8sReplicasetUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SReplicaSetUID)
}
if defaultConfig.K8sStatefulsetName.Enabled {
attributes = append(attributes, conventions.AttributeK8SStatefulSetName)
}
if defaultConfig.K8sStatefulsetUID.Enabled {
attributes = append(attributes, conventions.AttributeK8SStatefulSetUID)
}
return
}
// withExtractMetadata allows specifying options to control extraction of pod metadata.
// If no fields explicitly provided, the defaults are pulled from metadata.yaml.
func withExtractMetadata(fields ...string) option {
return func(p *kubernetesprocessor) error {
for _, field := range fields {
switch field {
case conventions.AttributeK8SNamespaceName:
p.rules.Namespace = true
case conventions.AttributeK8SPodName:
p.rules.PodName = true
case conventions.AttributeK8SPodUID:
p.rules.PodUID = true
case specPodHostName:
p.rules.PodHostName = true
case metadataPodStartTime:
p.rules.StartTime = true
case metadataPodIP:
p.rules.PodIP = true
case conventions.AttributeK8SDeploymentName:
p.rules.DeploymentName = true
case conventions.AttributeK8SDeploymentUID:
p.rules.DeploymentUID = true
case conventions.AttributeK8SReplicaSetName:
p.rules.ReplicaSetName = true
case conventions.AttributeK8SReplicaSetUID:
p.rules.ReplicaSetID = true
case conventions.AttributeK8SDaemonSetName:
p.rules.DaemonSetName = true
case conventions.AttributeK8SDaemonSetUID:
p.rules.DaemonSetUID = true
case conventions.AttributeK8SStatefulSetName:
p.rules.StatefulSetName = true
case conventions.AttributeK8SStatefulSetUID:
p.rules.StatefulSetUID = true
case conventions.AttributeK8SContainerName:
p.rules.ContainerName = true
case conventions.AttributeK8SJobName:
p.rules.JobName = true
case conventions.AttributeK8SJobUID:
p.rules.JobUID = true
case conventions.AttributeK8SCronJobName:
p.rules.CronJobName = true
case conventions.AttributeK8SNodeName:
p.rules.Node = true
case conventions.AttributeK8SNodeUID:
p.rules.NodeUID = true
case conventions.AttributeContainerID:
p.rules.ContainerID = true
case conventions.AttributeContainerImageName:
p.rules.ContainerImageName = true
case containerImageRepoDigests:
p.rules.ContainerImageRepoDigests = true
case conventions.AttributeContainerImageTag:
p.rules.ContainerImageTag = true
case clusterUID:
p.rules.ClusterUID = true
}
}
return nil
}
}
// withExtractLabels allows specifying options to control extraction of pod labels.
func withExtractLabels(labels ...FieldExtractConfig) option {
return func(p *kubernetesprocessor) error {
labels, err := extractFieldRules("labels", labels...)
if err != nil {
return err
}
p.rules.Labels = labels
return nil
}
}
// withExtractAnnotations allows specifying options to control extraction of pod annotations tags.
func withExtractAnnotations(annotations ...FieldExtractConfig) option {
return func(p *kubernetesprocessor) error {
annotations, err := extractFieldRules("annotations", annotations...)
if err != nil {
return err
}
p.rules.Annotations = annotations
return nil
}
}
func extractFieldRules(fieldType string, fields ...FieldExtractConfig) ([]kube.FieldExtractionRule, error) {
var rules []kube.FieldExtractionRule
for _, a := range fields {
name := a.TagName
if a.From == "" {
a.From = kube.MetadataFromPod
}
if name == "" && a.Key != "" {
// name for KeyRegex case is set at extraction time/runtime, skipped here
name = fmt.Sprintf("k8s.%v.%v.%v", a.From, fieldType, a.Key)
}
var keyRegex *regexp.Regexp
var hasKeyRegexReference bool
if a.KeyRegex != "" {
var err error
keyRegex, err = regexp.Compile("^(?:" + a.KeyRegex + ")$")
if err != nil {
return rules, err
}
if keyRegex.NumSubexp() > 0 {
hasKeyRegexReference = true
}
}
rules = append(rules, kube.FieldExtractionRule{
Name: name, Key: a.Key, KeyRegex: keyRegex, HasKeyRegexReference: hasKeyRegexReference, From: a.From,
})
}
return rules, nil
}
// withFilterNode allows specifying options to control filtering pods by a node/host.
func withFilterNode(node, nodeFromEnvVar string) option {
return func(p *kubernetesprocessor) error {
if nodeFromEnvVar != "" {
p.filters.Node = os.Getenv(nodeFromEnvVar)
return nil
}
p.filters.Node = node
return nil
}
}
// withFilterNamespace allows specifying options to control filtering pods by a namespace.
func withFilterNamespace(ns string) option {
return func(p *kubernetesprocessor) error {
p.filters.Namespace = ns
return nil
}
}
// withFilterLabels allows specifying options to control filtering pods by pod labels.
func withFilterLabels(filters ...FieldFilterConfig) option {
return func(p *kubernetesprocessor) error {
var labels []kube.LabelFilter
for _, f := range filters {
var op selection.Operator
switch f.Op {
case filterOPNotEquals:
op = selection.NotEquals
case filterOPExists:
op = selection.Exists
case filterOPDoesNotExist:
op = selection.DoesNotExist
default:
op = selection.Equals
}
labels = append(labels, kube.LabelFilter{
Key: f.Key,
Value: f.Value,
Op: op,
})
}
p.filters.Labels = labels
return nil
}
}
// withFilterFields allows specifying options to control filtering pods by pod fields.
func withFilterFields(filters ...FieldFilterConfig) option {
return func(p *kubernetesprocessor) error {
var fields []kube.FieldFilter
for _, f := range filters {
var op selection.Operator
switch f.Op {
case filterOPNotEquals:
op = selection.NotEquals
default:
op = selection.Equals
}
fields = append(fields, kube.FieldFilter{
Key: f.Key,
Value: f.Value,
Op: op,
})
}
p.filters.Fields = fields
return nil
}
}
// withExtractPodAssociations allows specifying options to associate pod metadata with incoming resource
func withExtractPodAssociations(podAssociations ...PodAssociationConfig) option {
return func(p *kubernetesprocessor) error {
associations := make([]kube.Association, 0, len(podAssociations))
var assoc kube.Association
for _, association := range podAssociations {
assoc = kube.Association{
Sources: []kube.AssociationSource{},
}
var name string
for _, associationSource := range association.Sources {
if associationSource.From == kube.ConnectionSource {
name = ""
} else {
name = associationSource.Name
}
assoc.Sources = append(assoc.Sources, kube.AssociationSource{
From: associationSource.From,
Name: name,
})
}
associations = append(associations, assoc)
}
p.podAssociations = associations
return nil
}
}
// withExcludes allows specifying pods to exclude
func withExcludes(podExclude ExcludeConfig) option {
return func(p *kubernetesprocessor) error {
ignoredNames := kube.Excludes{}
names := podExclude.Pods
if len(names) == 0 {
names = []ExcludePodConfig{{Name: "jaeger-agent"}, {Name: "jaeger-collector"}}
}
for _, name := range names {
ignoredNames.Pods = append(ignoredNames.Pods, kube.ExcludePods{Name: regexp.MustCompile(name.Name)})
}
p.podIgnore = ignoredNames
return nil
}
}
// withWaitForMetadata allows specifying whether to wait for pod metadata to be synced.
func withWaitForMetadata(wait bool) option {
return func(p *kubernetesprocessor) error {
p.waitForMetadata = wait
return nil
}
}
// withWaitForMetadataTimeout allows specifying the timeout for waiting for pod metadata to be synced.
func withWaitForMetadataTimeout(timeout time.Duration) option {
return func(p *kubernetesprocessor) error {
p.waitForMetadataTimeout = timeout
return nil
}
}