pkg/exporter/probe/tracekernel/tracekernel.go

package tracekernel

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"math/bits"
	"strings"
	"sync"
	"time"
	"unsafe"

	"github.com/alibaba/kubeskoop/pkg/exporter/probe"
	"github.com/alibaba/kubeskoop/pkg/exporter/util"
	log "github.com/sirupsen/logrus"

	"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/perf"
	"github.com/cilium/ebpf/ringbuf"
	"github.com/cilium/ebpf/rlimit"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_kl_event_t bpf ../../../../bpf/kernellatency.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH}

// nolint
const (
	TXKERNEL_SLOW = "TXKERNEL_SLOW"
	RXKERNEL_SLOW = "RXKERNEL_SLOW"

	HOOK_IPRCV       = "ip_rcv"
	HOOK_IPRCVFIN    = "ip_rcv_finish"
	HOOK_IPLOCAL     = "ip_local_deliver"
	HOOK_IPLOCALFIN  = "ip_local_deliver_finish"
	HOOK_IPXMIT      = "__ip_queue_xmit"
	HOOK_IPLOCALOUT  = "ip_local_out"
	HOOK_IPOUTPUT    = "ip_output"
	HOOK_IPOUTPUTFIN = "ip_finish_output2"

	RXKERNEL_SLOW_METRIC      = "rxslow"
	TXKERNEL_SLOW_METRIC      = "txslow"
	RXKERNEL_SLOW100MS_METRIC = "rxslow100ms"
	TXKERNEL_SLOW100MS_METRIC = "txslow100ms"

	probeTypeEvent   = 0
	probeTypeMetrics = 1
)

var (
	metrics = []probe.LegacyMetric{
		{Name: RXKERNEL_SLOW_METRIC, Help: "The total count of incoming packets that experienced slow processing in the RX kernel path."},
		{Name: RXKERNEL_SLOW100MS_METRIC, Help: "The total count of incoming packets that took longer than 100 milliseconds to process in the RX kernel path."},
		{Name: TXKERNEL_SLOW_METRIC, Help: "The total count of outgoing packets that experienced slow processing in the TX kernel path."},
		{Name: TXKERNEL_SLOW100MS_METRIC, Help: "The total count of outgoing packets that took longer than 100 milliseconds to process in the TX kernel path."},
	}

	probeName = "kernellatency"

	latencyProbe = &kernelLatencyProbe{
		metricsMap: make(map[string]map[uint32]uint64),
	}
)

func init() {
	probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator)
	probe.MustRegisterEventProbe(probeName, eventProbeCreator)
}

func metricsProbeCreator() (probe.MetricsProbe, error) {
	p := &metricsProbe{}
	batchMetrics := probe.NewLegacyBatchMetrics(probeName, metrics, p.CollectOnce)
	return probe.NewMetricsProbe(probeName, p, batchMetrics), nil
}

func eventProbeCreator(sink chan<- *probe.Event, _ map[string]interface{}) (probe.EventProbe, error) {
	p := &eventProbe{
		sink: sink,
	}
	return probe.NewEventProbe(probeName, p), nil
}

type metricsProbe struct {
}

func (p *metricsProbe) Start(ctx context.Context) error {
	return latencyProbe.start(ctx, probe.ProbeTypeMetrics)
}

func (p *metricsProbe) Stop(ctx context.Context) error {
	return latencyProbe.stop(ctx, probe.ProbeTypeMetrics)
}

func (p *metricsProbe) CollectOnce() (map[string]map[uint32]uint64, error) {
	return latencyProbe.copyMetricsMap(), nil
}

type eventProbe struct {
	sink chan<- *probe.Event
}

func (e *eventProbe) Start(ctx context.Context) error {
	err := latencyProbe.start(ctx, probe.ProbeTypeEvent)
	if err != nil {
		return err
	}

	latencyProbe.sink = e.sink
	return nil
}

func (e *eventProbe) Stop(ctx context.Context) error {
	return latencyProbe.stop(ctx, probe.ProbeTypeEvent)
}

type kernelLatencyProbe struct {
	objs        bpfObjects
	links       []link.Link
	sink        chan<- *probe.Event
	refcnt      [2]int
	lock        sync.Mutex
	perfReader  *perf.Reader
	metricsMap  map[string]map[uint32]uint64
	metricsLock sync.RWMutex
}

