in internal/pkg/composable/providers/kubernetes/pod.go [376:539]
func generateContainerData(
comm composable.DynamicProviderComm,
pod *kubernetes.Pod,
kubeMetaGen metadata.MetaGen,
namespaceAnnotations mapstr.M,
logger *logp.Logger,
managed bool,
config *Config) {
containers := kubernetes.GetContainersInPod(pod)
// Pass annotations to all events so that it can be used in templating and by annotation builders.
annotations := mapstr.M{}
for k, v := range pod.GetObjectMeta().GetAnnotations() {
_ = safemapstr.Put(annotations, k, v)
}
// Pass labels to all events so that it can be used in templating.
labels := mapstr.M{}
for k, v := range pod.GetObjectMeta().GetLabels() {
_ = safemapstr.Put(labels, k, v)
}
for _, c := range containers {
// If it doesn't have an ID, container doesn't exist in
// the runtime, emit only an event if we are stopping, so
// we are sure of cleaning up configurations.
if c.ID == "" {
continue
}
// ID is the combination of pod UID + container name
eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Spec.Name)
meta := kubeMetaGen.Generate(pod, metadata.WithFields("container.name", c.Spec.Name))
kubemetaMap, err := meta.GetValue("kubernetes")
if err != nil {
continue
}
// k8sMapping includes only the metadata that fall under kubernetes.*
// and these are available as dynamic vars through the provider
k8sMapping := map[string]interface{}(kubemetaMap.(mapstr.M).Clone())
if len(namespaceAnnotations) != 0 {
k8sMapping["namespace_annotations"] = namespaceAnnotations
}
// add annotations and labels to be discoverable by templates
k8sMapping["annotations"] = annotations
k8sMapping["labels"] = labels
//container ECS fields
cmeta := mapstr.M{
"id": c.ID,
"runtime": c.Runtime,
"image": mapstr.M{
"name": c.Spec.Image,
},
}
processors := []map[string]interface{}{
{
"add_fields": map[string]interface{}{
"fields": cmeta,
"target": "container",
},
},
}
// meta map includes metadata that go under kubernetes.*
// but also other ECS fields like orchestrator.*
for field, metaMap := range meta {
processor := map[string]interface{}{
"add_fields": map[string]interface{}{
"fields": metaMap,
"target": field,
},
}
processors = append(processors, processor)
}
// add container metadata under kubernetes.container.* to
// make them available to dynamic var resolution
containerMeta := mapstr.M{
"id": c.ID,
"name": c.Spec.Name,
"image": c.Spec.Image,
"runtime": c.Runtime,
}
if len(c.Spec.Ports) > 0 {
for _, port := range c.Spec.Ports {
_, _ = containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort))
_, _ = containerMeta.Put("port_name", port.Name)
k8sMapping["container"] = containerMeta
if config.Hints.Enabled { // This is "hints based autodiscovery flow"
if !managed {
hintData := GetHintsMapping(k8sMapping, logger, config.Prefix, c.ID)
if len(hintData.composableMapping) > 0 {
if len(hintData.processors) > 0 {
processors = updateProcessors(hintData.processors, processors)
}
_ = comm.AddOrUpdate(
eventID,
PodPriority,
map[string]interface{}{"hints": hintData.composableMapping},
processors,
)
} else if config.Hints.DefaultContainerLogs {
// in case of no package detected in the hints fallback to the generic log collection
_, _ = hintData.composableMapping.Put("container_logs.enabled", true)
_, _ = hintData.composableMapping.Put("container_id", c.ID)
if len(hintData.processors) > 0 {
processors = updateProcessors(hintData.processors, processors)
}
_ = comm.AddOrUpdate(
eventID,
PodPriority,
map[string]interface{}{"hints": hintData.composableMapping},
processors,
)
}
}
} else { // This is the "template-based autodiscovery" flow
_ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors)
}
}
} else {
k8sMapping["container"] = containerMeta
if config.Hints.Enabled { // This is "hints based autodiscovery flow"
if !managed {
hintData := GetHintsMapping(k8sMapping, logger, config.Prefix, c.ID)
if len(hintData.composableMapping) > 0 {
if len(hintData.processors) > 0 {
processors = updateProcessors(hintData.processors, processors)
}
_ = comm.AddOrUpdate(
eventID,
PodPriority,
map[string]interface{}{"hints": hintData.composableMapping},
processors,
)
} else if config.Hints.DefaultContainerLogs {
// in case of no package detected in the hints fallback to the generic log collection
_, _ = hintData.composableMapping.Put("container_logs.enabled", true)
_, _ = hintData.composableMapping.Put("container_id", c.ID)
if len(hintData.processors) > 0 {
processors = updateProcessors(hintData.processors, processors)
}
_ = comm.AddOrUpdate(
eventID,
PodPriority,
map[string]interface{}{"hints": hintData.composableMapping},
processors,
)
}
}
} else { // This is the "template-based autodiscovery" flow
_ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors)
}
}
}
}