metric/system/cgroup/cgv1/blkio.go (225 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package cgv1 import ( "bufio" "fmt" "os" "path/filepath" "strconv" "strings" "unicode" "github.com/elastic/elastic-agent-system-metrics/metric/system/cgroup/cgcommon" ) // BlockIOSubsystem contains limits and metrics from the "blkio" subsystem. The // blkio subsystem controls and monitors access to I/O on block devices by tasks // in a cgroup. // // https://www.kernel.org/doc/Documentation/cgroup-v1/blkio-controller.txt type BlockIOSubsystem struct { ID string `json:"id,omitempty"` // ID of the cgroup. Path string `json:"path,omitempty"` // Path to the cgroup relative to the cgroup subsystem's mountpoint. Total TotalIOs `json:"total,omitempty" struct:"total"` // Throttle limits for upper IO rates and metrics. Reads TotalIOs `json:"reads,omitempty" struct:"reads"` // Throttle limits for upper IO rates and metrics. Writes TotalIOs `json:"writes,omitempty" struct:"writes"` // Throttle limits for upper IO rates and metrics. //CFQ CFQScheduler `json:"cfq,omitempty"` // Completely fair queue scheduler limits and metrics. } // TotalIOs wraps the totals for blkio type TotalIOs struct { Bytes uint64 `json:"bytes,omitempty" struct:"bytes,omitempty"` Ios uint64 `json:"ios,omitempty" struct:"ios,omitempty"` } // CFQScheduler contains limits and metrics for the proportional weight time // based division of disk policy. It is implemented in CFQ. Hence this policy // takes effect only on leaf nodes when CFQ is being used. // // https://www.kernel.org/doc/Documentation/block/cfq-iosched.txt type CFQScheduler struct { Weight uint64 `json:"weight"` // Default weight for all devices unless overridden. Allowed range of weights is from 10 to 1000. Devices []CFQDevice `json:"devices,omitempty"` } // CFQDevice contains CFQ limits and metrics associated with a single device. type CFQDevice struct { DeviceID DeviceID `json:"device_id"` // ID of the device. // Proportional weight for the device. 0 means a per device weight is not set and // that the blkio.weight value is used. Weight uint64 `json:"weight"` TimeMs uint64 `json:"time_ms"` // Disk time allocated to cgroup per device in milliseconds. Sectors uint64 `json:"sectors"` // Number of sectors transferred to/from disk by the cgroup. Bytes OperationValues `json:"io_service_bytes"` // Number of bytes transferred to/from the disk by the cgroup. IOs OperationValues `json:"io_serviced"` // Number of IO operations issued to the disk by the cgroup. ServiceTimeNanos OperationValues `json:"io_service_time"` // Amount of time between request dispatch and request completion for the IOs done by this cgroup. WaitTimeNanos OperationValues `json:"io_wait_time"` // Amount of time the IOs for this cgroup spent waiting in the scheduler queues for service. Merges OperationValues `json:"io_merged"` // Total number of bios/requests merged into requests belonging to this cgroup. } // ThrottleDevice contains throttle limits and metrics associated with a single device. type ThrottleDevice struct { DeviceID DeviceID `json:"device_id"` // ID of the device. ReadLimitBPS uint64 `json:"read_bps_device"` // Read limit in bytes per second (BPS). Zero means no limit. WriteLimitBPS uint64 `json:"write_bps_device"` // Write limit in bytes per second (BPS). Zero mean no limit. ReadLimitIOPS uint64 `json:"read_iops_device"` // Read limit in IOPS. Zero means no limit. WriteLimitIOPS uint64 `json:"write_iops_device"` // Write limit in IOPS. Zero means no limit. Bytes OperationValues `json:"io_service_bytes"` // Number of bytes transferred to/from the disk by the cgroup. IOs OperationValues `json:"io_serviced"` // Number of IO operations issued to the disk by the cgroup. } // OperationValues contains the I/O limits or metrics associated with read, // write, sync, and async operations. type OperationValues struct { Read uint64 `json:"read"` Write uint64 `json:"write"` Async uint64 `json:"async"` Sync uint64 `json:"sync"` } // DeviceID identifies a Linux block device. type DeviceID struct { Major uint64 Minor uint64 } // blkioValue holds a single blkio value associated with a device. type blkioValue struct { DeviceID Operation string Value uint64 } // Get reads metrics from the "blkio" subsystem. path is the filepath to the // cgroup hierarchy to read. func (blkio *BlockIOSubsystem) Get(path string) error { if err := blkioThrottle(path, blkio); err != nil { return fmt.Errorf("error reading throttle data from %s: %w", path, err) } return nil } // blkioThrottle reads all of the limits and metrics associated with blkio // throttling policy. func blkioThrottle(path string, blkio *BlockIOSubsystem) error { devices := map[DeviceID]*ThrottleDevice{} getDevice := func(id DeviceID) *ThrottleDevice { td := devices[id] if td == nil { td = &ThrottleDevice{DeviceID: id} devices[id] = td } return td } values, err := readBlkioValues(path, "blkio.throttle.io_service_bytes") if err != nil { return fmt.Errorf("error reading blkio.throttle.io_service_bytes: %w", err) } if values != nil { for id, opValues := range collectOpValues(values) { getDevice(id).Bytes = *opValues } } values, err = readBlkioValues(path, "blkio.throttle.io_serviced") if err != nil { return fmt.Errorf("error reading blkio.throttle.io_serviced: %w", err) } if values != nil { for id, opValues := range collectOpValues(values) { getDevice(id).IOs = *opValues } } values, err = readBlkioValues(path, "blkio.throttle.read_bps_device") if err != nil { return fmt.Errorf("error reading blkio.throttle.read_bps_device: %w", err) } if len(values) != 0 { for _, bv := range values { getDevice(bv.DeviceID).ReadLimitBPS = bv.Value } } values, err = readBlkioValues(path, "blkio.throttle.write_bps_device") if err != nil { return fmt.Errorf("error reading blkio.throttle.write_bps_device: %w", err) } if len(values) != 0 { for _, bv := range values { getDevice(bv.DeviceID).WriteLimitBPS = bv.Value } } values, err = readBlkioValues(path, "blkio.throttle.read_iops_device") if err != nil { return fmt.Errorf("error reading blkio.throttle.read_iops_device: %w", err) } if len(values) != 0 { for _, bv := range values { getDevice(bv.DeviceID).ReadLimitIOPS = bv.Value } } values, err = readBlkioValues(path, "blkio.throttle.write_iops_device") if err != nil { return fmt.Errorf("error reading blkio.throttle.write_iops_device: %w", err) } if len(values) != 0 { for _, bv := range values { getDevice(bv.DeviceID).WriteLimitIOPS = bv.Value } } for _, dev := range devices { blkio.Total.Bytes += dev.Bytes.Read + dev.Bytes.Write blkio.Total.Ios += dev.IOs.Read + dev.IOs.Write blkio.Reads.Bytes += dev.Bytes.Read blkio.Reads.Ios += dev.IOs.Read blkio.Writes.Bytes += dev.Bytes.Write blkio.Writes.Ios += dev.IOs.Write } return nil } // collectOpValues collects the discreet I/O values (e.g. read, write, sync, // async) for a given device into a single OperationValues object. It returns a // mapping of device ID to OperationValues. func collectOpValues(values []blkioValue) map[DeviceID]*OperationValues { opValues := map[DeviceID]*OperationValues{} for _, bv := range values { opValue := opValues[bv.DeviceID] if opValue == nil { opValue = &OperationValues{} opValues[bv.DeviceID] = opValue } switch bv.Operation { case "read": opValue.Read = bv.Value case "write": opValue.Write = bv.Value case "async": opValue.Async = bv.Value case "sync": opValue.Sync = bv.Value } } return opValues } // readDeviceValues reads values from a single blkio file. // It expects to read values like "245:1 read 18880" or "254:1 1909". It returns // an array containing an entry for each valid line read. func readBlkioValues(path ...string) ([]blkioValue, error) { f, err := os.Open(filepath.Join(path...)) if err != nil { if os.IsNotExist(err) { return nil, nil } return nil, err } defer f.Close() var values []blkioValue sc := bufio.NewScanner(f) for sc.Scan() { line := strings.TrimSpace(sc.Text()) if len(line) == 0 { continue } // Valid lines start with a device ID. if !unicode.IsNumber(rune(line[0])) { continue } v, err := parseBlkioValue(sc.Text()) if err != nil { return nil, err } values = append(values, v) } return values, sc.Err() } func isColonOrSpace(r rune) bool { return unicode.IsSpace(r) || r == ':' } func parseBlkioValue(line string) (blkioValue, error) { fields := strings.FieldsFunc(line, isColonOrSpace) if len(fields) != 3 && len(fields) != 4 { return blkioValue{}, cgcommon.ErrInvalidFormat } major, err := strconv.ParseUint(fields[0], 10, 64) if err != nil { return blkioValue{}, err } minor, err := strconv.ParseUint(fields[1], 10, 64) if err != nil { return blkioValue{}, err } var value uint64 var operation string if len(fields) == 3 { value, err = cgcommon.ParseUint([]byte(fields[2])) if err != nil { return blkioValue{}, err } } else { operation = strings.ToLower(fields[2]) value, err = cgcommon.ParseUint([]byte(fields[3])) if err != nil { return blkioValue{}, err } } return blkioValue{ DeviceID: DeviceID{major, minor}, Operation: operation, Value: value, }, nil }