remappers/kubernetesmetrics/kubelet.go (105 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kubernetesmetrics
import (
"github.com/elastic/opentelemetry-lib/remappers/internal/remappedmetric"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)
// addKubeletMetrics scans the metrics in src and appends the derived
// Elastic kubernetes integration pod metrics to out.
//
// The following source metrics are consumed:
//   - k8s.pod.cpu_limit_utilization    -> kubernetes.pod.cpu.usage.limit.pct
//   - k8s.pod.cpu.node.utilization     -> kubernetes.pod.cpu.usage.node.pct
//   - k8s.pod.memory_limit_utilization -> kubernetes.pod.memory.usage.limit.pct
//   - k8s.pod.memory.node.utilization  -> kubernetes.pod.memory.usage.node.pct
//   - k8s.pod.network.io               -> kubernetes.pod.network.{tx,rx}.bytes
//
// For the gauge metrics only the first data point is read. For
// k8s.pod.network.io the integer values of all data points are summed per
// "direction" attribute ("transmit" / "receive"); data points without that
// attribute, or with any other direction value, do not contribute to a total.
//
// All six remapped metrics are always handed to remappedmetric.Add; a source
// metric that is missing, has an unexpected type, or carries no data points
// leaves the corresponding value at zero. The timestamp used for every
// remapped metric is the first non-zero data point timestamp encountered.
//
// mutator is chained with a mutator that sets the "service.type" attribute
// to "kubernetes" on the data point it is given. The resource argument is
// unused. The function currently always returns nil.
func addKubeletMetrics(
	src, out pmetric.MetricSlice,
	_ pcommon.Resource,
	mutator func(pmetric.NumberDataPoint),
) error {
	var timestamp pcommon.Timestamp
	var totalTransmitted, totalReceived int64
	var cpuLimitUtilization, memoryLimitUtilization, podCPUUsageNode, podMemoryUsageNode float64

	// recordTimestamp remembers the first non-zero timestamp seen so that all
	// remapped metrics share one timestamp.
	recordTimestamp := func(dp pmetric.NumberDataPoint) {
		if timestamp == 0 {
			timestamp = dp.Timestamp()
		}
	}

	// readGauge copies the double value of the first data point of a gauge
	// metric into dst. Metrics of another type or without data points are
	// skipped: calling Gauge() on a non-gauge metric yields an invalid
	// instance, and At(0) on an empty slice is out of range.
	readGauge := func(metric pmetric.Metric, dst *float64) {
		if metric.Type() != pmetric.MetricTypeGauge {
			return
		}
		dataPoints := metric.Gauge().DataPoints()
		if dataPoints.Len() == 0 {
			return
		}
		dp := dataPoints.At(0)
		recordTimestamp(dp)
		*dst = dp.DoubleValue()
	}

	// Iterate all metrics in the current scope and collect the inputs for the
	// additional Elastic kubernetes integration pod metrics.
	for i := 0; i < src.Len(); i++ {
		metric := src.At(i)
		switch metric.Name() {
		case "k8s.pod.cpu_limit_utilization":
			readGauge(metric, &cpuLimitUtilization)
		case "k8s.pod.cpu.node.utilization":
			readGauge(metric, &podCPUUsageNode)
		case "k8s.pod.memory_limit_utilization":
			readGauge(metric, &memoryLimitUtilization)
		case "k8s.pod.memory.node.utilization":
			readGauge(metric, &podMemoryUsageNode)
		case "k8s.pod.network.io":
			// Sum() on a non-sum metric yields an invalid instance; skip it.
			if metric.Type() != pmetric.MetricTypeSum {
				continue
			}
			dataPoints := metric.Sum().DataPoints()
			for j := 0; j < dataPoints.Len(); j++ {
				dp := dataPoints.At(j)
				recordTimestamp(dp)
				direction, ok := dp.Attributes().Get("direction")
				if !ok {
					continue
				}
				switch direction.Str() {
				case "receive":
					totalReceived += dp.IntValue()
				case "transmit":
					totalTransmitted += dp.IntValue()
				}
			}
		}
	}

	finalMutator := remappedmetric.ChainedMutator(
		mutator,
		func(dp pmetric.NumberDataPoint) {
			dp.Attributes().PutStr("service.type", "kubernetes")
		},
	)

	remappedmetric.Add(out, finalMutator,
		remappedmetric.Metric{
			DataType:    pmetric.MetricTypeGauge,
			Name:        "kubernetes.pod.cpu.usage.limit.pct",
			Timestamp:   timestamp,
			DoubleValue: &cpuLimitUtilization,
		},
		remappedmetric.Metric{
			DataType:    pmetric.MetricTypeGauge,
			Name:        "kubernetes.pod.cpu.usage.node.pct",
			Timestamp:   timestamp,
			DoubleValue: &podCPUUsageNode,
		},
		remappedmetric.Metric{
			DataType:    pmetric.MetricTypeGauge,
			Name:        "kubernetes.pod.memory.usage.node.pct",
			Timestamp:   timestamp,
			DoubleValue: &podMemoryUsageNode,
		},
		remappedmetric.Metric{
			DataType:    pmetric.MetricTypeGauge,
			Name:        "kubernetes.pod.memory.usage.limit.pct",
			Timestamp:   timestamp,
			DoubleValue: &memoryLimitUtilization,
		},
		remappedmetric.Metric{
			DataType:  pmetric.MetricTypeSum,
			Name:      "kubernetes.pod.network.tx.bytes",
			Timestamp: timestamp,
			IntValue:  &totalTransmitted,
		},
		remappedmetric.Metric{
			DataType:  pmetric.MetricTypeSum,
			Name:      "kubernetes.pod.network.rx.bytes",
			Timestamp: timestamp,
			IntValue:  &totalReceived,
		},
	)
	return nil
}