pkg/exporter/probe/procsnmp/procsnmp.go (251 lines of code) (raw):
package procsnmp
import (
"bufio"
"context"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
log "github.com/sirupsen/logrus"
)
// Protocol section names as spelled at the start of the kernel's
// /proc/<pid>/net/snmp-style files (before the trailing ':').
// They are not referenced elsewhere in this file.
const (
	ProtocolICMP     = "Icmp"
	ProtocolICMPMsg  = "IcmpMsg"
	ProtocolIP       = "Ip"
	ProtocolIPExt    = "IpExt"
	ProtocolMPTCPExt = "MPTcpExt"
	ProtocolTCP      = "Tcp"
	ProtocolTCPExt   = "TcpExt"
	ProtocolUDP      = "Udp"
	ProtocolUDPLite  = "UdpLite"
)

// TCP field keys. Keys are lower-case because parseNetStats lower-cases
// every header column before storing it.
const (
	TCPActiveOpens  = "activeopens"
	TCPPassiveOpens = "passiveopens"
	TCPRetransSegs  = "retranssegs"
	TCPAttemptFails = "attemptfails"
	TCPEstabResets  = "estabresets"
	TCPCurrEstab    = "currestab"
	TCPInSegs       = "insegs"
	TCPOutSegs      = "outsegs"
	TCPInErrs       = "inerrs"
	TCPOutRsts      = "outrsts"
)

// NOTE(review): these keys look like TcpExt counters rather than Tcp fields
// of /proc/net/snmp. They are not part of TCPStatMetrcis, so collect never
// populates them for this probe. Presumably they are used elsewhere; confirm
// before relying on them here.
const (
	TCPListenDrops     = "listendrops"
	TCPListenOverflows = "listenoverflows"
	TCPSynRetrans      = "tcpsynretrans"
	TCPFastRetrans     = "tcpfastretrans"
	TCPRetransFail     = "tcpretransfail"
	TCPTimeouts        = "tcptimeouts"
)

// UDP field keys (lower-cased, see above).
const (
	UDPInDatagrams  = "indatagrams"
	UDPNoPorts      = "noports"
	UDPInErrors     = "inerrors"
	UDPOutDatagrams = "outdatagrams"
	UDPRcvbufErrors = "rcvbuferrors"
	UDPSndbufErrors = "sndbuferrors"
	UDPInCsumErrors = "incsumerrors"
	UDPIgnoredMulti = "ignoredmulti"
)

// IP field keys (lower-cased, see above).
const (
	IPForwarding      = "forwarding"
	IPDefaultTTL      = "defaultttl"
	IPInReceives      = "inreceives"
	IPInHdrErrors     = "inhdrerrors"
	IPInAddrErrors    = "inaddrerrors"
	IPForwDatagrams   = "forwdatagrams"
	IPInUnknownProtos = "inunknownprotos"
	IPInDiscards      = "indiscards"
	IPInDelivers      = "indelivers"
	IPOutRequests     = "outrequests"
	IPOutDiscards     = "outdiscards"
	IPOutNoRoutes     = "outnoroutes"
	IPReasmTimeout    = "reasmtimeout"
	IPReasmReqds      = "reasmreqds"
	IPReasmOKs        = "reasmoks"
	IPReasmFails      = "reasmfails"
	IPFragOKs         = "fragoks"
	IPFragFails       = "fragfails"
	IPFragCreates     = "fragcreates"
)