func (p *kernelLatencyProbe) stopLocked(probeType probe.Type) error {
	if p.refcnt[probeType] == 0 {
		return fmt.Errorf("probe %s never started", probeType)
	}

	p.refcnt[probeType]--

	if p.refcnt[probe.ProbeTypeEvent] == 0 {
		if p.perfReader != nil {
			p.perfReader.Close()
		}
	}

	if p.totalReferenceCountLocked() == 0 {
		return p.cleanup()
	}
	return nil
}

func (p *kernelLatencyProbe) stop(_ context.Context, probeType probe.Type) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.stopLocked(probeType)
}

func (p *kernelLatencyProbe) cleanup() error {
	if p.perfReader != nil {
		p.perfReader.Close()
	}

	for _, link := range p.links {
		link.Close()
	}
	p.links = nil

	p.objs.Close()

	return nil
}

func (p *kernelLatencyProbe) copyMetricsMap() map[string]map[uint32]uint64 {
	p.metricsLock.RLock()
	defer p.metricsLock.RUnlock()
	return probe.CopyLegacyMetricsMap(p.metricsMap)
}

func (p *kernelLatencyProbe) totalReferenceCountLocked() int {
	var c int
	for _, n := range p.refcnt {
		c += n
	}
	return c
}

func (p *kernelLatencyProbe) start(_ context.Context, probeType probe.Type) (err error) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.refcnt[probeType] != 0 {
		return fmt.Errorf("%s(%s) has already started", probeName, probeType)
	}

	p.refcnt[probeType]++
	if p.totalReferenceCountLocked() == 1 {
		if err = p.loadAndAttachBPF(); err != nil {
			p.refcnt[probeType]--
			log.Errorf("%s failed to load and attach bpf, err: %v", probeName, err)
			_ = p.cleanup()
			return fmt.Errorf("%s failed to load bpf: %w", probeName, err)
		}
	}

	if p.refcnt[probe.ProbeTypeEvent] == 1 {
		p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspKlatencyEvent, int(unsafe.Sizeof(bpfInspKlEventT{})))
		if err != nil {
			log.Errorf("%s failed to create perf reader, err: %v", probeName, err)
			_ = p.stopLocked(probeType)
			return fmt.Errorf("%s failed to create bpf reader: %w", probeName, err)
		}

		go p.perfLoop()
	}

	return nil
}

func (p *kernelLatencyProbe) updateMetrics(netns uint32, metrics string) {
	p.metricsLock.Lock()
	defer p.metricsLock.Unlock()
	if _, ok := p.metricsMap[metrics]; !ok {
		p.metricsMap[metrics] = make(map[uint32]uint64)
	}

	p.metricsMap[metrics][netns]++
}

func (p *kernelLatencyProbe) perfLoop() {
	for {
		record, err := p.perfReader.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				log.Errorf("%s received signal, exiting..", probeName)
				return
			}
			log.Warnf("%s failed reading from reader, err: %v", probeName, err)
			continue
		}

		if record.LostSamples != 0 {
			log.Warnf("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
			continue
		}

		var event bpfInspKlEventT
		if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Errorf("%s failed parsing event, err: %v", probeName, err)
			continue
		}

		netns := event.SkbMeta.Netns

		evt := &probe.Event{
			Timestamp: time.Now().UnixNano(),
			Labels:    probe.LegacyEventLabels(netns),
		}

		/*
			#define RX_KLATENCY 1
			#define TX_KLATENCY 2
		*/
		tuple := fmt.Sprintf("protocol=%s saddr=%s sport=%d daddr=%s dport=%d ",
			bpfutil.GetProtoStr(event.Tuple.L4Proto),
			bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Saddr))),
			bits.ReverseBytes16(event.Tuple.Sport),
			bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Daddr))),
			bits.ReverseBytes16(event.Tuple.Dport))

		switch event.Direction {
		case 1:
			evt.Type = RXKERNEL_SLOW
			latency := []string{fmt.Sprintf("latency:%s", bpfutil.GetHumanTimes(event.Latency))}
			if event.Point2 > event.Point1 {
				latency = append(latency, fmt.Sprintf("PREROUTING:%s", bpfutil.GetHumanTimes(event.Point2-event.Point1)))
			}
			if event.Point3 > event.Point2 && event.Point2 != 0 {
				latency = append(latency, fmt.Sprintf("ROUTE:%s", bpfutil.GetHumanTimes(event.Point3-event.Point2)))
			}
			if event.Point4 > event.Point3 && event.Point3 != 0 {
				latency = append(latency, fmt.Sprintf("LOCAL_IN:%s", bpfutil.GetHumanTimes(event.Point4-event.Point3)))
			}
			evt.Message = fmt.Sprintf("%s %s", tuple, strings.Join(latency, " "))
			p.updateMetrics(netns, RXKERNEL_SLOW_METRIC)
		case 2:
			evt.Type = TXKERNEL_SLOW
			latency := []string{fmt.Sprintf("latency=%s", bpfutil.GetHumanTimes(event.Latency))}
			if event.Point3 > event.Point1 && event.Point1 != 0 {
				latency = append(latency, fmt.Sprintf("LOCAL_OUT=%s", bpfutil.GetHumanTimes(event.Point3-event.Point1)))
			}
			if event.Point4 > event.Point3 && event.Point3 != 0 {
				latency = append(latency, fmt.Sprintf("POSTROUTING=%s", bpfutil.GetHumanTimes(event.Point4-event.Point3)))
			}
			evt.Message = fmt.Sprintf("%s %s", tuple, strings.Join(latency, " "))
			p.updateMetrics(netns, TXKERNEL_SLOW_METRIC)
		default:
			log.Infof("%s failed parsing event %s, ignore", probeName, util.ToJSONString(evt))
			continue
		}

		if p.sink != nil {
			log.Debugf("%s sink event %s", probeName, util.ToJSONString(evt))
			p.sink <- evt
		}
	}
}

