metric/system/diskio/diskstat_linux.go (96 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build linux
// +build linux
package diskio
import (
"errors"
"math"
"github.com/shirou/gopsutil/v4/disk"
"github.com/elastic/elastic-agent-system-metrics/metric"
"github.com/elastic/elastic-agent-system-metrics/metric/system/numcpu"
)
// GetCLKTCK stands in for sysconf(_SC_CLK_TCK) and reports the number of clock
// ticks per second used when converting CPU ticks to wall-clock time.
func GetCLKTCK() uint32 {
	// NOTE: the real value should be fetched from sysconf using cgo:
	//   return uint32(C.sysconf(C._SC_CLK_TCK))
	// A fixed value is returned here instead.
	const ticksPerSecond uint32 = 100
	return ticksPerSecond
}
// IOCounters is the linux mapping onto the disk package: the given names are
// passed through unchanged to disk.IOCounters and its result and error are
// returned as-is.
func IOCounters(names ...string) (map[string]disk.IOCountersStat, error) {
	counters, err := disk.IOCounters(names...)
	return counters, err
}
// NewDiskIOStat returns an IOStat whose table of previously seen disk counters
// is allocated and empty, ready for the first CalcIOStatistics call.
func NewDiskIOStat() *IOStat {
	stat := &IOStat{}
	stat.lastDiskIOCounters = make(map[string]disk.IOCountersStat)
	return stat
}
// OpenSampling takes the current CPU sample by calling Get on stat.curCPU and
// returns whatever error that call reports.
// Call it as close as possible to the moment IOCounters is fetched.
func (stat *IOStat) OpenSampling() error {
	err := stat.curCPU.Get()
	return err
}
// returnOrFix32BitRollover returns how far a diskio counter advanced between
// the previous sample (prev) and the current one (current), compensating for a
// single 32-bit wrap-around.
//
// A few of the diskio counters are actually 32-bit on the kernel side, which
// means they can roll over fairly easily. When current < prev and prev still
// fits in a uint32, the counter is assumed to have wrapped exactly once and the
// delta is reconstructed modulo 2^32. When prev is already larger than the
// uint32 maximum, the counter must be 64-bit; a decrease cannot be explained by
// a 32-bit rollover, so 0 is returned.
//
// If you want to get technical, this could be a tad unsafe, as we don't
// actually have any way of knowing if the word size changes in a future
// kernel, and we've rolled over at UINT64_MAX.
// See https://docs.kernel.org/admin-guide/iostats.html and
// https://github.com/torvalds/linux/blob/master/block/genhd.c diskstats_show()
func returnOrFix32BitRollover(current, prev uint64) uint64 {
	// Number of distinct values a 32-bit counter can hold (2^32).
	const uint32Range uint64 = math.MaxUint32 + 1

	if current >= prev {
		return current - prev
	}
	// we're at a uint64 if we hit this
	if prev > math.MaxUint32 {
		return 0
	}
	// Steps from prev up to and including the wrap to zero, plus the steps
	// from zero to current. Using 2^32 (not MaxUint32) counts the step from
	// MaxUint32 to 0, which would otherwise be lost.
	return (uint32Range - prev) + current
}
// CalcIOStatistics derives IO rates and averages for one device by comparing
// the supplied counters with the counters remembered from the previous call
// for the same counter.Name.
//
// The first time a device name is seen there is nothing to compare against, so
// the counters are stored and a zero IOMetric is returned with a nil error.
// An error is returned when the elapsed time computed from the CPU samples
// (stat.curCPU vs stat.lastCPU) is not positive; the stored counters are left
// untouched in that case. On success the supplied counters replace the stored
// ones.
func (stat *IOStat) CalcIOStatistics(counter disk.IOCountersStat) (IOMetric, error) {
	prev, seen := stat.lastDiskIOCounters[counter.Name]
	if !seen {
		// Nothing recorded for this device yet: remember it and report all zeros.
		stat.lastDiskIOCounters[counter.Name] = counter
		return IOMetric{}, nil
	}

	// Elapsed milliseconds between CloseSampling and OpenSampling: total CPU
	// ticks across all CPUs, averaged per CPU and scaled by the tick rate.
	cpuDelta := stat.curCPU.Total() - stat.lastCPU.Total()
	elapsedMs := 1000.0 * float64(cpuDelta) / float64(numcpu.NumCPU()) / float64(GetCLKTCK())
	if elapsedMs <= 0 {
		return IOMetric{}, errors.New("the delta cpu time between close sampling and open sampling is less or equal to 0")
	}

	// Read-side deltas.
	readOps := counter.ReadCount - prev.ReadCount
	readMerges := counter.MergedReadCount - prev.MergedReadCount
	readBytes := counter.ReadBytes - prev.ReadBytes
	readTicks := returnOrFix32BitRollover(counter.ReadTime, prev.ReadTime)

	// Write-side deltas.
	writeOps := counter.WriteCount - prev.WriteCount
	writeMerges := counter.MergedWriteCount - prev.MergedWriteCount
	writeBytes := counter.WriteBytes - prev.WriteBytes
	writeTicks := returnOrFix32BitRollover(counter.WriteTime, prev.WriteTime)

	// Device-wide time counters; these go through the rollover helper as well.
	busyTicks := returnOrFix32BitRollover(counter.IoTime, prev.IoTime)
	weightedTicks := returnOrFix32BitRollover(counter.WeightedIO, prev.WeightedIO)

	// Per-request averages only make sense when at least one request completed.
	totalOps := readOps + writeOps
	var avgSize, avgWait, avgService float64
	if totalOps > 0 {
		ops := float64(totalOps)
		avgSize = float64(readBytes+writeBytes) / ops
		avgWait = float64(readTicks+writeTicks) / ops
		avgService = float64(busyTicks) / ops
	}

	// rate converts a delta over the sampling window into a rounded per-second value.
	rate := func(delta uint64) float64 {
		return metric.Round(1000.0 * float64(delta) / elapsedMs)
	}

	m := IOMetric{
		ReadRequestMergeCountPerSec:  rate(readMerges),
		WriteRequestMergeCountPerSec: rate(writeMerges),
		ReadRequestCountPerSec:       rate(readOps),
		WriteRequestCountPerSec:      rate(writeOps),
		ReadBytesPerSec:              rate(readBytes),
		WriteBytesPerSec:             rate(writeBytes),
		AvgRequestSize:               metric.Round(avgSize),
		AvgQueueSize:                 metric.Round(float64(weightedTicks) / elapsedMs),
		AvgAwaitTime:                 metric.Round(avgWait),
		AvgServiceTime:               metric.Round(avgService),
		BusyPct:                      metric.Round(100.0 * float64(busyTicks) / elapsedMs),
	}
	if readOps > 0 {
		m.AvgReadAwaitTime = metric.Round(float64(readTicks) / float64(readOps))
	}
	if writeOps > 0 {
		m.AvgWriteAwaitTime = metric.Round(float64(writeTicks) / float64(writeOps))
	}
	// Cap the busy percentage at 100.
	if m.BusyPct > 100.0 {
		m.BusyPct = 100.0
	}

	stat.lastDiskIOCounters[counter.Name] = counter
	return m, nil
}
// CloseSampling ends the current sampling round by remembering the current CPU
// sample as the previous one, so the next CalcIOStatistics measures elapsed
// time against it.
func (stat *IOStat) CloseSampling() {
	sample := stat.curCPU
	stat.lastCPU = sample
}