// Probe names registered by this package. They also equal the lower-cased
// section names produced by parseNetStats, which is how collect matches
// parsed sections to probes.
const (
	TCP = "tcp"
	UDP = "udp"
	IP  = "ip"
)
var (
	// TCPStatMetrcis lists the Tcp section counters exported by the "tcp"
	// probe. The misspelling ("Metrcis") is kept because the name is exported.
	TCPStatMetrcis = []probe.LegacyMetric{
		{Name: TCPActiveOpens, Help: "The number of active TCP connections opened."},
		{Name: TCPPassiveOpens, Help: "The number of passive TCP connections opened (i.e., connections established by accepting incoming connections)."},
		{Name: TCPRetransSegs, Help: "The total number of segments that have been retransmitted."},
		{Name: TCPAttemptFails, Help: "The number of failed attempts to establish a TCP connection."},
		{Name: TCPEstabResets, Help: "The number of established TCP connections that were reset."},
		{Name: TCPCurrEstab, Help: "The current number of established TCP connections."},
		{Name: TCPInSegs, Help: "The total number of TCP segments received."},
		{Name: TCPOutSegs, Help: "The total number of TCP segments sent."},
		{Name: TCPInErrs, Help: "The total number of erroneous packets received on TCP."},
		{Name: TCPOutRsts, Help: "The total number of TCP segments sent with the RST flag set."},
	}
	// UDPStatMetrics lists the Udp section counters exported by the "udp" probe.
	UDPStatMetrics = []probe.LegacyMetric{
		{Name: UDPInDatagrams, Help: "The total number of UDP datagrams received."},
		{Name: UDPNoPorts, Help: "The total number of UDP datagrams received for which there was no port at the destination."},
		{Name: UDPInErrors, Help: "The total number of erroneous received UDP packets."},
		{Name: UDPOutDatagrams, Help: "The total number of UDP datagrams sent."},
		{Name: UDPRcvbufErrors, Help: "The total number of UDP datagrams dropped due to socket receive buffer errors."},
		{Name: UDPSndbufErrors, Help: "The total number of UDP datagrams dropped due to socket send buffer errors."},
		{Name: UDPInCsumErrors, Help: "The total number of UDP datagrams received with a checksum error."},
		{Name: UDPIgnoredMulti, Help: "The total number of received UDP multicast packets that were ignored."},
	}
	// IPMetrics lists the Ip section fields exported by the "ip" probe.
	IPMetrics = []probe.LegacyMetric{
		// The kernel writes Forwarding as 1 (forwarding) or 2 (not forwarding),
		// following the MIB-II ipForwarding convention, never 0.
		{Name: IPForwarding, Help: "Indicates whether IP forwarding is enabled (1 for enabled, 2 for disabled, following the RFC 1213 ipForwarding convention)."},
		{Name: IPDefaultTTL, Help: "The default time-to-live (TTL) value for IP packets."},
		{Name: IPInReceives, Help: "The total number of IP packets received."},
		{Name: IPInHdrErrors, Help: "The total number of received IP packets that had a header error."},
		{Name: IPInAddrErrors, Help: "The total number of received IP packets that were discarded due to address errors."},
		{Name: IPForwDatagrams, Help: "The total number of IP packets forwarded by this machine."},
		{Name: IPInUnknownProtos, Help: "The total number of received IP packets for which the protocol is not known."},
		{Name: IPInDiscards, Help: "The total number of received IP packets that were discarded."},
		{Name: IPInDelivers, Help: "The total number of delivered IP packets."},
		{Name: IPOutRequests, Help: "The total number of IP packets sent out."},
		{Name: IPOutDiscards, Help: "The total number of outgoing IP packets that were discarded."},
		{Name: IPOutNoRoutes, Help: "The total number of outgoing IP packets for which no route could be found."},
		{Name: IPReasmTimeout, Help: "The total number of times that IP reassembly timed out."},
		{Name: IPReasmReqds, Help: "The total number of IP reassembly requests made."},
		{Name: IPReasmOKs, Help: "The total number of successful IP reassembly operations."},
		{Name: IPReasmFails, Help: "The total number of failed IP reassembly operations."},
		{Name: IPFragOKs, Help: "The total number of IP packets that were fragmented successfully."},
		{Name: IPFragFails, Help: "The total number of IP packets that failed to fragment."},
		{Name: IPFragCreates, Help: "The total number of IP fragments created."},
	}
	// metricsMap maps each probe name to the metrics it exports. collect uses
	// it to decide which parsed fields to keep; anything not listed is dropped.
	metricsMap = map[string][]probe.LegacyMetric{
		TCP: TCPStatMetrcis,
		UDP: UDPStatMetrics,
		IP:  IPMetrics,
	}
	// cache is shared by every procSNMP probe. A single collect pass fills the
	// values for all probes, and each probe then reads its own slice by name.
	cache = &snmpCache{
		cache: make(map[string]map[string]map[uint32]uint64),
	}
)
func init() {
probe.MustRegisterMetricsProbe(TCP, newSnmpProbeCreator(TCP))
probe.MustRegisterMetricsProbe(UDP, newSnmpProbeCreator(UDP))
probe.MustRegisterMetricsProbe(IP, newSnmpProbeCreator(IP))
}
// newSnmpProbeCreator returns the factory registered for probeName.
//
// Each call to the factory builds a fresh procSNMP bound to probeName and
// wraps it in legacy batch metrics built from metricsMap[probeName], with the
// probe's CollectOnce as the data source. The factory never returns an error.
func newSnmpProbeCreator(probeName string) func() (probe.MetricsProbe, error) {
	creator := func() (probe.MetricsProbe, error) {
		snmpProbe := &procSNMP{name: probeName}
		batch := probe.NewLegacyBatchMetrics(probeName, metricsMap[probeName], snmpProbe.CollectOnce)
		return probe.NewMetricsProbe(probeName, snmpProbe, batch), nil
	}
	return creator
}
// procSNMP is the probe behind the tcp, udp and ip metrics. It owns no
// resources. Every read goes through the package-level cache, keyed by the
// probe's name.
type procSNMP struct {
	name string
}

// Start is a no-op because procSNMP has nothing to set up.
func (p *procSNMP) Start(_ context.Context) error { return nil }

// Stop is a no-op because procSNMP has nothing to tear down.
func (p *procSNMP) Stop(_ context.Context) error { return nil }

// CollectOnce returns this probe's values as metric name -> netns number ->
// value. They come from the shared cache, so they may be up to about two
// seconds old.
func (p *procSNMP) CollectOnce() (map[string]map[uint32]uint64, error) {
	probeName := p.name
	return cache.get(probeName)
}
// snmpCache memoizes the result of collect for a short period. The tcp, udp
// and ip probes each call get from CollectOnce, so caching lets them share a
// single pass over every network namespace.
//
// All fields are guarded by lock.
type snmpCache struct {
	// cache maps probe name -> metric name -> netns number -> value.
	cache map[string]map[string]map[uint32]uint64
	// err is the error returned by the most recent reload, if any.
	err error
	// last is when the most recent reload ran.
	last time.Time
	lock sync.Mutex
}

