func eventMapping()

in metricbeat/module/kubernetes/pod/data.go [32:177]


// eventMapping decodes content as a JSON-encoded kubernetes.Summary and
// returns one event per pod listed in it.
//
// For every pod, the CPU and memory counters of its containers are summed.
// Percentages relative to the node's allocatable resources and to the pod's
// limits (both read from metricsRepo) are added only when the corresponding
// denominator is greater than zero. When content cannot be unmarshalled, a
// nil slice and a wrapped error are returned.
func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Logger) ([]mapstr.M, error) {
	var summary kubernetes.Summary
	if err := json.Unmarshal(content, &summary); err != nil {
		return nil, fmt.Errorf("cannot unmarshal json response: %w", err)
	}

	nodeName := summary.Node.NodeName
	nodeStore := metricsRepo.GetNodeStore(nodeName)

	// Allocatable node resources stay at 0 when the store holds no value for them.
	var allocCores, allocMem float64
	nodeMetrics := nodeStore.GetNodeMetrics()
	if v := nodeMetrics.CoresAllocatable; v != nil {
		allocCores = v.Value
	}
	if v := nodeMetrics.MemoryAllocatable; v != nil {
		allocMem = v.Value
	}

	events := make([]mapstr.M, 0, len(summary.Pods))
	for _, pod := range summary.Pods {
		var cpuNanoCores, memUsage, memAvailable, memRSS, memWorkingSet, faults, majorFaults uint64
		var cpuLimit, memLimit float64

		podID := util.NewPodId(pod.PodRef.Namespace, pod.PodRef.Name)
		podStore := nodeStore.GetPodStore(podID)

		// A pod-level limit only exists while every container seen so far
		// defines one; the first container without a limit resets the sum
		// to 0 and stops any further accumulation for that resource.
		cpuLimited, memLimited := true, true

		for _, container := range pod.Containers {
			cpuNanoCores += container.CPU.UsageNanoCores
			memUsage += container.Memory.UsageBytes
			memAvailable += container.Memory.AvailableBytes
			memRSS += container.Memory.RssBytes
			memWorkingSet += container.Memory.WorkingSetBytes
			faults += container.Memory.PageFaults
			majorFaults += container.Memory.MajorPageFaults

			containerMetrics := podStore.GetContainerStore(container.Name).GetContainerMetrics()

			if cpuLimited {
				if limit := containerMetrics.CoresLimit; limit != nil {
					cpuLimit += limit.Value
				} else {
					cpuLimited = false
					cpuLimit = 0.0
				}
			}

			if memLimited {
				if limit := containerMetrics.MemoryLimit; limit != nil {
					memLimit += limit.Value
				} else {
					memLimited = false
					memLimit = 0.0
				}
			}
		}

		moduleFields := mapstr.M{
			"namespace": pod.PodRef.Namespace,
			"node":      mapstr.M{"name": nodeName},
		}
		cpuFields := mapstr.M{
			"usage": mapstr.M{"nanocores": cpuNanoCores},
		}
		memoryFields := mapstr.M{
			"usage":             mapstr.M{"bytes": memUsage},
			"available":         mapstr.M{"bytes": memAvailable},
			"working_set":       mapstr.M{"bytes": memWorkingSet},
			"rss":               mapstr.M{"bytes": memRSS},
			"page_faults":       faults,
			"major_page_faults": majorFaults,
		}
		networkFields := mapstr.M{
			"rx": mapstr.M{
				"bytes":  pod.Network.RxBytes,
				"errors": pod.Network.RxErrors,
			},
			"tx": mapstr.M{
				"bytes":  pod.Network.TxBytes,
				"errors": pod.Network.TxErrors,
			},
		}

		podEvent := mapstr.M{
			mb.ModuleDataKey: moduleFields,
			"name":           pod.PodRef.Name,
			"uid":            pod.PodRef.UID,
			"cpu":            cpuFields,
			"memory":         memoryFields,
			"network":        networkFields,
		}

		if pod.StartTime != "" {
			kubernetes2.ShouldPut(podEvent, "start_time", pod.StartTime, logger)
		}

		usedCores := float64(cpuNanoCores) / 1e9

		// The node's allocatable cores can be 0 if `state_node` and/or `node` metricsets are disabled
		if allocCores > 0 {
			kubernetes2.ShouldPut(podEvent, "cpu.usage.node.pct", usedCores/allocCores, logger)
		}
		if cpuLimit > 0 {
			kubernetes2.ShouldPut(podEvent, "cpu.usage.limit.pct", usedCores/cpuLimit, logger)
		}

		// The usage percentages are computed from the summed usage bytes, or
		// from the working set when the usage is 0. If both are 0, no memory
		// percentage is reported at all.
		memForPct := memUsage
		if memForPct == 0 {
			memForPct = memWorkingSet
		}
		if memForPct > 0 {
			// The node's allocatable memory can be 0 if `state_node` and/or `node` metricsets are disabled
			if allocMem > 0 {
				kubernetes2.ShouldPut(podEvent, "memory.usage.node.pct", float64(memForPct)/allocMem, logger)
			}
			if memLimit > 0 {
				kubernetes2.ShouldPut(podEvent, "memory.usage.limit.pct", float64(memForPct)/memLimit, logger)
				kubernetes2.ShouldPut(podEvent, "memory.working_set.limit.pct", float64(memWorkingSet)/memLimit, logger)
			}
		}

		events = append(events, podEvent)
	}
	return events, nil
}