opentelemetry_collector/receiver/dockerstats/scraper.go:

// Package dockerstats scrapes resource usage and health metrics for containers
// running on the local Docker daemon.
package dockerstats

import (
	"context"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"strings"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
	"go.opentelemetry.io/collector/consumer"
	"go.uber.org/zap"

	"github.com/googlecloudplatform/appengine-sidecars-docker/opentelemetry_collector/receiver/metricgenerator"

	mpb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus"
)

// Label and metric descriptors reported by the scraper.
var (
	containerNameLabel = &mpb.LabelKey{
		Key:         "container_name",
		Description: "Name of the container (or ID if name is not available)",
	}

	cpuUsageDesc = &mpb.MetricDescriptor{
		Name:        "container/cpu/usage_time",
		Description: "Total CPU time consumed",
		Unit:        "seconds",
		Type:        mpb.MetricDescriptor_CUMULATIVE_DOUBLE,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
	cpuLimitDesc = &mpb.MetricDescriptor{
		Name:        "container/cpu/limit",
		Description: "CPU time limit (where applicable)",
		Unit:        "seconds",
		Type:        mpb.MetricDescriptor_GAUGE_DOUBLE,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
	memUsageDesc = &mpb.MetricDescriptor{
		Name:        "container/memory/usage",
		Description: "Total memory the container is using",
		Unit:        "bytes",
		Type:        mpb.MetricDescriptor_GAUGE_INT64,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
	memLimitDesc = &mpb.MetricDescriptor{
		Name:        "container/memory/limit",
		Description: "Total memory the container is allowed to use",
		Unit:        "bytes",
		Type:        mpb.MetricDescriptor_GAUGE_INT64,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
	nwRecvBytesDesc = &mpb.MetricDescriptor{
		Name:        "container/network/received_bytes_count",
		Description: "Bytes received by container over all network interfaces",
		Unit:        "bytes",
		Type:        mpb.MetricDescriptor_CUMULATIVE_INT64,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
	nwSentBytesDesc = &mpb.MetricDescriptor{
		Name:        "container/network/sent_bytes_count",
		Description: "Bytes sent by container over all network interfaces",
		Unit:        "bytes",
		Type:        mpb.MetricDescriptor_CUMULATIVE_INT64,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}

	// Container health metrics.
	uptimeDesc = &mpb.MetricDescriptor{
		Name:        "container/uptime",
		Description: "Container uptime",
		Unit:        "seconds",
		Type:        mpb.MetricDescriptor_GAUGE_INT64,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
	restartCountDesc = &mpb.MetricDescriptor{
		Name:        "container/restart_count",
		Description: "Number of times the container has been restarted.",
		Unit:        "Count",
		Type:        mpb.MetricDescriptor_CUMULATIVE_INT64,
		LabelKeys:   []*mpb.LabelKey{containerNameLabel},
	}
)

// containerInfo holds per-container data obtained from container inspection.
type containerInfo struct {
	uptime       time.Duration
	restartCount int64
	cpuLimit     int64
}

// scraper periodically collects container stats from the Docker daemon and
// forwards them to the configured metric consumer.
type scraper struct {
	startTime      time.Time
	scrapeInterval time.Duration
	done           chan bool
	scrapeCount    uint64
	metricConsumer consumer.Metrics
	docker         client.ContainerAPIClient
	logger         *zap.Logger
	now            func() time.Time
}

// newScraper returns a scraper that reports container metrics to metricConsumer
// every scrapeInterval, using a Docker client configured from the environment.
func newScraper(scrapeInterval time.Duration, metricConsumer consumer.Metrics, logger *zap.Logger) (*scraper, error) {
	docker, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize docker client: %v", err)
	}
	return &scraper{
		scrapeInterval: scrapeInterval,
		done:           make(chan bool),
		metricConsumer: metricConsumer,
		docker:         docker,
		logger:         logger,
		now:            time.Now,
	}, nil
}

// start begins periodic scraping in a background goroutine.
func (s *scraper) start() {
	s.startTime = s.now()
	go func() {
		ticker := time.NewTicker(s.scrapeInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				s.export()
				s.scrapeCount++
			case <-s.done:
				return
			}
		}
	}()
}

// stop signals the scraping goroutine to exit.
func (s *scraper) stop() {
	s.done <- true
}

// export collects stats for all running containers and sends the resulting
// metrics to the metric consumer.
func (s *scraper) export() {
	ctx, cancel := context.WithTimeout(context.Background(), s.scrapeInterval)
	defer cancel()

	containers, err := s.docker.ContainerList(ctx, types.ContainerListOptions{})
	if err != nil {
		s.logger.Warn("Failed to get docker container list.", zap.Error(err))
		return
	}

	var metrics []*mpb.Metric
	for _, container := range containers {
		var name string
		if len(container.Names) > 0 {
			// Docker container names are prefixed with their parent's name (/ means docker
			// daemon). See https://github.com/moby/moby/issues/6705#issuecomment-47298276.
			name = strings.TrimPrefix(container.Names[0], "/")
		} else {
			name = container.ID
		}
		labelValues := []*mpb.LabelValue{metricgenerator.MakeLabelValue(name)}
		cLogger := s.logger.With(zap.String("name", name), zap.String("id", container.ID))

		stats, err := s.readResourceUsageStats(ctx, container.ID)
		if err != nil {
			cLogger.Warn("readResourceUsageStats failed.", zap.Error(err))
		} else {
			metrics = append(metrics, s.usageStatsToMetrics(stats, labelValues)...)
		}

		info, err := s.readContainerInfo(ctx, container.ID)
		if err != nil {
			cLogger.Warn("readContainerInfo failed.", zap.Error(err))
		} else {
			metrics = append(metrics, s.containerInfoToMetrics(info, labelValues)...)
		}
	}

	err = s.metricConsumer.ConsumeMetrics(ctx, opencensus.OCToMetrics(nil, nil, metrics))
	if err != nil {
		s.logger.Error("Error sending docker stats metrics", zap.Error(err))
	}
}

// readResourceUsageStats fetches a single (non-streaming) stats snapshot for the
// given container.
func (s *scraper) readResourceUsageStats(ctx context.Context, id string) (*types.StatsJSON, error) {
	st, err := s.docker.ContainerStats(ctx, id, false /*stream*/)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve stats: %v", err)
	}
	defer st.Body.Close()

	b, err := ioutil.ReadAll(st.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read stats: %v", err)
	}
	var stats types.StatsJSON
	if err = json.Unmarshal(b, &stats); err != nil {
		return nil, fmt.Errorf("failed to unmarshal stats JSON: %v", err)
	}
	return &stats, nil
}

// usageStatsToMetrics converts a Docker stats snapshot into OpenCensus metrics.
func (s *scraper) usageStatsToMetrics(stats *types.StatsJSON, labelValues []*mpb.LabelValue) []*mpb.Metric {
	var rx, tx uint64
	for _, nw := range stats.Networks {
		rx += nw.RxBytes
		tx += nw.TxBytes
	}
	return []*mpb.Metric{
		{
			MetricDescriptor: cpuUsageDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeDoubleTimeSeries(time.Duration(stats.CPUStats.CPUUsage.TotalUsage).Seconds(), s.startTime, s.now(), labelValues),
			},
		},
		// Unfortunately, the Docker API doesn't expose CPU limits via the CPUStats API. That
		// information is extracted via container inspection (see readContainerInfo below).
		{
			MetricDescriptor: memUsageDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeInt64TimeSeries(int64(stats.MemoryStats.Usage), s.startTime, s.now(), labelValues),
			},
		},
		{
			MetricDescriptor: memLimitDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeInt64TimeSeries(int64(stats.MemoryStats.Limit), s.startTime, s.now(), labelValues),
			},
		},
		{
			MetricDescriptor: nwRecvBytesDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeInt64TimeSeries(int64(rx), s.startTime, s.now(), labelValues),
			},
		},
		{
			MetricDescriptor: nwSentBytesDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeInt64TimeSeries(int64(tx), s.startTime, s.now(), labelValues),
			},
		},
	}
}

// readContainerInfo inspects the container to obtain its restart count, CPU
// limit, and uptime.
func (s *scraper) readContainerInfo(ctx context.Context, id string) (containerInfo, error) {
	var info containerInfo
	c, err := s.docker.ContainerInspect(ctx, id)
	if err != nil {
		return info, fmt.Errorf("failed to retrieve container info: %v", err)
	}
	info.restartCount = int64(c.RestartCount)
	info.cpuLimit = c.HostConfig.NanoCPUs

	t, err := time.Parse(time.RFC3339Nano, c.State.StartedAt)
	if err != nil {
		return info, fmt.Errorf("failed to parse container start time (%s): %v", c.State.StartedAt, err)
	}
	now := s.now()
	if t.After(now) {
		return info, fmt.Errorf("invalid container start time %v, should be <= current time %v", t, now)
	}
	info.uptime = now.Sub(t)
	return info, nil
}

// containerInfoToMetrics converts inspection-derived container info into
// OpenCensus metrics.
func (s *scraper) containerInfoToMetrics(info containerInfo, labelValues []*mpb.LabelValue) []*mpb.Metric {
	metrics := []*mpb.Metric{
		{
			MetricDescriptor: uptimeDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeInt64TimeSeries(int64(info.uptime.Seconds()), s.startTime, s.now(), labelValues),
			},
		},
		{
			MetricDescriptor: restartCountDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeInt64TimeSeries(info.restartCount, s.startTime, s.now(), labelValues),
			},
		},
	}
	if info.cpuLimit > 0 {
		// Only generate this metric if the container has a CPU limit set.
		metrics = append(metrics, &mpb.Metric{
			MetricDescriptor: cpuLimitDesc,
			Timeseries: []*mpb.TimeSeries{
				metricgenerator.MakeDoubleTimeSeries(time.Duration(info.cpuLimit).Seconds(), s.startTime, s.now(), labelValues),
			},
		})
	}
	return metrics
}
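
// Usage sketch (a minimal, hedged example, assuming the enclosing receiver
// supplies a consumer.Metrics and a *zap.Logger; the 30s interval is purely
// illustrative):
//
//	s, err := newScraper(30*time.Second, metricConsumer, logger)
//	if err != nil {
//	    return err
//	}
//	s.start() // begins periodic export in a background goroutine
//	// ... later, on shutdown:
//	s.stop()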