pkg/exporter/probe/tracevirtcmdlat/tracevirtcmdlat.go
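// Package tracevirtcmdlat measures the execution latency of virtio-net
// control commands by attaching a kprobe/kretprobe pair to
// virtnet_send_command, exposing the results as both metrics and events.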
package tracevirtcmdlat
import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"sync"
	"unsafe"

	"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
	"github.com/alibaba/kubeskoop/pkg/exporter/probe"
	"github.com/alibaba/kubeskoop/pkg/exporter/util"
	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/link"
	"github.com/cilium/ebpf/perf"
	"github.com/cilium/ebpf/rlimit"
	log "github.com/sirupsen/logrus"
)
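// bpf2go compiles bpf/virtcmdlatency.c and generates the bpfObjects/bpfMaps
// bindings referenced below.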
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_virtcmdlat_event_t bpf ../../../../bpf/virtcmdlatency.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH}
const (
	// metric names exported by this probe
	VIRTCMD100MS = "latency100ms" // commands that took longer than 100ms
	VIRTCMD      = "latency"      // all observed commands
	// event types emitted to the sink
	VIRTCMDEXCUTE      = "VIRTCMDEXCUTE"
	VIRTCMDEXCUTE100MS = "VIRTCMDEXCUTE_100MS"
	// fn is the kernel function traced by the kprobe/kretprobe pair.
	fn        = "virtnet_send_command"
	probeName = "virtcmdlatency"
)
var (
	metrics = []probe.LegacyMetric{
		{Name: VIRTCMD100MS, Help: "number of virtio-net control commands that took longer than 100ms"},
		{Name: VIRTCMD, Help: "total number of virtio-net control commands observed"},
	}
	// _virtcmdLatencyProbe is the single shared instance backing both the
	// metrics probe and the event probe.
	_virtcmdLatencyProbe = &virtcmdLatencyProbe{
		metricsMap: map[string]map[uint32]uint64{
			VIRTCMD:      {0: 0},
			VIRTCMD100MS: {0: 0},
		},
	}
)
func init() {
probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator)
probe.MustRegisterEventProbe(probeName, eventProbeCreator)
}
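// metricsProbeCreator wraps the shared probe as a metrics probe; CollectOnce
// returns a snapshot of the counters maintained by the perf loop.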
func metricsProbeCreator(_ map[string]interface{}) (probe.MetricsProbe, error) {
p := &metricsProbe{}
batchMetrics := probe.NewLegacyBatchMetrics(probeName, metrics, p.CollectOnce)
return probe.NewMetricsProbe(probeName, p, batchMetrics), nil
}
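// eventProbeCreator wraps the shared probe as an event probe that delivers
// latency events to sink.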
func eventProbeCreator(sink chan<- *probe.Event) (probe.EventProbe, error) {
p := &eventProbe{
sink: sink,
}
return probe.NewEventProbe(probeName, p), nil
}
type metricsProbe struct {
}
func (p *metricsProbe) Start(ctx context.Context) error {
return _virtcmdLatencyProbe.start(ctx, probe.ProbeTypeMetrics)
}
func (p *metricsProbe) Stop(ctx context.Context) error {
return _virtcmdLatencyProbe.stop(ctx, probe.ProbeTypeMetrics)
}
func (p *metricsProbe) CollectOnce() (map[string]map[uint32]uint64, error) {
return _virtcmdLatencyProbe.copyMetricsMap(), nil
}
type eventProbe struct {
sink chan<- *probe.Event
}
func (e *eventProbe) Start(ctx context.Context) error {
err := _virtcmdLatencyProbe.start(ctx, probe.ProbeTypeEvent)
if err != nil {
return err
}
_virtcmdLatencyProbe.sink = e.sink
return nil
}
func (e *eventProbe) Stop(ctx context.Context) error {
return _virtcmdLatencyProbe.stop(ctx, probe.ProbeTypeEvent)
}
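// virtcmdLatencyProbe holds the BPF objects, kprobe links and perf reader
// shared by the metrics and event probes; refcnt tracks how many probe types
// are currently started.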
type virtcmdLatencyProbe struct {
objs bpfObjects
links []link.Link
sink chan<- *probe.Event
refcnt [probe.ProbeTypeCount]int
lock sync.Mutex
perfReader *perf.Reader
metricsMap map[string]map[uint32]uint64
metricsLock sync.RWMutex
}
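// stop releases one reference for the given probe type and tears down the BPF
// resources once no probe type holds a reference.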
func (p *virtcmdLatencyProbe) stop(_ context.Context, probeType probe.Type) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.refcnt[probeType] == 0 {
return fmt.Errorf("probe %s never start", probeType)
}
p.refcnt[probeType]--
if p.totalReferenceCountLocked() == 0 {
return p.cleanup()
}
return nil
}
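// cleanup closes the perf reader, detaches all kprobe links and releases the
// BPF objects.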
func (p *virtcmdLatencyProbe) cleanup() error {
if p.perfReader != nil {
p.perfReader.Close()
}
for _, link := range p.links {
link.Close()
}
p.links = nil
p.objs.Close()
return nil
}
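// updateMetrics increments the counter for the given metric name.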
func (p *virtcmdLatencyProbe) updateMetrics(metrics string) {
p.metricsLock.Lock()
defer p.metricsLock.Unlock()
if _, ok := p.metricsMap[metrics]; !ok {
p.metricsMap[metrics] = make(map[uint32]uint64)
}
p.metricsMap[metrics][0]++
}
func (p *virtcmdLatencyProbe) copyMetricsMap() map[string]map[uint32]uint64 {
p.metricsLock.RLock()
defer p.metricsLock.RUnlock()
return probe.CopyLegacyMetricsMap(p.metricsMap)
}
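// totalReferenceCountLocked sums the per-type reference counts; the caller
// must hold p.lock.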
func (p *virtcmdLatencyProbe) totalReferenceCountLocked() int {
var c int
for _, n := range p.refcnt {
c += n
}
return c
}
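// start increments the reference count for the given probe type; the first
// reference loads and attaches the BPF program and launches the perf loop.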
func (p *virtcmdLatencyProbe) start(_ context.Context, probeType probe.Type) (err error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.refcnt[probeType]++
	if p.totalReferenceCountLocked() == 1 {
		if err = p.loadAndAttachBPF(); err != nil {
			log.Errorf("%s failed to load and attach bpf, err: %v", probeName, err)
			_ = p.cleanup()
			p.refcnt[probeType]--
			return
		}
		p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspVirtcmdlatEvents, int(unsafe.Sizeof(bpfInspVirtcmdlatEventT{})))
		if err != nil {
			log.Warnf("%s failed to create perf reader, err: %v", probeName, err)
			_ = p.cleanup()
			p.refcnt[probeType]--
			return
		}
		go p.perfLoop()
	}
	return nil
}
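// perfLoop reads latency samples from the perf ring buffer, updates the
// counters and forwards an event to the sink when one is attached.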
func (p *virtcmdLatencyProbe) perfLoop() {
for {
record, err := p.perfReader.Read()
if err != nil {
			if errors.Is(err, perf.ErrClosed) {
log.Infof("%s received signal, exiting..", probeName)
return
}
log.Infof("%s failed reading from reader, err: %v", probeName, err)
continue
}
if record.LostSamples != 0 {
log.Infof("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
continue
}
var event bpfInspVirtcmdlatEventT
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Infof("%s failed parsing event, err: %v", probeName, err)
continue
}
evt := &probe.Event{
Type: VIRTCMDEXCUTE,
}
p.updateMetrics(VIRTCMD)
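		// event.Latency is reported in nanoseconds; treat anything above 100ms as slow.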
if event.Latency > 100_000_000 {
evt.Type = VIRTCMDEXCUTE100MS
p.updateMetrics(VIRTCMD100MS)
}
evt.Message = fmt.Sprintf("cpu=%d pid=%d latency=%s", event.Cpu, event.Pid, bpfutil.GetHumanTimes(event.Latency))
if p.sink != nil {
log.Debugf("%s sink event %s", probeName, util.ToJSONString(evt))
p.sink <- evt
}
}
}
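// loadAndAttachBPF loads the compiled BPF objects into the kernel and attaches
// the kprobe/kretprobe pair to virtnet_send_command.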
func (p *virtcmdLatencyProbe) loadAndAttachBPF() error {
	// Allow this process to lock memory for eBPF maps and programs.
	if err := rlimit.RemoveMemlock(); err != nil {
		return fmt.Errorf("remove memlock limit failed: %s", err.Error())
	}
opts := ebpf.CollectionOptions{}
opts.Programs = ebpf.ProgramOptions{
KernelTypes: bpfutil.LoadBTFSpecOrNil(),
}
// Load pre-compiled programs and maps into the kernel.
if err := loadBpfObjects(&p.objs, &opts); err != nil {
return fmt.Errorf("loading objects: %s", err.Error())
}
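	// Attach the entry and return probes used to compute per-command latency.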
linkentry, err := link.Kprobe(fn, p.objs.TraceVirtcmd, &link.KprobeOptions{})
if err != nil {
return fmt.Errorf("link %s: %s", fn, err.Error())
}
p.links = append(p.links, linkentry)
linkexit, err := link.Kretprobe(fn, p.objs.TraceVirtcmdret, &link.KprobeOptions{})
if err != nil {
return fmt.Errorf("link ret %s: %s", fn, err.Error())
}
p.links = append(p.links, linkexit)
return nil
}