pkg/exporter/probe/procio/procio.go (103 lines of code) (raw):
package procio
import (
"context"
"fmt"
"io"
"os"
"github.com/prometheus/client_golang/prometheus"
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
log "github.com/sirupsen/logrus"
"github.com/prometheus/procfs"
)
const (
IOReadSyscall = "readsyscall"
IOWriteSyscall = "writesyscall"
IOReadBytes = "readbytes"
IOWriteBytes = "writebytes"
probeName = "io" // nolint
)
func init() {
probe.MustRegisterMetricsProbe(probeName, ioProbeCreator)
}
func ioProbeCreator() (probe.MetricsProbe, error) {
p := &ProcIO{}
opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
VariableLabels: probe.StandardMetricsLabels,
SingleMetricsOpts: []probe.SingleMetricsOpts{
{Name: IOReadSyscall, Help: "The total number of read system calls made by the process", ValueType: prometheus.CounterValue},
{Name: IOWriteSyscall, Help: "The total number of write system calls made by the process", ValueType: prometheus.CounterValue},
{Name: IOReadBytes, Help: "The total number of bytes read by the process", ValueType: prometheus.CounterValue},
{Name: IOWriteBytes, Help: "The total number of bytes written by the process", ValueType: prometheus.CounterValue},
},
}
metrics := probe.NewBatchMetrics(opts, p.collectOnce)
return probe.NewMetricsProbe(probeName, p, metrics), nil
}
type ProcIO struct {
}
func (s *ProcIO) Start(_ context.Context) error {
return nil
}
func (s *ProcIO) Stop(_ context.Context) error {
return nil
}
func (s *ProcIO) collectOnce(emit probe.Emit) error {
ets := nettop.GetAllEntity()
if len(ets) == 0 {
log.Infof("procio: no entity found")
}
for _, entity := range ets {
collectProcessIO(entity, emit)
}
return nil
}
func collectProcessIO(entity *nettop.Entity, emit probe.Emit) {
var (
readSyscall uint64
writeSyscall uint64
readBytes uint64
writeBytes uint64
)
for _, pid := range entity.GetPids() {
iodata, err := getProcessIOStat(pid)
if err != nil {
log.Warningf("probe %s: failed get process io data: %v", probeName, err)
continue
}
readSyscall += iodata.SyscR
writeSyscall += iodata.SyscW
readBytes += iodata.ReadBytes
writeBytes += iodata.WriteBytes
}
labels := probe.BuildStandardMetricsLabelValues(entity)
emit(IOReadSyscall, labels, float64(readSyscall))
emit(IOWriteSyscall, labels, float64(writeSyscall))
emit(IOReadBytes, labels, float64(readBytes))
emit(IOWriteBytes, labels, float64(writeBytes))
}
// IO creates a new ProcIO instance from a given Proc instance.
func getProcessIOStat(pid int) (procfs.ProcIO, error) {
pio := procfs.ProcIO{}
data, err := readFileNoStat(fmt.Sprintf("/proc/%d/io", pid))
if err != nil {
return pio, err
}
ioFormat := "rchar: %d\nwchar: %d\nsyscr: %d\nsyscw: %d\n" +
"read_bytes: %d\nwrite_bytes: %d\n" +
"cancelled_write_bytes: %d\n"
_, err = fmt.Sscanf(string(data), ioFormat, &pio.RChar, &pio.WChar, &pio.SyscR,
&pio.SyscW, &pio.ReadBytes, &pio.WriteBytes, &pio.CancelledWriteBytes)
return pio, err
}
func readFileNoStat(filename string) ([]byte, error) {
const maxBufferSize = 1024 * 512
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
reader := io.LimitReader(f, maxBufferSize)
return io.ReadAll(reader)
}