// receiver/dockerstatsreceiver/receiver.go
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package dockerstatsreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver"
import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"time"
ctypes "github.com/docker/docker/api/types/container"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/scraper/scrapererror"
"go.uber.org/multierr"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/docker"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/dockerstatsreceiver/internal/metadata"
)
var (
	// defaultDockerAPIVersion is the minimum Docker Engine API version this
	// receiver supports; it is used as the default when none is configured.
	defaultDockerAPIVersion = "1.25"
	// minimumRequiredDockerAPIVersion is the parsed form of the default,
	// validated once at package init (MustNewAPIVersion panics on bad input).
	minimumRequiredDockerAPIVersion = docker.MustNewAPIVersion(defaultDockerAPIVersion)
)
// resultV2 carries the outcome of one per-container stats fetch back to the
// scrape loop: either a populated stats payload or a fetch error.
type resultV2 struct {
	stats     *ctypes.StatsResponse // stats payload; nil when err is set
	container *docker.Container     // the container the fetch was issued for
	err       error                 // non-nil when the stats fetch failed
}
// metricsReceiver scrapes container stats from a Docker daemon and builds
// OTLP metrics from them.
type metricsReceiver struct {
	config   *Config
	settings receiver.Settings
	client   *docker.Client           // created in start; nil until then
	mb       *metadata.MetricsBuilder // accumulates data points across a scrape
	cancel   context.CancelFunc       // stops the container event loop; set in start, invoked in shutdown
}
// newMetricsReceiver builds a metricsReceiver from the given settings and
// config. The Docker client is not created here; that happens in start so
// that construction never touches the daemon.
func newMetricsReceiver(set receiver.Settings, config *Config) *metricsReceiver {
	recv := &metricsReceiver{
		settings: set,
		config:   config,
		mb:       metadata.NewMetricsBuilder(config.MetricsBuilderConfig, set),
	}
	return recv
}
// start creates the Docker client, loads the initial container list, and
// launches the background container-event loop.
//
// The event loop's context is derived from context.Background() rather than
// the ctx argument: per the component.Component.Start contract, ctx is only
// valid for the duration of startup, so deriving the long-lived loop from it
// would cancel the loop as soon as startup completes. The loop's lifetime is
// instead bounded by r.cancel, invoked from shutdown.
func (r *metricsReceiver) start(ctx context.Context, _ component.Host) error {
	var err error
	r.client, err = docker.NewDockerClient(&r.config.Config, r.settings.Logger)
	if err != nil {
		return err
	}
	if err = r.client.LoadContainerList(ctx); err != nil {
		return err
	}
	cctx, cancel := context.WithCancel(context.Background())
	r.cancel = cancel
	go r.client.ContainerEventLoop(cctx)
	return nil
}
// shutdown stops the background container-event loop, if start ever ran.
func (r *metricsReceiver) shutdown(context.Context) error {
	if r.cancel == nil {
		// start was never called (or failed before launching the loop).
		return nil
	}
	r.cancel()
	return nil
}
// scrapeV2 fetches stats for every tracked container concurrently and folds
// them into one pmetric.Metrics batch. A failed fetch for one container is
// reported as a partial-scrape error without aborting the rest of the scrape.
func (r *metricsReceiver) scrapeV2(ctx context.Context) (pmetric.Metrics, error) {
	containers := r.client.Containers()
	// Buffered to len(containers) so every worker can deliver its result
	// without blocking, even though draining only starts after wg.Wait.
	resultCh := make(chan resultV2, len(containers))

	var wg sync.WaitGroup
	wg.Add(len(containers))
	for _, container := range containers {
		go func(c docker.Container) {
			defer wg.Done()
			statsJSON, err := r.client.FetchContainerStatsAsJSON(ctx, c)
			if err != nil {
				resultCh <- resultV2{container: &c, err: err}
				return
			}
			resultCh <- resultV2{stats: statsJSON, container: &c}
		}(container)
	}
	wg.Wait()
	close(resultCh)

	var errs error
	now := pcommon.NewTimestampFromTime(time.Now())
	for res := range resultCh {
		if res.err != nil {
			// Don't know the number of failed stats, but one container fetch is a partial error.
			errs = multierr.Append(errs, scrapererror.NewPartialScrapeError(res.err, 0))
			continue
		}
		if err := r.recordContainerStats(now, res.stats, res.container); err != nil {
			errs = multierr.Append(errs, err)
		}
	}
	return r.mb.Emit(), errs
}
// recordContainerStats converts one container's stats response into data
// points and emits them under a per-container resource. Errors from the
// base/host-config recorders are accumulated and returned, but whatever data
// points were recorded are still emitted — callers treat the result as a
// partial scrape, not a failure.
func (r *metricsReceiver) recordContainerStats(now pcommon.Timestamp, containerStats *ctypes.StatsResponse, container *docker.Container) error {
	var errs error
	// All Record* calls below stage data points in r.mb; nothing is emitted
	// until EmitForResource at the end, so ordering of the record calls only
	// affects data-point order within the batch.
	r.recordCPUMetrics(now, &containerStats.CPUStats, &containerStats.PreCPUStats)
	r.recordMemoryMetrics(now, &containerStats.MemoryStats)
	r.recordBlkioMetrics(now, &containerStats.BlkioStats)
	r.recordNetworkMetrics(now, &containerStats.Networks)
	r.recordPidsMetrics(now, &containerStats.PidsStats)
	if err := r.recordBaseMetrics(now, container.ContainerJSONBase); err != nil {
		errs = multierr.Append(errs, err)
	}
	if err := r.recordHostConfigMetrics(now, container.ContainerJSON); err != nil {
		errs = multierr.Append(errs, err)
	}
	r.mb.RecordContainerRestartsDataPoint(now, int64(container.RestartCount))
	// Always-present resource attrs + the user-configured resource attrs
	rb := r.mb.NewResourceBuilder()
	rb.SetContainerRuntime("docker")
	rb.SetContainerHostname(container.Config.Hostname)
	rb.SetContainerID(container.ID)
	rb.SetContainerImageName(container.Config.Image)
	// Docker prefixes container names with "/"; strip it for the attribute.
	rb.SetContainerName(strings.TrimPrefix(container.Name, "/"))
	rb.SetContainerImageID(container.Image)
	rb.SetContainerCommandLine(strings.Join(container.Config.Cmd, " "))
	resource := rb.Emit()
	// User-configured mappings: copy selected env vars and container labels
	// onto the resource. Empty values are skipped, not recorded as "".
	for k, label := range r.config.EnvVarsToMetricLabels {
		if v := container.EnvMap[k]; v != "" {
			resource.Attributes().PutStr(label, v)
		}
	}
	for k, label := range r.config.ContainerLabelsToMetricLabels {
		if v := container.Config.Labels[k]; v != "" {
			resource.Attributes().PutStr(label, v)
		}
	}
	r.mb.EmitForResource(metadata.WithResource(resource))
	return errs
}
// recordMemoryMetrics records the aggregate memory data points (usage, limit,
// percent, max, failcnt) and then one data point per recognized entry in the
// cgroup memory.stat map; unrecognized entries are ignored.
func (r *metricsReceiver) recordMemoryMetrics(now pcommon.Timestamp, memoryStats *ctypes.MemoryStats) {
	usedNoCache := calculateMemUsageNoCache(memoryStats)
	r.mb.RecordContainerMemoryUsageTotalDataPoint(now, int64(usedNoCache))
	r.mb.RecordContainerMemoryUsageLimitDataPoint(now, int64(memoryStats.Limit))
	r.mb.RecordContainerMemoryPercentDataPoint(now, calculateMemoryPercent(memoryStats.Limit, usedNoCache))
	r.mb.RecordContainerMemoryUsageMaxDataPoint(now, int64(memoryStats.MaxUsage))
	r.mb.RecordContainerMemoryFailsDataPoint(now, int64(memoryStats.Failcnt))
	// Dispatch each named cgroup stat to its recorder. A switch avoids
	// rebuilding a recorder map on every call.
	for statName, statValue := range memoryStats.Stats {
		v := int64(statValue)
		switch statName {
		case "cache":
			r.mb.RecordContainerMemoryCacheDataPoint(now, v)
		case "total_cache":
			r.mb.RecordContainerMemoryTotalCacheDataPoint(now, v)
		case "rss":
			r.mb.RecordContainerMemoryRssDataPoint(now, v)
		case "total_rss":
			r.mb.RecordContainerMemoryTotalRssDataPoint(now, v)
		case "rss_huge":
			r.mb.RecordContainerMemoryRssHugeDataPoint(now, v)
		case "total_rss_huge":
			r.mb.RecordContainerMemoryTotalRssHugeDataPoint(now, v)
		case "dirty":
			r.mb.RecordContainerMemoryDirtyDataPoint(now, v)
		case "total_dirty":
			r.mb.RecordContainerMemoryTotalDirtyDataPoint(now, v)
		case "writeback":
			r.mb.RecordContainerMemoryWritebackDataPoint(now, v)
		case "total_writeback":
			r.mb.RecordContainerMemoryTotalWritebackDataPoint(now, v)
		case "mapped_file":
			r.mb.RecordContainerMemoryMappedFileDataPoint(now, v)
		case "total_mapped_file":
			r.mb.RecordContainerMemoryTotalMappedFileDataPoint(now, v)
		case "pgpgin":
			r.mb.RecordContainerMemoryPgpginDataPoint(now, v)
		case "total_pgpgin":
			r.mb.RecordContainerMemoryTotalPgpginDataPoint(now, v)
		case "pgpgout":
			r.mb.RecordContainerMemoryPgpgoutDataPoint(now, v)
		case "total_pgpgout":
			r.mb.RecordContainerMemoryTotalPgpgoutDataPoint(now, v)
		case "pgfault":
			r.mb.RecordContainerMemoryPgfaultDataPoint(now, v)
		case "total_pgfault":
			r.mb.RecordContainerMemoryTotalPgfaultDataPoint(now, v)
		case "pgmajfault":
			r.mb.RecordContainerMemoryPgmajfaultDataPoint(now, v)
		case "total_pgmajfault":
			r.mb.RecordContainerMemoryTotalPgmajfaultDataPoint(now, v)
		case "inactive_anon":
			r.mb.RecordContainerMemoryInactiveAnonDataPoint(now, v)
		case "total_inactive_anon":
			r.mb.RecordContainerMemoryTotalInactiveAnonDataPoint(now, v)
		case "active_anon":
			r.mb.RecordContainerMemoryActiveAnonDataPoint(now, v)
		case "total_active_anon":
			r.mb.RecordContainerMemoryTotalActiveAnonDataPoint(now, v)
		case "inactive_file":
			r.mb.RecordContainerMemoryInactiveFileDataPoint(now, v)
		case "total_inactive_file":
			r.mb.RecordContainerMemoryTotalInactiveFileDataPoint(now, v)
		case "active_file":
			r.mb.RecordContainerMemoryActiveFileDataPoint(now, v)
		case "total_active_file":
			r.mb.RecordContainerMemoryTotalActiveFileDataPoint(now, v)
		case "unevictable":
			r.mb.RecordContainerMemoryUnevictableDataPoint(now, v)
		case "total_unevictable":
			r.mb.RecordContainerMemoryTotalUnevictableDataPoint(now, v)
		case "hierarchical_memory_limit":
			r.mb.RecordContainerMemoryHierarchicalMemoryLimitDataPoint(now, v)
		case "hierarchical_memsw_limit":
			r.mb.RecordContainerMemoryHierarchicalMemswLimitDataPoint(now, v)
		case "anon":
			r.mb.RecordContainerMemoryAnonDataPoint(now, v)
		case "file":
			r.mb.RecordContainerMemoryFileDataPoint(now, v)
		}
	}
}
// blkioRecorder is the shape of the generated per-metric blkio recorder
// functions: one data point per device (major/minor) and operation.
type blkioRecorder func(now pcommon.Timestamp, val int64, devMaj string, devMin string, operation string)