func (p *kernelLatencyProbe) loadAndAttachBPF() error {
	// Allow the current process to lock memory for eBPF maps and programs.
	if err := rlimit.RemoveMemlock(); err != nil {
		return err
	}

	opts := ebpf.CollectionOptions{}
	// Load BTF information for the running kernel.
	opts.Programs = ebpf.ProgramOptions{
		KernelTypes: bpfutil.LoadBTFSpecOrNil(),
	}

	// Load the compiled programs and maps into the kernel and obtain their fds.
	if err := loadBpfObjects(&p.objs, &opts); err != nil {
		return fmt.Errorf("loading objects: %v", err)
	}

	progrcv, err := link.Kprobe(HOOK_IPRCV, p.objs.KlatencyIpRcv, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPRCV: %s", err.Error())
	}
	p.links = append(p.links, progrcv)

	progrcvfin, err := link.Kprobe(HOOK_IPRCVFIN, p.objs.KlatencyIpRcvFinish, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPRCVFIN: %s", err.Error())
	}
	p.links = append(p.links, progrcvfin)

	proglocal, err := link.Kprobe(HOOK_IPLOCAL, p.objs.KlatencyIpLocalDeliver, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPLOCAL: %s", err.Error())
	}
	p.links = append(p.links, proglocal)

	proglocalfin, err := link.Kprobe(HOOK_IPLOCALFIN, p.objs.KlatencyIpLocalDeliverFinish, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPLOCALFIN: %s", err.Error())
	}
	p.links = append(p.links, proglocalfin)

	progxmit, err := link.Kprobe(HOOK_IPXMIT, p.objs.KlatencyIpQueueXmit, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPXMIT: %s", err.Error())
	}
	p.links = append(p.links, progxmit)

	proglocalout, err := link.Kprobe(HOOK_IPLOCALOUT, p.objs.KlatencyIpLocal, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPLOCALOUT: %s", err.Error())
	}
	p.links = append(p.links, proglocalout)

	progoutput, err := link.Kprobe(HOOK_IPOUTPUT, p.objs.KlatencyIpOutput, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPOUTPUT: %s", err.Error())
	}
	p.links = append(p.links, progoutput)

	progfin, err := link.Kprobe(HOOK_IPOUTPUTFIN, p.objs.KlatencyIpFinishOutput2, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link HOOK_IPOUTPUTFIN: %s", err.Error())
	}
	p.links = append(p.links, progfin)

	progkfree, err := link.Kprobe("kfree_skb", p.objs.ReportKfree, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link kfree_skb: %s", err.Error())
	}
	p.links = append(p.links, progkfree)

	progconsume, err := link.Kprobe("consume_skb", p.objs.ReportConsume, &link.KprobeOptions{})
	if err != nil {
		return fmt.Errorf("link consume_skb: %s", err.Error())
	}
	p.links = append(p.links, progconsume)

	return nil
}