metricbeat/module/docker/diskio/helper.go (159 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package diskio
import (
"strings"
"time"
"github.com/docker/docker/api/types/container"
"github.com/elastic/beats/v7/metricbeat/module/docker"
)
// BlkioStats contains all formatted blkio stats
type BlkioStats struct {
Time time.Time
Container *docker.Container
reads float64
writes float64
totals float64
serviced BlkioRaw
servicedBytes BlkioRaw
servicedTime BlkioRaw
waitTime BlkioRaw
queued BlkioRaw
}
// Add adds blkio stats
func (s *BlkioStats) Add(o *BlkioStats) {
s.reads += o.reads
s.writes += o.writes
s.totals += o.totals
s.serviced.Add(&o.serviced)
s.servicedBytes.Add(&o.servicedBytes)
}
// BlkioRaw sums raw Blkio stats
type BlkioRaw struct {
Time time.Time
reads uint64
writes uint64
totals uint64
}
// Add adds blkio raw stats
func (s *BlkioRaw) Add(o *BlkioRaw) {
s.reads += o.reads
s.writes += o.writes
s.totals += o.totals
}
// BlkioService is a helper to collect and calculate disk I/O metrics
type BlkioService struct {
lastStatsPerContainer map[string]BlkioRaw
}
// NewBlkioService builds a new initialized BlkioService
func NewBlkioService() *BlkioService {
return &BlkioService{
lastStatsPerContainer: make(map[string]BlkioRaw),
}
}
func (io *BlkioService) getBlkioStatsList(rawStats []docker.Stat, dedot bool, skipDev []uint64) []BlkioStats {
formattedStats := []BlkioStats{}
statsPerContainer := make(map[string]BlkioRaw)
for i := range rawStats {
stats := io.getBlkioStats(&rawStats[i], dedot, skipDev)
storageStats := io.getStorageStats(&rawStats[i], dedot)
stats.Add(&storageStats)
oldStats, exist := io.lastStatsPerContainer[stats.Container.ID]
if exist {
stats.reads = io.getReadPs(&oldStats, &stats.serviced)
stats.writes = io.getWritePs(&oldStats, &stats.serviced)
stats.totals = io.getTotalPs(&oldStats, &stats.serviced)
}
statsPerContainer[stats.Container.ID] = stats.serviced
formattedStats = append(formattedStats, stats)
}
io.lastStatsPerContainer = statsPerContainer
return formattedStats
}
// getStorageStats collects diskio metrics from StorageStats structure, that
// is populated in Windows systems only
func (io *BlkioService) getStorageStats(myRawStats *docker.Stat, dedot bool) BlkioStats {
return BlkioStats{
Time: myRawStats.Stats.Read,
Container: docker.NewContainer(myRawStats.Container, dedot),
serviced: BlkioRaw{
reads: myRawStats.Stats.StorageStats.ReadCountNormalized,
writes: myRawStats.Stats.StorageStats.WriteCountNormalized,
totals: myRawStats.Stats.StorageStats.ReadCountNormalized + myRawStats.Stats.StorageStats.WriteCountNormalized,
},
servicedBytes: BlkioRaw{
reads: myRawStats.Stats.StorageStats.ReadSizeBytes,
writes: myRawStats.Stats.StorageStats.WriteSizeBytes,
totals: myRawStats.Stats.StorageStats.ReadSizeBytes + myRawStats.Stats.StorageStats.WriteSizeBytes,
},
}
}
// getBlkioStats collects diskio metrics from BlkioStats structures, that
// are not populated in Windows
func (io *BlkioService) getBlkioStats(myRawStat *docker.Stat, dedot bool, skipDev []uint64) BlkioStats {
return BlkioStats{
Time: myRawStat.Stats.Read,
Container: docker.NewContainer(myRawStat.Container, dedot),
serviced: getNewStats(
skipDev,
myRawStat.Stats.Read,
myRawStat.Stats.BlkioStats.IoServicedRecursive),
servicedBytes: getNewStats(
skipDev,
myRawStat.Stats.Read,
myRawStat.Stats.BlkioStats.IoServiceBytesRecursive),
servicedTime: getNewStats(
skipDev,
myRawStat.Stats.Read,
myRawStat.Stats.BlkioStats.IoServiceTimeRecursive),
waitTime: getNewStats(
skipDev,
myRawStat.Stats.Read,
myRawStat.Stats.BlkioStats.IoWaitTimeRecursive),
queued: getNewStats(
skipDev,
myRawStat.Stats.Read,
myRawStat.Stats.BlkioStats.IoQueuedRecursive),
}
}
func getNewStats(skip []uint64, time time.Time, blkioEntry []container.BlkioStatEntry) BlkioRaw {
stats := BlkioRaw{
Time: time,
reads: 0,
writes: 0,
totals: 0,
}
for _, myEntry := range blkioEntry {
// certain devices, like software raid and device-mapper devices, will just control and re-report the disks
// under them in the hierarchy. We want to skip them, lest we merely duplicate the metrics.
if skipDev(myEntry.Major, skip) {
continue
}
// These op value strings can either be Capitalized or lowercase, depending on the platform.
switch strings.ToLower(myEntry.Op) {
case "write":
stats.writes += myEntry.Value
case "read":
stats.reads += myEntry.Value
case "total":
stats.totals += myEntry.Value
}
}
return stats
}
func skipDev(major uint64, skipList []uint64) bool {
for _, dev := range skipList {
if major == dev {
return true
}
}
return false
}
func (io *BlkioService) getReadPs(old *BlkioRaw, new *BlkioRaw) float64 {
duration := new.Time.Sub(old.Time)
return calculatePerSecond(duration, old.reads, new.reads)
}
func (io *BlkioService) getWritePs(old *BlkioRaw, new *BlkioRaw) float64 {
duration := new.Time.Sub(old.Time)
return calculatePerSecond(duration, old.writes, new.writes)
}
func (io *BlkioService) getTotalPs(old *BlkioRaw, new *BlkioRaw) float64 {
duration := new.Time.Sub(old.Time)
return calculatePerSecond(duration, old.totals, new.totals)
}
func calculatePerSecond(duration time.Duration, old uint64, new uint64) float64 {
value := float64(new) - float64(old)
if value < 0 {
value = 0
}
timeSec := duration.Seconds()
if timeSec == 0 {
return 0
}
return value / timeSec
}