pkg/exporter/probe/tracesoftirq/tracesoftirq.go (309 lines of code) (raw):

package tracesoftirq import ( "bytes" "context" "encoding/binary" "errors" "fmt" "sync" "time" "unsafe" "github.com/alibaba/kubeskoop/pkg/exporter/nettop" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/util" "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" log "github.com/sirupsen/logrus" "github.com/alibaba/kubeskoop/pkg/exporter/bpfutil" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_softirq_event_t bpf ../../../../bpf/softirq.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH} const ( SOFTIRQ_SCHED_SLOW = "schedslow" //nolint SOFTIRQ_SCHED_100MS = "schedslow100ms" //nolint SOFTIRQ_EXCUTE_SLOW = "excuteslow" //nolint SOFTIRQ_EXCUTE_100MS = "excuteslow100ms" //nolint ) var ( probeName = "softirq" softirqTypes = []string{"hi", "timer", "net_tx", "net_rx", "block", "irq_poll", "tasklet", "sched", "hrtimer", "rcu"} _softirqProbe = &softirqProbe{ metricsMap: map[string]map[string]uint64{ SOFTIRQ_SCHED_SLOW: {}, SOFTIRQ_SCHED_100MS: {}, SOFTIRQ_EXCUTE_SLOW: {}, SOFTIRQ_EXCUTE_100MS: {}, }, } ) func init() { probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator) probe.MustRegisterEventProbe(probeName, eventProbeCreator) } type softirqArgs struct { SoftirqTypes []string `mapstructure:"softirq-types"` } func metricsProbeCreator(args softirqArgs) (probe.MetricsProbe, error) { if len(args.SoftirqTypes) == 0 { args.SoftirqTypes = []string{"net_rx"} } _softirqProbe.metricsProbeIrqTypes = softirqTypesBits(args.SoftirqTypes) p := &metricsProbe{} opts := probe.BatchMetricsOpts{ Namespace: probe.MetricsNamespace, Subsystem: probeName, VariableLabels: []string{"k8s_node", "softirq_type"}, SingleMetricsOpts: []probe.SingleMetricsOpts{ {Name: SOFTIRQ_SCHED_SLOW, ValueType: prometheus.CounterValue}, {Name: SOFTIRQ_SCHED_100MS, ValueType: prometheus.CounterValue}, {Name: SOFTIRQ_EXCUTE_SLOW, ValueType: prometheus.CounterValue}, {Name: SOFTIRQ_EXCUTE_100MS, ValueType: prometheus.CounterValue}, }, } batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce) return probe.NewMetricsProbe(probeName, p, batchMetrics), nil } func eventProbeCreator(sink chan<- *probe.Event, args softirqArgs) (probe.EventProbe, error) { if len(args.SoftirqTypes) == 0 { args.SoftirqTypes = []string{"net_rx"} } _softirqProbe.eventProbeIrqTypes = softirqTypesBits(args.SoftirqTypes) p := &eventProbe{ sink: sink, } return probe.NewEventProbe(probeName, p), nil } type metricsProbe struct { } func (p *metricsProbe) Start(_ context.Context) error { return _softirqProbe.start(probe.ProbeTypeMetrics) } func (p *metricsProbe) Stop(_ context.Context) error { return _softirqProbe.stop(probe.ProbeTypeMetrics) } func (p *metricsProbe) collectOnce(emit probe.Emit) error { _softirqProbe.metricsLock.RLock() defer _softirqProbe.metricsLock.RUnlock() nodeName := nettop.GetNodeName() for metricsName, values := range _softirqProbe.metricsMap { for _, irqType := range enabledIrqTypes(_softirqProbe.metricsProbeIrqTypes) { emit(metricsName, []string{nodeName, irqType}, float64(values[irqType])) } } return nil } type eventProbe struct { sink chan<- *probe.Event } func (e *eventProbe) Start(_ context.Context) error { err := _softirqProbe.start(probe.ProbeTypeEvent) if err != nil { return err } _softirqProbe.sink = e.sink return nil } func (e *eventProbe) Stop(_ context.Context) error { return _softirqProbe.stop(probe.ProbeTypeEvent) } type softirqProbe struct { objs bpfObjects links []link.Link sink chan<- *probe.Event refcnt [probe.ProbeTypeCount]int metricsProbeIrqTypes uint32 eventProbeIrqTypes uint32 ebpfProbeIrqType uint32 lock sync.Mutex perfReader *perf.Reader metricsMap map[string]map[string]uint64 metricsLock sync.RWMutex } func (p *softirqProbe) stop(probeType probe.Type) error { p.lock.Lock() defer p.lock.Unlock() if p.refcnt[probeType] == 0 { return fmt.Errorf("probe %s never start", probeType) } p.refcnt[probeType]-- if p.totalReferenceCountLocked() == 0 { return p.cleanup() } return nil } func (p *softirqProbe) cleanup() error { if p.perfReader != nil { p.perfReader.Close() } for _, link := range p.links { link.Close() } p.links = nil p.objs.Close() return nil } func (p *softirqProbe) totalReferenceCountLocked() int { var c int for _, n := range p.refcnt { c += n } return c } func (p *softirqProbe) start(probeType probe.Type) (err error) { p.lock.Lock() defer p.lock.Unlock() p.refcnt[probeType]++ var needLoad bool if p.totalReferenceCountLocked() == 1 { needLoad = true } else if p.ebpfProbeIrqType != p.metricsProbeIrqTypes|p.eventProbeIrqTypes { _ = p.cleanup() needLoad = true } if needLoad { p.ebpfProbeIrqType = p.metricsProbeIrqTypes | p.eventProbeIrqTypes if err = p.loadAndAttachBPF(); err != nil { log.Errorf("%s failed load and attach bpf, err: %v", probeName, err) _ = p.cleanup() return fmt.Errorf("%s failed load bpf: %w", probeName, err) } // 初始化map的读接口 p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspSoftirqEvents, int(unsafe.Sizeof(bpfInspSoftirqEventT{}))) if err != nil { log.Errorf("%s failed create perf reader, err: %v", probeName, err) return err } go p.perfLoop() } return nil } func (p *softirqProbe) updateMetrics(metrics string, irqType uint32) { p.metricsLock.Lock() defer p.metricsLock.Unlock() if !filterIrqEvent(p.metricsProbeIrqTypes, irqType) { return } if _, ok := p.metricsMap[metrics]; !ok { p.metricsMap[metrics] = make(map[string]uint64) } p.metricsMap[metrics][convertIrqType(irqType)]++ } func (p *softirqProbe) perfLoop() { for { record, err := p.perfReader.Read() if err != nil { if errors.Is(err, ringbuf.ErrClosed) { log.Errorf("%s received signal, exiting..", probeName) return } log.Warnf("%s failed reading from reader, err: %v", probeName, err) continue } if record.LostSamples != 0 { log.Warnf("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples) continue } var event bpfInspSoftirqEventT if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil { log.Errorf("%s failed parsing event, err: %v", probeName, err) continue } evt := &probe.Event{ Timestamp: time.Now().UnixNano(), Labels: []probe.Label{ { Name: "type", Value: convertIrqType(event.VecNr), }, }, } /* #define PHASE_SCHED 1 #define PHASE_EXCUTE 2 */ switch event.Phase { case 1: evt.Type = "SOFTIRQ_SCHED_SLOW" p.updateMetrics(SOFTIRQ_SCHED_SLOW, event.VecNr) if event.Latency > 100_000_000 { evt.Type = "SOFTIRQ_SCHED_100MS" p.updateMetrics(SOFTIRQ_SCHED_100MS, event.VecNr) } case 2: evt.Type = "SOFTIRQ_EXCUTE_SLOW" p.updateMetrics(SOFTIRQ_EXCUTE_SLOW, event.VecNr) if event.Latency > 100_000_000 { evt.Type = "SOFTIRQ_EXCUTE_100MS" p.updateMetrics(SOFTIRQ_EXCUTE_100MS, event.VecNr) } default: log.Infof("%s failed parsing event, phase: %d", probeName, event.Phase) continue } evt.Message = fmt.Sprintf("cpu=%d pid=%d latency=%s ", event.Cpu, event.Pid, bpfutil.GetHumanTimes(event.Latency)) if filterIrqEvent(p.eventProbeIrqTypes, event.VecNr) && p.sink != nil { log.Debugf("%s sink event %s", probeName, util.ToJSONString(evt)) p.sink <- evt } } } func (p *softirqProbe) loadAndAttachBPF() error { // 准备动作 if err := rlimit.RemoveMemlock(); err != nil { return err } opts := ebpf.CollectionOptions{} // 获取btf信息 opts.Programs = ebpf.ProgramOptions{ KernelTypes: bpfutil.LoadBTFSpecOrNil(), } // 获取Loaded的程序/map的fd信息 spec, err := loadBpf() if err != nil { return err } err = spec.RewriteConstants(map[string]interface{}{"irq_filter_bits": p.ebpfProbeIrqType}) if err != nil { return err } err = spec.LoadAndAssign(&p.objs, &opts) if err != nil { return err } prograise, err := link.Tracepoint("irq", "softirq_raise", p.objs.TraceSoftirqRaise, &link.TracepointOptions{}) if err != nil { return fmt.Errorf("link softirq_raise: %s", err.Error()) } p.links = append(p.links, prograise) progentry, err := link.Tracepoint("irq", "softirq_entry", p.objs.TraceSoftirqEntry, &link.TracepointOptions{}) if err != nil { return fmt.Errorf("link softirq_entry: %s", err.Error()) } p.links = append(p.links, progentry) progexit, err := link.Tracepoint("irq", "softirq_exit", p.objs.TraceSoftirqExit, &link.TracepointOptions{}) if err != nil { return fmt.Errorf("link softirq_exit: %w", err) } p.links = append(p.links, progexit) return nil } func convertIrqType(vec uint32) string { return softirqTypes[vec] } func filterIrqEvent(filterBits, vec uint32) bool { return filterBits&(1<<vec) != 0 } func enabledIrqTypes(filterBits uint32) []string { var types []string for i, bit := range softirqTypes { if filterBits&(1<<i) != 0 { types = append(types, bit) } } return types } func softirqTypesBits(types []string) uint32 { var bits uint32 for _, softirqType := range types { index := lo.IndexOf(softirqTypes, softirqType) if index >= 0 { bits |= 1 << index } } return bits }