pkg/exporter/probe/tracesocketlatency/socketlatency.go (267 lines of code) (raw):

package tracesocketlatency import ( "bytes" "context" "encoding/binary" "errors" "fmt" "math/bits" "sync" "time" "unsafe" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/util" log "github.com/sirupsen/logrus" "github.com/alibaba/kubeskoop/pkg/exporter/bpfutil" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_sklat_metric_t -type insp_sklat_event_t bpf ../../../../bpf/socketlatency.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH} // nolint const ( ModuleName = "socketlatency" // nolint SOCKETLAT_READSLOW = "SOCKETLAT_READSLOW" SOCKETLAT_SENDSLOW = "SOCKETLAT_SENDSLOW" READ100MS = "read100ms" READ300MS = "read300ms" READ1MS = "read1ms" WRITE100MS = "write100ms" WRITE1MS = "write1ms" /* #define ACTION_READ 1 #define ACTION_WRITE 2 #define ACTION_HANDLE 4 #define BUCKET100MS 1 #define BUCKET10MS 2 #define BUCKET1MS 4 */ ACTION_READ = 1 ACTION_WRITE = 2 ACTION_HANDLE = 4 BUCKET100MS = 1 BUCKET1MS = 4 ) var ( probeName = "socketlatency" _socketLatency = &socketLatencyProbe{} socketlatencyMetrics = []probe.LegacyMetric{ {Name: READ100MS, Help: "The total count of read operations that took longer than 100 milliseconds."}, {Name: READ1MS, Help: "The total count of read operations that took longer than 1 millisecond."}, {Name: WRITE100MS, Help: "The total count of write operations that took longer than 100 milliseconds."}, {Name: WRITE1MS, Help: "The total count of write operations that took longer than 1 millisecond."}, } ) func init() { probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator) probe.MustRegisterEventProbe(probeName, eventProbeCreator) } func metricsProbeCreator() (probe.MetricsProbe, error) { p := &metricsProbe{} batchMetrics := probe.NewLegacyBatchMetrics(probeName, socketlatencyMetrics, p.CollectOnce) return probe.NewMetricsProbe(probeName, p, batchMetrics), nil } func eventProbeCreator(sink chan<- *probe.Event, _ map[string]interface{}) (probe.EventProbe, error) { p := &eventProbe{ sink: sink, } return probe.NewEventProbe(probeName, p), nil } type metricsProbe struct { } func (p *metricsProbe) Start(_ context.Context) error { return _socketLatency.start(probe.ProbeTypeMetrics) } func (p *metricsProbe) Stop(_ context.Context) error { return _socketLatency.stop(probe.ProbeTypeMetrics) } func (p *metricsProbe) CollectOnce() (map[string]map[uint32]uint64, error) { return _socketLatency.collect() } type eventProbe struct { sink chan<- *probe.Event } func (e *eventProbe) Start(_ context.Context) error { err := _socketLatency.start(probe.ProbeTypeEvent) if err != nil { return err } _socketLatency.sink = e.sink return nil } func (e *eventProbe) Stop(_ context.Context) error { return _socketLatency.stop(probe.ProbeTypeEvent) } type socketLatencyProbe struct { objs bpfObjects links []link.Link sink chan<- *probe.Event refcnt [probe.ProbeTypeCount]int lock sync.Mutex perfReader *perf.Reader } func (p *socketLatencyProbe) stop(probeType probe.Type) error { p.lock.Lock() defer p.lock.Unlock() if p.refcnt[probeType] == 0 { return fmt.Errorf("probe %s never start", probeType) } p.refcnt[probeType]-- if p.totalReferenceCountLocked() == 0 { return p.cleanup() } return nil } func (p *socketLatencyProbe) cleanup() error { if p.perfReader != nil { p.perfReader.Close() } for _, link := range p.links { link.Close() } p.links = nil p.objs.Close() return nil } func (p *socketLatencyProbe) totalReferenceCountLocked() int { var c int for _, n := range p.refcnt { c += n } return c } func (p *socketLatencyProbe) start(probeType probe.Type) (err error) { p.lock.Lock() defer p.lock.Unlock() p.refcnt[probeType]++ if p.totalReferenceCountLocked() == 1 { if err = p.loadAndAttachBPF(); err != nil { log.Errorf("%s failed load and attach bpf, err: %v", probeName, err) _ = p.cleanup() return } p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspSklatEvents, int(unsafe.Sizeof(bpfInspSklatEventT{}))) if err != nil { log.Warnf("%s failed create new perf reader, err: %v", probeName, err) return } go p.perfLoop() } return nil } func (p *socketLatencyProbe) perfLoop() { for { record, err := p.perfReader.Read() if err != nil { if errors.Is(err, ringbuf.ErrClosed) { log.Infof("%s received signal, exiting..", probeName) return } log.Infof("%s failed reading from reader, err: %v", probeName, err) continue } if record.LostSamples != 0 { log.Infof("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples) continue } var event bpfInspSklatEventT if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil { log.Infof("%s failed parsing event, err: %v", probeName, err) continue } // filter netlink/unixsock/tproxy packet if event.Tuple.Dport == 0 && event.Tuple.Sport == 0 { continue } evt := &probe.Event{ Timestamp: time.Now().UnixNano(), Labels: probe.LegacyEventLabels(event.SkbMeta.Netns), } /* #define ACTION_READ 1 #define ACTION_WRITE 2 */ if event.Direction == ACTION_READ { evt.Type = SOCKETLAT_READSLOW } else if event.Direction == ACTION_WRITE { evt.Type = SOCKETLAT_SENDSLOW } tuple := fmt.Sprintf("protocol=%s saddr=%s sport=%d daddr=%s dport=%d ", bpfutil.GetProtoStr(event.Tuple.L4Proto), bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Saddr))), bits.ReverseBytes16(event.Tuple.Sport), bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Daddr))), bits.ReverseBytes16(event.Tuple.Dport)) evt.Message = fmt.Sprintf("%s latency=%s", tuple, bpfutil.GetHumanTimes(event.Latency)) if p.sink != nil { log.Debugf("%s sink event %s", probeName, util.ToJSONString(evt)) p.sink <- evt } } } func (p *socketLatencyProbe) collect() (map[string]map[uint32]uint64, error) { res := map[string]map[uint32]uint64{} for _, mtr := range socketlatencyMetrics { res[mtr.Name] = map[uint32]uint64{} } // 从map中获取数据 m, err := bpfutil.MustLoadPin(ModuleName) if err != nil { return nil, err } var ( value uint64 entries = m.Iterate() key = bpfInspSklatMetricT{} ) // 解析数据后更新指标,按照指标/netns/数据的格式存放map[string]map[uint32]uint64 for entries.Next(&key, &value) { if key.Netns == 0 { continue } if key.Action == ACTION_READ { if key.Bucket == BUCKET100MS { // In kernel space, metric READ1MS and metric READ100MS are counted separately, // so the value of BUCKET100MS needs to be added to metric READ1MS in user space at the same time. res[READ100MS][key.Netns] += value res[READ1MS][key.Netns] += value } else if key.Bucket == BUCKET1MS { res[READ1MS][key.Netns] += value } } if key.Action == ACTION_WRITE { if key.Bucket == BUCKET100MS { res[WRITE100MS][key.Netns] += value res[WRITE1MS][key.Netns] += value } else if key.Bucket == BUCKET1MS { res[WRITE1MS][key.Netns] += value } } } return res, nil } func (p *socketLatencyProbe) loadAndAttachBPF() error { // Allow the current process to lock memory for eBPF resources. if err := rlimit.RemoveMemlock(); err != nil { return fmt.Errorf("remove limit failed: %s", err.Error()) } opts := ebpf.CollectionOptions{} opts.Programs = ebpf.ProgramOptions{ KernelTypes: bpfutil.LoadBTFSpecOrNil(), } // Load pre-compiled programs and maps into the kernel. if err := loadBpfObjects(&p.objs, &opts); err != nil { return fmt.Errorf("loading objects: %s", err.Error()) } linkcreate, err := link.Kprobe("inet_ehash_nolisten", p.objs.SockCreate, nil) if err != nil { return fmt.Errorf("link inet_ehash_nolisten: %s", err.Error()) } p.links = append(p.links, linkcreate) linkreceive, err := link.Kprobe("sock_def_readable", p.objs.SockReceive, nil) if err != nil { return fmt.Errorf("link sock_def_readable: %s", err.Error()) } p.links = append(p.links, linkreceive) linkread, err := link.Kprobe("tcp_cleanup_rbuf", p.objs.SockRead, nil) if err != nil { return fmt.Errorf("link tcp_cleanup_rbuf: %s", err.Error()) } p.links = append(p.links, linkread) linkwrite, err := link.Kprobe("tcp_sendmsg_locked", p.objs.SockWrite, nil) if err != nil { return fmt.Errorf("link tcp_sendmsg_locked: %s", err.Error()) } p.links = append(p.links, linkwrite) linksend, err := link.Kprobe("tcp_write_xmit", p.objs.SockSend, nil) if err != nil { return fmt.Errorf("link tcp_write_xmit: %s", err.Error()) } p.links = append(p.links, linksend) linkdestroy, err := link.Kprobe("tcp_done", p.objs.SockDestroy, nil) if err != nil { return fmt.Errorf("link tcp_done: %s", err.Error()) } p.links = append(p.links, linkdestroy) err = bpfutil.MustPin(p.objs.InspSklatMetric, probeName) if err != nil { return fmt.Errorf("pin map %s failed: %s", ModuleName, err.Error()) } return nil }