// recordBlkioMetrics records every blkio stat group returned by the daemon,
// pairing each entry list with its matching recorder.
func (r *metricsReceiver) recordBlkioMetrics(now pcommon.Timestamp, blkioStats *ctypes.BlkioStats) {
	groups := []struct {
		entries []ctypes.BlkioStatEntry
		record  blkioRecorder
	}{
		{blkioStats.IoMergedRecursive, r.mb.RecordContainerBlockioIoMergedRecursiveDataPoint},
		{blkioStats.IoQueuedRecursive, r.mb.RecordContainerBlockioIoQueuedRecursiveDataPoint},
		{blkioStats.IoServiceBytesRecursive, r.mb.RecordContainerBlockioIoServiceBytesRecursiveDataPoint},
		{blkioStats.IoServiceTimeRecursive, r.mb.RecordContainerBlockioIoServiceTimeRecursiveDataPoint},
		{blkioStats.IoServicedRecursive, r.mb.RecordContainerBlockioIoServicedRecursiveDataPoint},
		{blkioStats.IoTimeRecursive, r.mb.RecordContainerBlockioIoTimeRecursiveDataPoint},
		{blkioStats.IoWaitTimeRecursive, r.mb.RecordContainerBlockioIoWaitTimeRecursiveDataPoint},
		{blkioStats.SectorsRecursive, r.mb.RecordContainerBlockioSectorsRecursiveDataPoint},
	}
	for _, g := range groups {
		recordSingleBlkioStat(now, g.entries, g.record)
	}
}
// recordSingleBlkioStat emits one data point per blkio entry, attributing it
// with the device's major/minor numbers (decimal strings) and the operation
// name lowercased for attribute consistency.
func recordSingleBlkioStat(now pcommon.Timestamp, statEntries []ctypes.BlkioStatEntry, recorder blkioRecorder) {
	for _, entry := range statEntries {
		devMajor := strconv.FormatUint(entry.Major, 10)
		devMinor := strconv.FormatUint(entry.Minor, 10)
		recorder(now, int64(entry.Value), devMajor, devMinor, strings.ToLower(entry.Op))
	}
}
// recordNetworkMetrics records the per-interface rx/tx byte, packet, dropped,
// and error counters. A nil pointer or nil map means the daemon reported no
// network stats (e.g. host networking), so nothing is recorded.
func (r *metricsReceiver) recordNetworkMetrics(now pcommon.Timestamp, networks *map[string]ctypes.NetworkStats) {
	if networks == nil || *networks == nil {
		return
	}
	for iface, netStats := range *networks {
		r.mb.RecordContainerNetworkIoUsageRxBytesDataPoint(now, int64(netStats.RxBytes), iface)
		r.mb.RecordContainerNetworkIoUsageTxBytesDataPoint(now, int64(netStats.TxBytes), iface)
		r.mb.RecordContainerNetworkIoUsageRxDroppedDataPoint(now, int64(netStats.RxDropped), iface)
		r.mb.RecordContainerNetworkIoUsageTxDroppedDataPoint(now, int64(netStats.TxDropped), iface)
		r.mb.RecordContainerNetworkIoUsageRxPacketsDataPoint(now, int64(netStats.RxPackets), iface)
		r.mb.RecordContainerNetworkIoUsageTxPacketsDataPoint(now, int64(netStats.TxPackets), iface)
		r.mb.RecordContainerNetworkIoUsageRxErrorsDataPoint(now, int64(netStats.RxErrors), iface)
		r.mb.RecordContainerNetworkIoUsageTxErrorsDataPoint(now, int64(netStats.TxErrors), iface)
	}
}
// recordCPUMetrics records system/total/kernel/user CPU usage, throttling
// counters, utilization (computed from the delta against the previous
// sample), logical CPU count, and — when the daemon reports it — per-core
// usage labeled "cpu0", "cpu1", ...
func (r *metricsReceiver) recordCPUMetrics(now pcommon.Timestamp, cpuStats *ctypes.CPUStats, prevStats *ctypes.CPUStats) {
	usage := cpuStats.CPUUsage
	throttling := cpuStats.ThrottlingData
	r.mb.RecordContainerCPUUsageSystemDataPoint(now, int64(cpuStats.SystemUsage))
	r.mb.RecordContainerCPUUsageTotalDataPoint(now, int64(usage.TotalUsage))
	r.mb.RecordContainerCPUUsageKernelmodeDataPoint(now, int64(usage.UsageInKernelmode))
	r.mb.RecordContainerCPUUsageUsermodeDataPoint(now, int64(usage.UsageInUsermode))
	r.mb.RecordContainerCPUThrottlingDataThrottledPeriodsDataPoint(now, int64(throttling.ThrottledPeriods))
	r.mb.RecordContainerCPUThrottlingDataPeriodsDataPoint(now, int64(throttling.Periods))
	r.mb.RecordContainerCPUThrottlingDataThrottledTimeDataPoint(now, int64(throttling.ThrottledTime))
	r.mb.RecordContainerCPUUtilizationDataPoint(now, calculateCPUPercent(prevStats, cpuStats))
	r.mb.RecordContainerCPULogicalCountDataPoint(now, int64(cpuStats.OnlineCPUs))
	for core, coreUsage := range usage.PercpuUsage {
		coreLabel := "cpu" + strconv.Itoa(core)
		r.mb.RecordContainerCPUUsagePercpuDataPoint(now, int64(coreUsage), coreLabel)
	}
}
// recordPidsMetrics records the container's pid count and, when a limit is
// set, the pid limit.
func (r *metricsReceiver) recordPidsMetrics(now pcommon.Timestamp, pidsStats *ctypes.PidsStats) {
	// pids stats are only available on kernel >= 4.3 with pids_cgroup
	// support; the struct is zero-valued otherwise, so skip entirely.
	if pidsStats.Current == 0 {
		return
	}
	r.mb.RecordContainerPidsCountDataPoint(now, int64(pidsStats.Current))
	if pidsStats.Limit != 0 {
		r.mb.RecordContainerPidsLimitDataPoint(now, int64(pidsStats.Limit))
	}
}
// recordBaseMetrics records container uptime from State.StartedAt. A missing
// or unparseable timestamp yields a partial-scrape error (1 metric lost);
// non-positive uptime (clock skew, container not started) records nothing.
func (r *metricsReceiver) recordBaseMetrics(now pcommon.Timestamp, base *ctypes.ContainerJSONBase) error {
	startedAt, err := time.Parse(time.RFC3339, base.State.StartedAt)
	if err != nil {
		// value not available or invalid
		return scrapererror.NewPartialScrapeError(fmt.Errorf("error retrieving container.uptime from Container.State.StartedAt: %w", err), 1)
	}
	uptime := now.AsTime().Sub(startedAt)
	if uptime > 0 {
		r.mb.RecordContainerUptimeDataPoint(now, uptime.Seconds())
	}
	return nil
}
// recordHostConfigMetrics records CPU shares and, when derivable from the
// host config, the effective CPU limit. A limit-calculation failure is a
// partial-scrape error (1 metric lost); a zero limit means "no limit set"
// and records nothing.
func (r *metricsReceiver) recordHostConfigMetrics(now pcommon.Timestamp, containerJSON *ctypes.InspectResponse) error {
	hostConfig := containerJSON.HostConfig
	r.mb.RecordContainerCPUSharesDataPoint(now, hostConfig.CPUShares)
	cpuLimit, err := calculateCPULimit(hostConfig)
	if err != nil {
		return scrapererror.NewPartialScrapeError(fmt.Errorf("error retrieving container.cpu.limit: %w", err), 1)
	}
	if cpuLimit > 0 {
		r.mb.RecordContainerCPULimitDataPoint(now, cpuLimit)
	}
	return nil
}