pkg/exporter/probe/tracepacketloss/packetloss.go

package tracepacketloss

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"
	"unsafe"

	"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
	"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
	"github.com/alibaba/kubeskoop/pkg/exporter/probe"
	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/perf"
	"github.com/cilium/ebpf/ringbuf"
	"github.com/cilium/ebpf/rlimit"
	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/prometheus/client_golang/prometheus"
	log "github.com/sirupsen/logrus"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_pl_event_t -type addr -type tuple bpf ../../../../bpf/packetloss.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH}

// nolint
const (
	packetLossTotal     = "total"
	packetLossNetfilter = "netfilter"
	//PACKETLOSS_ABNORMAL = "abnormal"
	//PACKETLOSS_TCPSTATEM = "tcpstatm"
	//PACKETLOSS_TCPRCV = "tcprcv"
	//PACKETLOSS_TCPHANDLE = "tcphandle"

	PacketLoss = "PacketLoss"

	featureSwitchEnablePacketLossStackKey = 0
)

var (
	ignoreSymbolList = map[string]struct{}{}
	uselessSymbols   = map[string]bool{
		"sk_stream_kill_queues": true,
		"unix_release_sock":     true,
		"nfnetlink_rcv_batch":   true,
		"skb_queue_purge":       true,
	}

	netfilterSymbol = "nf_hook_slow"
	//tcpstatmSymbol = "tcp_rcv_state_process"
	//tcprcvSymbol = "tcp_v4_rcv"
	//tcpdorcvSymbol = "tcp_v4_do_rcv"

	probeName = "packetloss"

	_packetLossProbe = &packetLossProbe{}
)

func init() {
	var err error
	_packetLossProbe.cache, err = lru.New[cacheKey, *Counter](10240)
	if err != nil {
		panic(fmt.Sprintf("cannot create lru cache for packetloss probe:%v", err))
	}
	probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator)
	probe.MustRegisterEventProbe(probeName, eventProbeCreator)
}

type packetlossArgs struct {
	EnableStack bool `mapstructure:"EnableStack"`
}

func metricsProbeCreator() (probe.MetricsProbe, error) {
	p := &metricsProbe{}

	labels := probe.TupleMetricsLabels
	labels = append(labels, "k8s_node")

	opts := probe.BatchMetricsOpts{
		Namespace:      probe.MetricsNamespace,
		Subsystem:      probeName,
		VariableLabels: labels,
		SingleMetricsOpts: []probe.SingleMetricsOpts{
			{Name: packetLossTotal, ValueType: prometheus.CounterValue},
			{Name: packetLossNetfilter, ValueType: prometheus.CounterValue},
		},
	}

	batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce)
	return probe.NewMetricsProbe(probeName, p, batchMetrics), nil
}

func eventProbeCreator(sink chan<- *probe.Event, args packetlossArgs) (probe.EventProbe, error) {
	p := &eventProbe{
		args: args,
		sink: sink,
	}
	return probe.NewEventProbe(probeName, p), nil
}

type metricsProbe struct {
}

func (p *metricsProbe) Start(_ context.Context) error {
	cfg := probeConfig{}
	return _packetLossProbe.start(probe.ProbeTypeMetrics, &cfg)
}

func (p *metricsProbe) Stop(_ context.Context) error {
	return _packetLossProbe.stop(probe.ProbeTypeMetrics)
}

func (p *metricsProbe) collectOnce(emit probe.Emit) error {
	keys := _packetLossProbe.cache.Keys()
	for _, key := range keys {
		counter, ok := _packetLossProbe.cache.Get(key)
		if !ok || counter == nil {
			continue
		}
		if time.Now().UnixNano()-counter.lastUpdate > 30*time.Second.Nanoseconds() && counter.snatched {
			_packetLossProbe.cache.Remove(key)
			counter.Total = 0
			counter.Netfilter = 0
		}
		tuple := &probe.Tuple{
			Protocol: key.protocol,
			Src:      key.src,
			Dst:      key.dst,
		}
		labels := probe.BuildTupleMetricsLabels(tuple)
		labels = append(labels, nettop.GetNodeName())
		emit(packetLossTotal, labels, float64(counter.Total))
		emit(packetLossNetfilter, labels, float64(counter.Netfilter))
		counter.snatched = true
	}
	return nil
}
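
// collectOnce above re-emits every cached counter on each scrape; entries that
// have already been scraped ("snatched") and have not been updated for 30
// seconds are evicted. Assuming probe.MetricsNamespace resolves to
// "kubeskoop", a scraped series would look roughly like the following (label
// names and values are illustrative only, following probe.TupleMetricsLabels
// plus "k8s_node"):
//
//	kubeskoop_packetloss_total{protocol="tcp",src="10.0.0.1",dst="10.0.0.2",k8s_node="node-1"} 3
//	kubeskoop_packetloss_netfilter{protocol="tcp",src="10.0.0.1",dst="10.0.0.2",k8s_node="node-1"} 1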

type eventProbe struct {
	args packetlossArgs
	sink chan<- *probe.Event
}

func (e *eventProbe) Start(_ context.Context) error {
	cfg := probeConfig{
		enableStack: e.args.EnableStack,
	}
	err := _packetLossProbe.start(probe.ProbeTypeEvent, &cfg)
	if err != nil {
		return err
	}

	_packetLossProbe.sink = e.sink
	return nil
}

func (e *eventProbe) Stop(_ context.Context) error {
	return _packetLossProbe.stop(probe.ProbeTypeEvent)
}

type Counter struct {
	lastUpdate int64
	snatched   bool
	Total      uint32
	Netfilter  uint32
}

type probeConfig struct {
	enableStack bool
}

type cacheKey struct {
	protocol uint8
	src      string
	dst      string
}

type packetLossProbe struct {
	objs        bpfObjects
	links       []link.Link
	sink        chan<- *probe.Event
	probeConfig [probe.ProbeTypeCount]*probeConfig
	lock        sync.Mutex
	perfReader  *perf.Reader
	cache       *lru.Cache[cacheKey, *Counter]
}

func (p *packetLossProbe) probeCount() int {
	var ret int
	for _, cfg := range p.probeConfig {
		if cfg != nil {
			ret++
		}
	}
	return ret
}

func (p *packetLossProbe) stop(probeType probe.Type) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.probeConfig[probeType] == nil {
		return fmt.Errorf("probe %s never start", probeType)
	}

	p.probeConfig[probeType] = nil

	if p.probeCount() == 0 {
		p.cleanup()
	}
	return nil
}

func (p *packetLossProbe) cleanup() {
	if p.perfReader != nil {
		p.perfReader.Close()
		p.perfReader = nil
	}

	for _, link := range p.links {
		link.Close()
	}
	p.links = nil

	p.objs.Close()
}

func (p *packetLossProbe) start(probeType probe.Type, cfg *probeConfig) error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.probeConfig[probeType] != nil {
		return fmt.Errorf("%s(%s) has already started", probeName, probeType)
	}

	p.probeConfig[probeType] = cfg

	if err := p.reinstallBPFLocked(); err != nil {
		return fmt.Errorf("%s failed install ebpf: %w", probeName, err)
	}

	return nil
}

func (p *packetLossProbe) reinstallBPFLocked() (err error) {
	p.cleanup()

	defer func() {
		if err != nil {
			p.cleanup()
		}
	}()

	if err = p.loadAndAttachBPF(); err != nil {
		return fmt.Errorf("%s failed load and attach bpf, err: %w", probeName, err)
	}

	p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspPlEvent, int(unsafe.Sizeof(bpfInspPlEventT{})))
	if err != nil {
		return fmt.Errorf("%s error create perf reader, err: %w", probeName, err)
	}

	go p.perfLoop()

	return nil
}

func (p *packetLossProbe) enableStack() bool {
	cfg := p.probeConfig[probe.ProbeTypeEvent]
	return cfg != nil && cfg.enableStack
}
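
// loadAndAttachBPF loads the bpf2go-generated objects and attaches the
// KfreeSkb program to the skb:kfree_skb tracepoint, which fires when the
// kernel frees a dropped sk_buff. When the event probe was started with
// EnableStack, the feature-switch map is set so the BPF side also records a
// stack id for each drop. A hypothetical config fragment enabling this (the
// field name comes from packetlossArgs; the surrounding YAML layout is an
// assumption, not the documented exporter config format):
//
//	probes:
//	  - name: packetloss
//	    args:
//	      EnableStack: true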
func (p *packetLossProbe) loadAndAttachBPF() error {
	// Allow the current process to lock memory for eBPF resources.
	if err := rlimit.RemoveMemlock(); err != nil {
		return fmt.Errorf("remove limit failed: %s", err.Error())
	}

	opts := ebpf.CollectionOptions{
		Programs: ebpf.ProgramOptions{
			KernelTypes: bpfutil.LoadBTFSpecOrNil(),
		},
	}

	if err := loadBpfObjects(&p.objs, &opts); err != nil {
		return fmt.Errorf("loading objects: %s", err.Error())
	}

	if p.enableStack() {
		if err := bpfutil.UpdateFeatureSwitch(p.objs.InspPacketlossFeatureSwitch, featureSwitchEnablePacketLossStackKey, 1); err != nil {
			return fmt.Errorf("failed update packetloss feature switch: %w", err)
		}
	}

	pl, err := link.Tracepoint("skb", "kfree_skb", p.objs.KfreeSkb, &link.TracepointOptions{})
	if err != nil {
		return fmt.Errorf("link tracepoint kfree_skb failed: %w", err)
	}
	p.links = append(p.links, pl)

	return nil
}

func ignoreLocation(loc uint64) bool {
	sym, err := bpfutil.GetSymPtFromBpfLocation(loc)
	if err != nil {
		log.Infof("cannot find location %d", loc)
		// emit the event anyway
		return false
	}

	_, ok := uselessSymbols[sym.GetName()]
	return ok
}

func toProbeTuple(t *bpfTuple) *probe.Tuple {
	return &probe.Tuple{
		Protocol: t.L4Proto,
		Src:      bpfutil.GetAddrStr(t.L3Proto, t.Saddr.V6addr),
		Dst:      bpfutil.GetAddrStr(t.L3Proto, t.Daddr.V6addr),
		Sport:    t.Sport,
		Dport:    t.Dport,
	}
}

func (p *packetLossProbe) countByLocation(loc uint64, counter *Counter) {
	sym, err := bpfutil.GetSymPtFromBpfLocation(loc)
	if err != nil {
		log.Warnf("%s get sym failed, location: %x, err: %v", probeName, loc, err)
		return
	}

	switch sym.GetName() {
	case netfilterSymbol:
		counter.Netfilter++
	}
}

func (p *packetLossProbe) add2Cache(loc uint64, tuple *probe.Tuple) {
	key := cacheKey{
		protocol: tuple.Protocol,
		src:      tuple.Src,
		dst:      tuple.Dst,
	}
	v, ok := p.cache.Get(key)
	if !ok {
		v = &Counter{}
		p.cache.Add(key, v)
	}
	v.Total++
	p.countByLocation(loc, v)
	v.lastUpdate = time.Now().UnixNano()
	v.snatched = false
}

func (p *packetLossProbe) perfLoop() {
	for {
	anotherLoop:
		record, err := p.perfReader.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				log.Infof("%s received signal, exiting..", probeName)
				return
			}
			log.Errorf("%s failed reading from reader, err: %v", probeName, err)
			continue
		}

		if record.LostSamples != 0 {
			log.Warnf("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
			continue
		}

		var event bpfInspPlEventT
		if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.NativeEndian, &event); err != nil {
			log.Errorf("%s failed parsing event, err: %v", probeName, err)
			continue
		}

		if ignoreLocation(event.Location) {
			continue
		}

		tuple := toProbeTuple(&event.Tuple)

		p.add2Cache(event.Location, tuple)

		evt := &probe.Event{
			Timestamp: time.Now().UnixNano(),
			Type:      PacketLoss,
			Labels:    probe.BuildTupleEventLabels(tuple),
		}

		if p.enableStack() {
			stacks, err := bpfutil.GetSymsByStack(uint32(event.StackId), p.objs.InspPlStack)
			if err != nil {
				log.Warnf("%s failed get sym by stack, err: %v", probeName, err)
				continue
			}
			var strs []string
			for _, sym := range stacks {
				if _, ok := ignoreSymbolList[sym.GetName()]; ok {
					goto anotherLoop
				}
				strs = append(strs, sym.GetExpr())
			}
			evt.Message = strings.Join(strs, "\n")
		}

		if p.sink != nil {
			p.sink <- evt
		}
	}
}
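
// Each drop surfaced by perfLoop above is delivered to the sink as a
// probe.Event shaped roughly as follows (values are illustrative; Message is
// only populated when stack collection is enabled, as a newline-joined list of
// the symbol expressions returned by bpfutil.GetSymsByStack):
//
//	&probe.Event{
//		Timestamp: time.Now().UnixNano(),
//		Type:      PacketLoss,
//		Labels:    probe.BuildTupleEventLabels(tuple),
//		Message:   "nf_hook_slow+0x96\nip_forward+0x1b5\n...",
//	}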