pkg/exporter/probe/nlqdisc/nlqdiscstats.go (304 lines of code) (raw):

package nlqdisc import ( "context" "errors" "fmt" "math" "net" "strings" "syscall" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/mdlayher/netlink" "github.com/mdlayher/netlink/nlenc" log "github.com/sirupsen/logrus" ) const ( // nolint TCAUnspec = iota TCAKind TCAOptions TCAStats TCAXStats TCARate TCAFcnt TCAStats2 TCAStab // __TCA_MAX TCAStatsUnspec = iota TCAStatsBasic TCAStatsRateEst TCAStatsQueue TCAStatsApp TCAStatsRateEst64 // __TCAStats_MAX familyRoute = 0 ) var ( probeName = "qdisc" Bytes = "bytes" Packets = "packets" Drops = "drops" Qlen = "qlen" Backlog = "backlog" Overlimits = "overlimits" qdiscMetrics = []probe.LegacyMetric{ {Name: Bytes, Help: "The total number of bytes transmitted through the queuing discipline."}, {Name: Packets, Help: "The total number of packets transmitted through the queuing discipline."}, {Name: Drops, Help: "The total number of packets dropped by the queuing discipline."}, {Name: Qlen, Help: "The current length of the queue (the number of packets queued)."}, {Name: Backlog, Help: "The total amount of data currently in the queue (in bytes)."}, {Name: Overlimits, Help: "The total number of packets that exceeded the configured limits."}, } ) func init() { probe.MustRegisterMetricsProbe(probeName, qdiscProbeCreator) } func qdiscProbeCreator() (probe.MetricsProbe, error) { p := &Probe{} batchMetrics := probe.NewLegacyBatchMetrics(probeName, qdiscMetrics, p.CollectOnce) return probe.NewMetricsProbe(probeName, p, batchMetrics), nil } type Probe struct{} func (p *Probe) Start(_ context.Context) error { return nil } func (p *Probe) Stop(_ context.Context) error { return nil } func (p *Probe) CollectOnce() (map[string]map[uint32]uint64, error) { resMap := make(map[string]map[uint32]uint64) for _, metric := range qdiscMetrics { resMap[metric.Name] = make(map[uint32]uint64) } ets := nettop.GetAllUniqueNetnsEntity() for _, et := range ets { stats, err := getQdiscStats(et) if err != nil { log.Errorf("%s failed get qdisc stats: %v", probeName, err) continue } for _, stat := range stats { // only care about eth0/eth1... if strings.HasPrefix(stat.IfaceName, "eth") { resMap[Bytes][uint32(et.GetNetns())] += stat.Bytes resMap[Packets][uint32(et.GetNetns())] += uint64(stat.Packets) resMap[Drops][uint32(et.GetNetns())] += uint64(stat.Drops) resMap[Qlen][uint32(et.GetNetns())] += uint64(stat.Qlen) resMap[Backlog][uint32(et.GetNetns())] += uint64(stat.Backlog) resMap[Overlimits][uint32(et.GetNetns())] += uint64(stat.Overlimits) } } } return resMap, nil } func getQdiscStats(entity *nettop.Entity) ([]QdiscInfo, error) { nsHandle, err := entity.OpenNsHandle() if err != nil { return nil, err } defer nsHandle.Close() c, err := getConn(int(nsHandle)) if err != nil { return nil, err } defer c.Close() req := netlink.Message{ Header: netlink.Header{ Flags: netlink.Request | netlink.Dump, Type: 38, // RTM_GETQDISC }, Data: make([]byte, 20), } msgs, err := c.Execute(req) if err != nil { return nil, fmt.Errorf("failed to execute request: %v", err) } res := []QdiscInfo{} for _, msg := range msgs { m, err := parseMessage(msg) if err != nil { log.Errorf("failed parse qdisc msg, nlmsg: %v, err: %v", msg, err) continue } res = append(res, m) } return res, nil } func getConn(nsfd int) (*netlink.Conn, error) { c, err := netlink.Dial(familyRoute, &netlink.Config{ NetNS: nsfd, }) if err != nil { return nil, err } if err := c.SetOption(netlink.GetStrictCheck, true); err != nil { // silently accept ENOPROTOOPT errors when kernel is not > 4.20 if !errors.Is(err, syscall.ENOPROTOOPT) { return nil, fmt.Errorf("unexpected error trying to set option NETLINK_GET_STRICT_CHK: %v", err) } } return c, nil } type QdiscInfo struct { IfaceName string Parent uint32 Handle uint32 Kind string Bytes uint64 Packets uint32 Drops uint32 Requeues uint32 Overlimits uint32 GcFlows uint64 Throttled uint64 FlowsPlimit uint64 Qlen uint32 Backlog uint32 } // See struct tc_stats in /usr/include/linux/pkt_sched.h type TCStats struct { Bytes uint64 Packets uint32 Drops uint32 Overlimits uint32 Bps uint32 Pps uint32 Qlen uint32 Backlog uint32 } // See /usr/include/linux/gen_stats.h type TCStats2 struct { // struct gnet_stats_basic Bytes uint64 Packets uint32 // struct gnet_stats_queue Qlen uint32 Backlog uint32 Drops uint32 Requeues uint32 Overlimits uint32 } // See struct tc_fq_qd_stats /usr/include/linux/pkt_sched.h type TCFqQdStats struct { GcFlows uint64 HighprioPackets uint64 TCPRetrans uint64 Throttled uint64 FlowsPlimit uint64 PktsTooLong uint64 AllocationErrors uint64 TimeNextDelayedFlow int64 Flows uint32 InactiveFlows uint32 ThrottledFlows uint32 UnthrottleLatencyNs uint32 } func parseTCAStats(attr netlink.Attribute) TCStats { var stats TCStats stats.Bytes = nlenc.Uint64(attr.Data[0:8]) stats.Packets = nlenc.Uint32(attr.Data[8:12]) stats.Drops = nlenc.Uint32(attr.Data[12:16]) stats.Overlimits = nlenc.Uint32(attr.Data[16:20]) stats.Bps = nlenc.Uint32(attr.Data[20:24]) stats.Pps = nlenc.Uint32(attr.Data[24:28]) stats.Qlen = nlenc.Uint32(attr.Data[28:32]) stats.Backlog = nlenc.Uint32(attr.Data[32:36]) return stats } func parseTCAStats2(attr netlink.Attribute) TCStats2 { var stats TCStats2 nested, _ := netlink.UnmarshalAttributes(attr.Data) for _, a := range nested { switch a.Type { case TCAStatsBasic: stats.Bytes = nlenc.Uint64(a.Data[0:8]) stats.Packets = nlenc.Uint32(a.Data[8:12]) case TCAStatsQueue: stats.Qlen = nlenc.Uint32(a.Data[0:4]) stats.Backlog = nlenc.Uint32(a.Data[4:8]) stats.Drops = nlenc.Uint32(a.Data[8:12]) stats.Requeues = nlenc.Uint32(a.Data[12:16]) stats.Overlimits = nlenc.Uint32(a.Data[16:20]) default: } } return stats } func parseTCFqQdStats(attr netlink.Attribute) (TCFqQdStats, error) { var stats TCFqQdStats nested, err := netlink.UnmarshalAttributes(attr.Data) if err != nil { return stats, err } pts := []*uint64{ &stats.GcFlows, &stats.HighprioPackets, &stats.TCPRetrans, &stats.Throttled, &stats.FlowsPlimit, &stats.PktsTooLong, &stats.AllocationErrors, } for _, a := range nested { switch a.Type { case TCAStatsApp: for i := 0; i < len(pts) && (i+1)*8 <= len(a.Data); i++ { *pts[i] = nlenc.Uint64(a.Data[i*8 : (i+1)*8]) } default: } } return stats, nil } // See https://tools.ietf.org/html/rfc3549#section-3.1.3 func parseMessage(msg netlink.Message) (QdiscInfo, error) { var m QdiscInfo var s TCStats var s2 TCStats2 var sFq TCFqQdStats /* struct tcmsg { unsigned char tcm_family; unsigned char tcm__pad1; unsigned short tcm__pad2; int tcm_ifindex; __u32 tcm_handle; __u32 tcm_parent; __u32 tcm_info; }; */ if len(msg.Data) < 20 { return m, fmt.Errorf("short message, len=%d", len(msg.Data)) } ifaceIdx := nlenc.Uint32(msg.Data[4:8]) m.Handle = nlenc.Uint32(msg.Data[8:12]) m.Parent = nlenc.Uint32(msg.Data[12:16]) if m.Parent == math.MaxUint32 { m.Parent = 0 } // The first 20 bytes are taken by tcmsg attrs, err := netlink.UnmarshalAttributes(msg.Data[20:]) if err != nil { return m, fmt.Errorf("failed to unmarshal attributes: %v", err) } for _, attr := range attrs { switch attr.Type { case TCAKind: m.Kind = nlenc.String(attr.Data) case TCAStats2: sFq, err = parseTCFqQdStats(attr) if err != nil { return m, err } if sFq.GcFlows > 0 { m.GcFlows = sFq.GcFlows } if sFq.Throttled > 0 { m.Throttled = sFq.Throttled } if sFq.FlowsPlimit > 0 { m.FlowsPlimit = sFq.FlowsPlimit } s2 = parseTCAStats2(attr) m.Bytes = s2.Bytes m.Packets = s2.Packets m.Drops = s2.Drops // requeues only available in TCAStats2, not in TCAStats m.Requeues = s2.Requeues m.Overlimits = s2.Overlimits m.Qlen = s2.Qlen m.Backlog = s2.Backlog case TCAStats: // Legacy s = parseTCAStats(attr) m.Bytes = s.Bytes m.Packets = s.Packets m.Drops = s.Drops m.Overlimits = s.Overlimits m.Qlen = s.Qlen m.Backlog = s.Backlog default: // TODO: TCAOptions and TCAXStats } } iface, err := net.InterfaceByIndex(int(ifaceIdx)) if err == nil { m.IfaceName = iface.Name } return m, err }