func generateContainerData()

in internal/pkg/composable/providers/kubernetes/pod.go [376:539]


// generateContainerData builds and publishes, through comm, the dynamic
// variable mapping and the processors of every container of pod that has a
// runtime ID.
//
// For each such container it prepares:
//   - a mapping made of the "kubernetes" section of the metadata generated by
//     kubeMetaGen, extended with the pod annotations and labels, the namespace
//     annotations (only when non-empty) and a "container" entry holding id,
//     name, image, runtime and, when the spec declares ports, port and
//     port_name;
//   - a list of add_fields processors: one targeting "container" and one per
//     top-level key of the generated metadata.
//
// The event ID is "<pod UID>.<container name>". A container that declares
// ports is handed to publishContainerMapping once per port, always with that
// same event ID; a container without ports is handed over once.
//
// Containers are skipped when they have no ID, when the generated metadata
// has no "kubernetes" key, or when that value is not a mapstr.M.
func generateContainerData(
	comm composable.DynamicProviderComm,
	pod *kubernetes.Pod,
	kubeMetaGen metadata.MetaGen,
	namespaceAnnotations mapstr.M,
	logger *logp.Logger,
	managed bool,
	config *Config) {

	containers := kubernetes.GetContainersInPod(pod)

	// Pass annotations to all events so that it can be used in templating and by annotation builders.
	annotations := mapstr.M{}
	for k, v := range pod.GetObjectMeta().GetAnnotations() {
		_ = safemapstr.Put(annotations, k, v)
	}

	// Pass labels to all events so that it can be used in templating.
	labels := mapstr.M{}
	for k, v := range pod.GetObjectMeta().GetLabels() {
		_ = safemapstr.Put(labels, k, v)
	}

	for _, c := range containers {
		// A container without an ID doesn't exist in the runtime, so there
		// is nothing to publish for it.
		if c.ID == "" {
			continue
		}

		// ID is the combination of pod UID + container name
		eventID := fmt.Sprintf("%s.%s", pod.GetObjectMeta().GetUID(), c.Spec.Name)

		meta := kubeMetaGen.Generate(pod, metadata.WithFields("container.name", c.Spec.Name))
		kubemetaMap, err := meta.GetValue("kubernetes")
		if err != nil {
			continue
		}
		// Checked assertion: an unexpected value type skips the container
		// instead of panicking.
		kubemeta, ok := kubemetaMap.(mapstr.M)
		if !ok {
			continue
		}

		// k8sMapping includes only the metadata that fall under kubernetes.*
		// and these are available as dynamic vars through the provider
		k8sMapping := map[string]interface{}(kubemeta.Clone())

		if len(namespaceAnnotations) != 0 {
			k8sMapping["namespace_annotations"] = namespaceAnnotations
		}
		// add annotations and labels to be discoverable by templates
		k8sMapping["annotations"] = annotations
		k8sMapping["labels"] = labels

		// container ECS fields
		cmeta := mapstr.M{
			"id":      c.ID,
			"runtime": c.Runtime,
			"image": mapstr.M{
				"name": c.Spec.Image,
			},
		}

		processors := []map[string]interface{}{
			{
				"add_fields": map[string]interface{}{
					"fields": cmeta,
					"target": "container",
				},
			},
		}
		// meta map includes metadata that go under kubernetes.*
		// but also other ECS fields like orchestrator.*
		for field, metaMap := range meta {
			processor := map[string]interface{}{
				"add_fields": map[string]interface{}{
					"fields": metaMap,
					"target": field,
				},
			}
			processors = append(processors, processor)
		}

		// add container metadata under kubernetes.container.* to
		// make them available to dynamic var resolution
		containerMeta := mapstr.M{
			"id":      c.ID,
			"name":    c.Spec.Name,
			"image":   c.Spec.Image,
			"runtime": c.Runtime,
		}

		if len(c.Spec.Ports) == 0 {
			k8sMapping["container"] = containerMeta
			publishContainerMapping(comm, eventID, c.ID, k8sMapping, processors, logger, managed, config)
			continue
		}

		for _, port := range c.Spec.Ports {
			// The same containerMeta map is reused for every port; only the
			// port fields are overwritten between publishes.
			_, _ = containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort))
			_, _ = containerMeta.Put("port_name", port.Name)
			k8sMapping["container"] = containerMeta
			publishContainerMapping(comm, eventID, c.ID, k8sMapping, processors, logger, managed, config)
		}
	}
}

// publishContainerMapping sends one container mapping to comm under eventID.
//
// When config.Hints.Enabled is false (template-based autodiscovery) the
// mapping and processors are published as they are with ContainerPriority.
//
// When hints are enabled, nothing is published for a managed agent. Otherwise
// the hints are resolved with GetHintsMapping and published under the "hints"
// key with PodPriority. If the hints produced no mapping, generic container
// log collection is published instead, but only when
// config.Hints.DefaultContainerLogs is set; otherwise nothing is published.
//
// processors is the caller's base list and is never reassigned for the
// caller: hint processors are merged into a list local to this call, so
// calling this function several times for the same container (once per port)
// does not pile the same hint processors up on the base list.
func publishContainerMapping(
	comm composable.DynamicProviderComm,
	eventID string,
	containerID string,
	k8sMapping map[string]interface{},
	processors []map[string]interface{},
	logger *logp.Logger,
	managed bool,
	config *Config,
) {
	if !config.Hints.Enabled {
		// This is the "template-based autodiscovery" flow
		_ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors)
		return
	}

	// This is "hints based autodiscovery flow"
	if managed {
		return
	}

	hintData := GetHintsMapping(k8sMapping, logger, config.Prefix, containerID)
	if len(hintData.composableMapping) == 0 {
		if !config.Hints.DefaultContainerLogs {
			return
		}
		// in case of no package detected in the hints fallback to the generic log collection
		_, _ = hintData.composableMapping.Put("container_logs.enabled", true)
		_, _ = hintData.composableMapping.Put("container_id", containerID)
	}

	if len(hintData.processors) > 0 {
		// Clip the capacity so that an append inside updateProcessors has to
		// allocate and cannot write into the backing array of the base list.
		// NOTE(review): assumes updateProcessors returns the merged list — confirm.
		base := processors[:len(processors):len(processors)]
		processors = updateProcessors(hintData.processors, base)
	}

	_ = comm.AddOrUpdate(
		eventID,
		PodPriority,
		map[string]interface{}{"hints": hintData.composableMapping},
		processors,
	)
}