in metricbeat/module/kubernetes/container/data.go [32:177]
// eventMapping parses a kubelet stats-summary JSON payload and returns one
// event per container, carrying CPU, memory, rootfs and logs metrics.
// Percentages relative to the node's allocatable resources and to the
// container's own limits are added only when the corresponding denominator
// is known and positive (a zero denominator means "unknown" and the field
// is simply omitted).
func eventMapping(content []byte, metricsRepo *util.MetricsRepo, logger *logp.Logger) ([]mapstr.M, error) {
	var summary kubernetes.Summary
	if err := json.Unmarshal(content, &summary); err != nil {
		return nil, fmt.Errorf("cannot unmarshal json response: %w", err)
	}

	node := summary.Node

	// Node allocatable CPU/memory; 0 acts as a sentinel for "unknown" and
	// disables the node-relative percentage fields below.
	nodeCores := 0.0
	nodeMem := 0.0
	nodeStore := metricsRepo.GetNodeStore(node.NodeName)
	nodeMetrics := nodeStore.GetNodeMetrics()
	if nodeMetrics.CoresAllocatable != nil {
		nodeCores = nodeMetrics.CoresAllocatable.Value
	}
	if nodeMetrics.MemoryAllocatable != nil {
		nodeMem = nodeMetrics.MemoryAllocatable.Value
	}

	// Pre-size the result slice: exactly one event is emitted per container,
	// so count them once instead of growing via repeated append.
	totalContainers := 0
	for _, pod := range summary.Pods {
		totalContainers += len(pod.Containers)
	}
	events := make([]mapstr.M, 0, totalContainers)

	for _, pod := range summary.Pods {
		podId := util.NewPodId(pod.PodRef.Namespace, pod.PodRef.Name)
		podStore := nodeStore.GetPodStore(podId)

		for _, container := range pod.Containers {
			containerEvent := mapstr.M{
				mb.ModuleDataKey: mapstr.M{
					"namespace": pod.PodRef.Namespace,
					"node": mapstr.M{
						"name": node.NodeName,
					},
					"pod": mapstr.M{
						"name": pod.PodRef.Name,
					},
				},
				"name": container.Name,
				"cpu": mapstr.M{
					"usage": mapstr.M{
						"nanocores": container.CPU.UsageNanoCores,
						"core": mapstr.M{
							"ns": container.CPU.UsageCoreNanoSeconds,
						},
					},
				},
				"memory": mapstr.M{
					"available": mapstr.M{
						"bytes": container.Memory.AvailableBytes,
					},
					"usage": mapstr.M{
						"bytes": container.Memory.UsageBytes,
					},
					"workingset": mapstr.M{
						"bytes": container.Memory.WorkingSetBytes,
					},
					"rss": mapstr.M{
						"bytes": container.Memory.RssBytes,
					},
					"pagefaults":      container.Memory.PageFaults,
					"majorpagefaults": container.Memory.MajorPageFaults,
				},
				"rootfs": mapstr.M{
					"available": mapstr.M{
						"bytes": container.Rootfs.AvailableBytes,
					},
					"capacity": mapstr.M{
						"bytes": container.Rootfs.CapacityBytes,
					},
					"used": mapstr.M{
						"bytes": container.Rootfs.UsedBytes,
					},
					"inodes": mapstr.M{
						"used": container.Rootfs.InodesUsed,
					},
				},
				"logs": mapstr.M{
					"available": mapstr.M{
						"bytes": container.Logs.AvailableBytes,
					},
					"capacity": mapstr.M{
						"bytes": container.Logs.CapacityBytes,
					},
					"used": mapstr.M{
						"bytes": container.Logs.UsedBytes,
					},
					"inodes": mapstr.M{
						"used":  container.Logs.InodesUsed,
						"free":  container.Logs.InodesFree,
						"count": container.Logs.Inodes,
					},
				},
			}

			// Only report a start time when the kubelet supplied one.
			if container.StartTime != "" {
				kubernetes2.ShouldPut(containerEvent, "start_time", container.StartTime, logger)
			}

			// Node-relative usage percentages; UsageNanoCores is converted to
			// cores (1e9 nanocores per core) before dividing by nodeCores.
			if nodeCores > 0 {
				kubernetes2.ShouldPut(containerEvent, "cpu.usage.node.pct", float64(container.CPU.UsageNanoCores)/1e9/nodeCores, logger)
			}
			if nodeMem > 0 {
				kubernetes2.ShouldPut(containerEvent, "memory.usage.node.pct", float64(container.Memory.UsageBytes)/nodeMem, logger)
			}

			containerStore := podStore.GetContainerStore(container.Name)
			containerMetrics := containerStore.GetContainerMetrics()

			// A container without explicit limits falls back to the node
			// allocatable values (which may themselves be 0 / unknown).
			containerCoresLimit := nodeCores
			if containerMetrics.CoresLimit != nil {
				containerCoresLimit = containerMetrics.CoresLimit.Value
			}
			containerMemLimit := nodeMem
			if containerMetrics.MemoryLimit != nil {
				containerMemLimit = containerMetrics.MemoryLimit.Value
			}

			// NOTE:
			// we don't currently check if `containerMemLimit` > `nodeMem` as we do in `kubernetes/pod/data.go`.
			// There we do check, since if a container doesn't have a limit set, it will inherit the node limits and the sum of all
			// the container limits can be greater than the node limits. We assume here the user can set correct limits on containers.
			if containerCoresLimit > 0 {
				kubernetes2.ShouldPut(containerEvent, "cpu.usage.limit.pct", float64(container.CPU.UsageNanoCores)/1e9/containerCoresLimit, logger)
			}
			if containerMemLimit > 0 {
				kubernetes2.ShouldPut(containerEvent, "memory.usage.limit.pct", float64(container.Memory.UsageBytes)/containerMemLimit, logger)
				kubernetes2.ShouldPut(containerEvent, "memory.workingset.limit.pct", float64(container.Memory.WorkingSetBytes)/containerMemLimit, logger)
			}

			events = append(events, containerEvent)
		}
	}
	return events, nil
}