pkg/exporter/probe/procsock/procsock.go (224 lines of code) (raw):
package procsock
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
"io"
"os"
"strconv"
"strings"
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/prometheus/procfs"
)
const (
TCPSockInuse = "inuse"
TCPSockOrphan = "orphan"
TCPSockTimewait = "tw"
TCPSockeAlloc = "alloc"
TCPSockeMem = "mem"
)
var (
TCPSockStatMetrics = []probe.LegacyMetric{
{Name: TCPSockInuse, Help: "The total number of TCP sockets currently in use."},
{Name: TCPSockOrphan, Help: "The total number of orphaned TCP sockets."},
{Name: TCPSockTimewait, Help: "The total number of TCP sockets in the TIME_WAIT state."},
{Name: TCPSockeAlloc, Help: "The total number of TCP sockets allocated."},
{Name: TCPSockeMem, Help: "The total amount of memory allocated for TCP sockets."},
}
probeName = "sock"
)
func init() {
probe.MustRegisterMetricsProbe(probeName, sockProbeCreator)
}
func sockProbeCreator() (probe.MetricsProbe, error) {
p := &ProcSock{}
batchMetrics := probe.NewLegacyBatchMetrics(probeName, TCPSockStatMetrics, p.CollectOnce)
return probe.NewMetricsProbe(probeName, p, batchMetrics), nil
}
func (s *ProcSock) CollectOnce() (map[string]map[uint32]uint64, error) {
return collect()
}
type ProcSock struct {
}
func (s *ProcSock) Start(_ context.Context) error {
return nil
}
func (s *ProcSock) Stop(_ context.Context) error {
return nil
}
type tcpsockstat struct {
InUse int
Orphan int
TW int
Alloc int
Mem int
}
func collect() (resMap map[string]map[uint32]uint64, err error) {
resMap = make(map[string]map[uint32]uint64)
for _, stat := range TCPSockStatMetrics {
resMap[stat.Name] = map[uint32]uint64{}
}
// for _, nslogic := range nslist {
// skstat, err := getTcpSockstatByPid(uint32(nslogic.GetPid()))
// if err != nil {
// continue
// }
// nsinum := uint32(nslogic.GetNetns())
// resMap[metricUniqueID("sock", TCPSockInuse)][nsinum] = uint64(skstat.InUse)
// resMap[metricUniqueID("sock", TCPSockOrphan)][nsinum] = uint64(skstat.Orphan)
// resMap[metricUniqueID("sock", TCPSockTimewait)][nsinum] = uint64(skstat.TW)
// resMap[metricUniqueID("sock", TCPSockeAlloc)][nsinum] = uint64(skstat.Alloc)
// resMap[metricUniqueID("sock", TCPSockeMem)][nsinum] = uint64(skstat.Mem)
// }
skstat, err := getHostTCPSockstat()
if err != nil {
return resMap, err
}
nsinum := uint32(nettop.InitNetns)
resMap[TCPSockInuse][nsinum] = uint64(skstat.InUse)
resMap[TCPSockOrphan][nsinum] = uint64(skstat.Orphan)
resMap[TCPSockTimewait][nsinum] = uint64(skstat.TW)
resMap[TCPSockeAlloc][nsinum] = uint64(skstat.Alloc)
resMap[TCPSockeMem][nsinum] = uint64(skstat.Mem)
return
}
// getProcessTcpSockstat only fetch a process in an netns, just want tcp info
// func getTcpSockstatByPid(pid uint32) (tcpsockstat, error) {
// res := tcpsockstat{}
// data, err := ReadFileNoStat(fmt.Sprintf("/proc/%d/net/sockstat", pid))
// if err != nil {
// return res, err
// }
// stat, err := parseSockstat(bytes.NewReader(data))
// if err != nil {
// return res, err
// }
// for idx := range stat.Protocols {
// if strings.Compare(stat.Protocols[idx].Protocol, "TCP") == 0 {
// res.InUse = stat.Protocols[idx].InUse
// res.Orphan = *stat.Protocols[idx].Orphan
// res.Alloc = *stat.Protocols[idx].Alloc
// res.TW = *stat.Protocols[idx].TW
// res.Mem = *stat.Protocols[idx].Mem
// }
// }
// data6, err := ReadFileNoStat(fmt.Sprintf("/proc/%d/net/sockstat6", pid))
// if err != nil {
// // if ipv6 stat not available, use ipv4 data directly
// return res, nil
// }
// stat6, err := parseSockstat(bytes.NewReader(data6))
// if err != nil {
// return res, nil
// }
// for idx := range stat6.Protocols {
// if strings.Compare(stat.Protocols[idx].Protocol, "TCP") == 0 {
// res.InUse += stat.Protocols[idx].InUse
// res.Orphan += *stat.Protocols[idx].Orphan
// res.Alloc += *stat.Protocols[idx].Alloc
// res.TW += *stat.Protocols[idx].TW
// res.Mem += *stat.Protocols[idx].Mem
// }
// }
// return res, nil
// }
func getHostTCPSockstat() (tcpsockstat, error) {
res := tcpsockstat{}
data, err := ReadFileNoStat("/proc/net/sockstat")
if err != nil {
return res, err
}
stat, err := parseSockstat(bytes.NewReader(data))
if err != nil {
return res, err
}
for idx := range stat.Protocols {
if strings.Compare(stat.Protocols[idx].Protocol, "TCP") == 0 {
res.InUse = stat.Protocols[idx].InUse
res.Orphan = *stat.Protocols[idx].Orphan
res.Alloc = *stat.Protocols[idx].Alloc
res.TW = *stat.Protocols[idx].TW
res.Mem = *stat.Protocols[idx].Mem
}
}
data6, err := ReadFileNoStat("/proc/net/sockstat6")
if err != nil {
// if ipv6 stat not available, use ipv4 data directly
return res, nil
}
stat6, err := parseSockstat(bytes.NewReader(data6))
if err != nil {
return res, nil
}
for idx := range stat6.Protocols {
if strings.Compare(stat6.Protocols[idx].Protocol, "TCP6") == 0 {
res.InUse += stat6.Protocols[idx].InUse
}
}
return res, nil
}
// parseSockstat reads the contents of a sockstat file and parses a NetSockstat.
func parseSockstat(r io.Reader) (*procfs.NetSockstat, error) {
var stat procfs.NetSockstat
s := bufio.NewScanner(r)
for s.Scan() {
// Expect a minimum of a protocol and one key/value pair.
fields := strings.Split(s.Text(), " ")
if len(fields) < 3 {
return nil, fmt.Errorf("malformed sockstat line: %q", s.Text())
}
// The remaining fields are key/value pairs.
kvs, err := parseSockstatKVs(fields[1:])
if err != nil {
return nil, fmt.Errorf("error parsing sockstat key/value pairs from %q: %w", s.Text(), err)
}
// The first field is the protocol. We must trim its colon suffix.
proto := strings.TrimSuffix(fields[0], ":")
switch proto {
case "sockets":
// Special case: IPv4 has a sockets "used" key/value pair that we
// embed at the top level of the structure.
used := kvs["used"]
stat.Used = &used
default:
// Parse all other lines as individual protocols.
nsp := parseSockstatProtocol(kvs)
nsp.Protocol = proto
stat.Protocols = append(stat.Protocols, nsp)
}
}
if err := s.Err(); err != nil {
return nil, err
}
return &stat, nil
}
// parseSockstatKVs parses a string slice into a map of key/value pairs.
func parseSockstatKVs(kvs []string) (map[string]int, error) {
if len(kvs)%2 != 0 {
return nil, errors.New("odd number of fields in key/value pairs")
}
// Iterate two values at a time to gather key/value pairs.
out := make(map[string]int, len(kvs)/2)
for i := 0; i < len(kvs); i += 2 {
vp := NewValueParser(kvs[i+1])
out[kvs[i]] = vp.Int()
if err := vp.Err(); err != nil {
return nil, err
}
}
return out, nil
}
// parseSockstatProtocol parses a NetSockstatProtocol from the input kvs map.
func parseSockstatProtocol(kvs map[string]int) procfs.NetSockstatProtocol {
var nsp procfs.NetSockstatProtocol
for k, v := range kvs {
// Capture the range variable to ensure we get unique pointers for
// each of the optional fields.
v := v
switch k {
case "inuse":
nsp.InUse = v
case "orphan":
nsp.Orphan = &v
case "tw":
nsp.TW = &v
case "alloc":
nsp.Alloc = &v
case "mem":
nsp.Mem = &v
case "memory":
nsp.Memory = &v
}
}
return nsp
}
type ValueParser struct {
v string
err error
}
// NewValueParser creates a ValueParser using the input string.
func NewValueParser(v string) *ValueParser {
return &ValueParser{v: v}
}
// Int interprets the underlying value as an int and returns that value.
func (vp *ValueParser) Int() int { return int(vp.int64()) }
// PInt64 interprets the underlying value as an int64 and returns a pointer to
// that value.
func (vp *ValueParser) PInt64() *int64 {
if vp.err != nil {
return nil
}
v := vp.int64()
return &v
}
// int64 interprets the underlying value as an int64 and returns that value.
// TODO: export if/when necessary.
func (vp *ValueParser) int64() int64 {
if vp.err != nil {
return 0
}
// A base value of zero makes ParseInt infer the correct base using the
// string's prefix, if any.
const base = 0
v, err := strconv.ParseInt(vp.v, base, 64)
if err != nil {
vp.err = err
return 0
}
return v
}
// PUInt64 interprets the underlying value as an uint64 and returns a pointer to
// that value.
func (vp *ValueParser) PUInt64() *uint64 {
if vp.err != nil {
return nil
}
// A base value of zero makes ParseInt infer the correct base using the
// string's prefix, if any.
const base = 0
v, err := strconv.ParseUint(vp.v, base, 64)
if err != nil {
vp.err = err
return nil
}
return &v
}
// Err returns the last error, if any, encountered by the ValueParser.
func (vp *ValueParser) Err() error {
return vp.err
}
func ReadFileNoStat(filename string) ([]byte, error) {
const maxBufferSize = 1024 * 512
f, err := os.Open(filename)
if err != nil {
return nil, err
}
defer f.Close()
reader := io.LimitReader(f, maxBufferSize)
return io.ReadAll(reader)
}