// get returns the per-netns values of every metric that belongs to the probe
// called name. It first refreshes the snapshot if it is older than two
// seconds.
//
// If the latest reload failed, get returns that error until the next refresh
// attempt. The failure is retried once the snapshot expires instead of being
// reported forever.
func (c *snmpCache) get(name string) (map[string]map[uint32]uint64, error) {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Refresh before consulting err. Otherwise a single failed reload would
	// short-circuit every later call and reload would never run again.
	if time.Since(c.last) > 2*time.Second {
		c.reload()
	}
	if c.err != nil {
		return nil, c.err
	}
	return c.cache[name], nil
}

// reload replaces the snapshot with a fresh collect result and records when
// it ran. The caller must hold c.lock.
func (c *snmpCache) reload() {
	c.cache, c.err = collect()
	c.last = time.Now()
}
func collect() (map[string]map[string]map[uint32]uint64, error) {
entitys := nettop.GetAllUniqueNetnsEntity()
res := make(map[string]map[string]map[uint32]uint64)
for proto, metricsList := range metricsMap {
res[proto] = make(map[string]map[uint32]uint64)
for _, metrics := range metricsList {
res[proto][metrics.Name] = make(map[uint32]uint64)
}
}
for _, et := range entitys {
if et != nil {
pid := et.GetPid()
nsinum := et.GetNetns()
stats, err := getNetstatByPid(pid)
if err != nil {
log.Errorf("%s failed get netstat, pid: %d, nsinum: %d, err: %v", "snmp", pid, nsinum, err)
continue
}
for proto, stat := range stats {
for k, v := range stat {
data, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Errorf("%s failed parse netstat value, pid: %d, nsinum: %d, key: %s value: %s, err: %v", "snmp", pid, nsinum, k, v, err)
continue
}
// ignore unaware metric
if _, ok := res[proto][k]; ok {
res[proto][k][uint32(nsinum)] = uint64(data)
}
}
}
}
}
return res, nil
}
func getNetstatByPid(pid int) (map[string]map[string]string, error) {
resMap := make(map[string]map[string]string)
snmppath := fmt.Sprintf("/proc/%d/net/snmp", pid)
if _, err := os.Stat(snmppath); os.IsNotExist(err) {
return resMap, err
}
snmpStats, err := getNetStats(snmppath)
if err != nil {
return resMap, err
}
for k, v := range snmpStats {
resMap[k] = v
}
return resMap, nil
}
// getNetStats opens fileName, parses it with parseNetStats, and closes it
// again before returning. The Close error is ignored because the file is
// only read.
func getNetStats(fileName string) (map[string]map[string]string, error) {
	f, openErr := os.Open(fileName)
	if openErr != nil {
		return nil, openErr
	}
	defer f.Close()

	stats, parseErr := parseNetStats(f, fileName)
	return stats, parseErr
}
// parseNetStats parses the paired-line layout of /proc/net/snmp. Each section
// is a header line ("Tcp: RtoAlgorithm RtoMin ...") followed immediately by a
// value line with the same prefix ("Tcp: 1 200 ...").
//
// The result maps the lower-cased section name, without its trailing ':', to
// lower-cased field name -> raw value string. Fields are whitespace-separated,
// and blank lines between sections are ignored. If a section appears twice,
// the later one wins.
//
// fileName is used only in error messages. An error, with a nil map, is
// returned when a header has no value line, when a value line's prefix differs
// from its header, when the field counts differ, or when reading r fails.
func parseNetStats(r io.Reader, fileName string) (map[string]map[string]string, error) {
	netStats := map[string]map[string]string{}
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		nameParts := strings.Fields(scanner.Text())
		if len(nameParts) == 0 {
			// Skip blank lines rather than indexing into an empty slice.
			continue
		}
		prefix := nameParts[0]
		// "Tcp:" -> "tcp"
		protocol := strings.ToLower(strings.TrimSuffix(prefix, ":"))

		// Every header line must be followed by its value line.
		if !scanner.Scan() {
			if err := scanner.Err(); err != nil {
				return nil, err
			}
			return nil, fmt.Errorf("missing value line in %s: %s", fileName, protocol)
		}
		valueParts := strings.Fields(scanner.Text())
		if len(valueParts) == 0 || valueParts[0] != prefix {
			return nil, fmt.Errorf("value line does not match header %q in %s", prefix, fileName)
		}
		if len(nameParts) != len(valueParts) {
			return nil, fmt.Errorf("field count mismatch in %s: %s", fileName, protocol)
		}

		fields := make(map[string]string, len(nameParts)-1)
		for i := 1; i < len(nameParts); i++ {
			fields[strings.ToLower(nameParts[i])] = valueParts[i]
		}
		netStats[protocol] = fields
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return netStats, nil
}