in processor/k8sattributesprocessor/internal/kube/client.go [451:565]
func (c *WatchClient) extractPodAttributes(pod *api_v1.Pod) map[string]string {
tags := map[string]string{}
if c.Rules.PodName {
tags[conventions.AttributeK8SPodName] = pod.Name
}
if c.Rules.PodHostName {
tags[tagHostName] = pod.Spec.Hostname
}
if c.Rules.PodIP {
tags[K8sIPLabelName] = pod.Status.PodIP
}
if c.Rules.Namespace {
tags[conventions.AttributeK8SNamespaceName] = pod.GetNamespace()
}
if c.Rules.StartTime {
ts := pod.GetCreationTimestamp()
if !ts.IsZero() {
if rfc3339ts, err := ts.MarshalText(); err != nil {
c.logger.Error("failed to unmarshal pod creation timestamp", zap.Error(err))
} else {
tags[tagStartTime] = string(rfc3339ts)
}
}
}
if c.Rules.PodUID {
uid := pod.GetUID()
tags[conventions.AttributeK8SPodUID] = string(uid)
}
if c.Rules.ReplicaSetID || c.Rules.ReplicaSetName ||
c.Rules.DaemonSetUID || c.Rules.DaemonSetName ||
c.Rules.JobUID || c.Rules.JobName ||
c.Rules.StatefulSetUID || c.Rules.StatefulSetName ||
c.Rules.DeploymentName || c.Rules.DeploymentUID ||
c.Rules.CronJobName {
for _, ref := range pod.OwnerReferences {
switch ref.Kind {
case "ReplicaSet":
if c.Rules.ReplicaSetID {
tags[conventions.AttributeK8SReplicaSetUID] = string(ref.UID)
}
if c.Rules.ReplicaSetName {
tags[conventions.AttributeK8SReplicaSetName] = ref.Name
}
if c.Rules.DeploymentName {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentName] = replicaset.Deployment.Name
}
}
}
if c.Rules.DeploymentUID {
if replicaset, ok := c.getReplicaSet(string(ref.UID)); ok {
if replicaset.Deployment.Name != "" {
tags[conventions.AttributeK8SDeploymentUID] = replicaset.Deployment.UID
}
}
}
case "DaemonSet":
if c.Rules.DaemonSetUID {
tags[conventions.AttributeK8SDaemonSetUID] = string(ref.UID)
}
if c.Rules.DaemonSetName {
tags[conventions.AttributeK8SDaemonSetName] = ref.Name
}
case "StatefulSet":
if c.Rules.StatefulSetUID {
tags[conventions.AttributeK8SStatefulSetUID] = string(ref.UID)
}
if c.Rules.StatefulSetName {
tags[conventions.AttributeK8SStatefulSetName] = ref.Name
}
case "Job":
if c.Rules.CronJobName {
parts := c.cronJobRegex.FindStringSubmatch(ref.Name)
if len(parts) == 2 {
tags[conventions.AttributeK8SCronJobName] = parts[1]
}
}
if c.Rules.JobUID {
tags[conventions.AttributeK8SJobUID] = string(ref.UID)
}
if c.Rules.JobName {
tags[conventions.AttributeK8SJobName] = ref.Name
}
}
}
}
if c.Rules.Node {
tags[tagNodeName] = pod.Spec.NodeName
}
if c.Rules.ClusterUID {
if val, ok := c.Namespaces["kube-system"]; ok {
tags[tagClusterUID] = val.NamespaceUID
} else {
c.logger.Debug("unable to find kube-system namespace, cluster uid will not be available")
}
}
for _, r := range c.Rules.Labels {
r.extractFromPodMetadata(pod.Labels, tags, "k8s.pod.labels.%s")
}
for _, r := range c.Rules.Annotations {
r.extractFromPodMetadata(pod.Annotations, tags, "k8s.pod.annotations.%s")
}
return tags
}