func eventMapping()

in metricbeat/module/kubernetes/container/data.go [32:177]


func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Logger) ([]mapstr.M, error) {
	events := []mapstr.M{}
	var summary kubernetes.Summary

	err := json.Unmarshal(content, &summary)
	if err != nil {
		return nil, fmt.Errorf("cannot unmarshal json response: %w", err)
	}

	node := summary.Node

	nodeCores := 0.0
	nodeMem := 0.0

	nodeStore := metricsRepo.GetNodeStore(node.NodeName)
	nodeMetrics := nodeStore.GetNodeMetrics()
	if nodeMetrics.CoresAllocatable != nil {
		nodeCores = nodeMetrics.CoresAllocatable.Value
	}
	if nodeMetrics.MemoryAllocatable != nil {
		nodeMem = nodeMetrics.MemoryAllocatable.Value
	}

	for _, pod := range summary.Pods {
		podId := util.NewPodId(pod.PodRef.Namespace, pod.PodRef.Name)
		podStore := nodeStore.GetPodStore(podId)

		for _, container := range pod.Containers {
			containerEvent := mapstr.M{
				mb.ModuleDataKey: mapstr.M{
					"namespace": pod.PodRef.Namespace,
					"node": mapstr.M{
						"name": node.NodeName,
					},
					"pod": mapstr.M{
						"name": pod.PodRef.Name,
					},
				},

				"name": container.Name,

				"cpu": mapstr.M{
					"usage": mapstr.M{
						"nanocores": container.CPU.UsageNanoCores,
						"core": mapstr.M{
							"ns": container.CPU.UsageCoreNanoSeconds,
						},
					},
				},

				"memory": mapstr.M{
					"available": mapstr.M{
						"bytes": container.Memory.AvailableBytes,
					},
					"usage": mapstr.M{
						"bytes": container.Memory.UsageBytes,
					},
					"workingset": mapstr.M{
						"bytes": container.Memory.WorkingSetBytes,
					},
					"rss": mapstr.M{
						"bytes": container.Memory.RssBytes,
					},
					"pagefaults":      container.Memory.PageFaults,
					"majorpagefaults": container.Memory.MajorPageFaults,
				},

				"rootfs": mapstr.M{
					"available": mapstr.M{
						"bytes": container.Rootfs.AvailableBytes,
					},
					"capacity": mapstr.M{
						"bytes": container.Rootfs.CapacityBytes,
					},
					"used": mapstr.M{
						"bytes": container.Rootfs.UsedBytes,
					},
					"inodes": mapstr.M{
						"used": container.Rootfs.InodesUsed,
					},
				},

				"logs": mapstr.M{
					"available": mapstr.M{
						"bytes": container.Logs.AvailableBytes,
					},
					"capacity": mapstr.M{
						"bytes": container.Logs.CapacityBytes,
					},
					"used": mapstr.M{
						"bytes": container.Logs.UsedBytes,
					},
					"inodes": mapstr.M{
						"used":  container.Logs.InodesUsed,
						"free":  container.Logs.InodesFree,
						"count": container.Logs.Inodes,
					},
				},
			}

			if container.StartTime != "" {
				kubernetes2.ShouldPut(containerEvent, "start_time", container.StartTime, logger)
			}

			if nodeCores > 0 {
				kubernetes2.ShouldPut(containerEvent, "cpu.usage.node.pct", float64(container.CPU.UsageNanoCores)/1e9/nodeCores, logger)
			}

			if nodeMem > 0 {
				kubernetes2.ShouldPut(containerEvent, "memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem, logger)
			}

			containerStore := podStore.GetContainerStore(container.Name)
			containerMetrics := containerStore.GetContainerMetrics()

			containerCoresLimit := nodeCores
			if containerMetrics.CoresLimit != nil {
				containerCoresLimit = containerMetrics.CoresLimit.Value
			}

			containerMemLimit := nodeMem
			if containerMetrics.MemoryLimit != nil {
				containerMemLimit = containerMetrics.MemoryLimit.Value
			}

			// NOTE:
			// we don't currently check if `containerMemLimit` > `nodeMem` as we do in `kubernetes/pod/data.go`.
			// There we do check, since if a container doesn't have a limit set, it will inherit the node limits and the sum of all
			// the container limits can be greater than the node limits. We assume here the user can set correct limits on containers.

			if containerCoresLimit > 0 {
				kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/containerCoresLimit, logger)
			}

			if containerMemLimit > 0 {
				kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/containerMemLimit, logger)
				kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/containerMemLimit, logger)
			}

			events = append(events, containerEvent)
		}

	}

	return events, nil
}