pkg/exporter/probe/tracebiolatency/tracebiolatency.go (122 lines of code) (raw):
package tracebiolatency
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"time"
"unsafe"
"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
log "github.com/sirupsen/logrus"
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_biolat_event_t bpf ../../../../bpf/tracebiolatency.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH}
// probeName is the name this event probe is registered under and the prefix
// used in its log messages.
var probeName = "biolatency"
func init() {
probe.MustRegisterEventProbe(probeName, bioLatencyProbeCreator)
}
// bioLatencyProbeCreator builds a BiolatencyProbe that publishes to sink and
// wraps it with probe.NewEventProbe under probeName. It never returns a
// non-nil error.
func bioLatencyProbeCreator(sink chan<- *probe.Event) (probe.EventProbe, error) {
	bp := new(BiolatencyProbe)
	bp.sink = sink
	return probe.NewEventProbe(probeName, bp), nil
}

// BiolatencyProbe attaches kprobes to blk_account_io_start/blk_account_io_done
// and turns the latency records emitted by the eBPF side into probe.Event
// values.
type BiolatencyProbe struct {
	// sink receives the events produced by the perf reading loop.
	sink chan<- *probe.Event
	// objs holds the eBPF programs and maps loaded by loadAndAttachBPF.
	objs bpfObjects
	// links are the attached kprobes; cleanup closes them.
	links []link.Link
	// reader consumes records from the InspBiolatEvts perf event array.
	reader *perf.Reader
}
// Start loads and attaches the eBPF programs, opens a perf reader on the
// InspBiolatEvts map and launches the goroutine that forwards events to the
// sink. On any failure the resources acquired so far are released via cleanup
// and the error is returned.
func (p *BiolatencyProbe) Start(_ context.Context) error {
	log.Debugf("start probe %s", probeName)
	if err := p.loadAndAttachBPF(); err != nil {
		// Best-effort release; the load/attach error is the one worth reporting.
		_ = p.cleanup()
		return err
	}

	// NOTE(review): the per-CPU buffer size requested here is the size of a
	// single entry; cilium/ebpf rounds it up to whole pages, so this is likely a
	// one-page ring per CPU. Consider a larger buffer if samples are dropped.
	reader, err := perf.NewReader(p.objs.InspBiolatEvts, int(unsafe.Sizeof(bpfInspBiolatEntryT{})))
	if err != nil {
		_ = p.cleanup()
		return fmt.Errorf("%s: create perf reader: %w", probeName, err)
	}
	p.reader = reader

	// Read perf events in the background until the reader is closed by cleanup.
	go p.perf()
	return nil
}
// perf is the event loop started by Start. It reads records from p.reader,
// decodes each sample into a bpfInspBiolatEventT and, when nettop can resolve
// the event's pid to an entity, sends a "BIOLAT_10MS" probe.Event to p.sink.
// It returns once Read reports that the reader has been closed.
func (p *BiolatencyProbe) perf() {
	for {
		record, err := p.reader.Read()
		if err != nil {
			// perf.ErrClosed is the sentinel perf.Reader.Read returns after Close.
			// The ringbuf sentinel (originally the only one checked) belongs to a
			// different reader type; it is still accepted so that shutdown cannot
			// regress on library versions where the two are the same value.
			if errors.Is(err, perf.ErrClosed) || errors.Is(err, ringbuf.ErrClosed) {
				log.Infof("%s received signal, exiting..", probeName)
				return
			}
			log.Infof("%s failed reading from reader, err: %v", probeName, err)
			continue
		}

		// Records that report lost samples are only logged and skipped.
		if record.LostSamples != 0 {
			log.Infof("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
			continue
		}

		// Decode the raw perf sample into the generated event struct.
		// NOTE(review): decoding assumes a little-endian host — confirm for
		// other target architectures.
		var event bpfInspBiolatEventT
		if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Infof("%s failed parsing event, err: %v", probeName, err)
			continue
		}

		pid := event.Pid
		// Only report events for processes nettop knows about.
		if et, err := nettop.GetEntityByPid(int(pid)); err != nil || et == nil {
			log.Warnf("%s got unspecified event, pid: %d, task %s", probeName, pid, bpfutil.GetCommString(event.Target))
			continue
		}

		evt := &probe.Event{
			Timestamp: time.Now().UnixNano(),
			Type:      "BIOLAT_10MS",
			Message:   fmt.Sprintf("%s %d latency %s", bpfutil.GetCommString(event.Target), event.Pid, bpfutil.GetHumanTimes(event.Latency)),
		}
		p.sink <- evt
	}
}
// Stop releases every resource acquired by Start; see cleanup.
func (p *BiolatencyProbe) Stop(_ context.Context) error {
	return p.cleanup()
}

// cleanup closes the perf reader (which makes the perf goroutine's Read
// return), detaches every kprobe link and closes the loaded eBPF objects.
// Every resource is closed even if an earlier Close fails. The first error
// encountered is returned.
//
// p.links is reset so a second call (e.g. Stop after a failed Start) does not
// close the same links again. p.reader is intentionally left in place because
// the perf goroutine may still dereference it while it is shutting down.
func (p *BiolatencyProbe) cleanup() error {
	var firstErr error
	keep := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	if p.reader != nil {
		keep(p.reader.Close())
	}
	for _, l := range p.links {
		keep(l.Close())
	}
	p.links = nil
	keep(p.objs.Close())
	return firstErr
}
// loadAndAttachBPF removes the memlock rlimit, loads the compiled eBPF objects
// into p.objs (using the BTF spec from bpfutil.LoadBTFSpecOrNil) and attaches
// BiolatStart / BiolatFinish as kprobes on blk_account_io_start /
// blk_account_io_done. Each successfully created link is appended to p.links
// before the next step, so cleanup can release it even if a later step fails.
func (p *BiolatencyProbe) loadAndAttachBPF() error {
	// Return the error instead of calling log.Fatal: a single probe failing to
	// start must not terminate the whole exporter process.
	if err := rlimit.RemoveMemlock(); err != nil {
		return fmt.Errorf("remove memlock: %w", err)
	}

	p.links = nil
	opts := ebpf.CollectionOptions{
		Programs: ebpf.ProgramOptions{
			KernelTypes: bpfutil.LoadBTFSpecOrNil(),
		},
	}

	// Load pre-compiled programs and maps into the kernel.
	if err := loadBpfObjects(&p.objs, &opts); err != nil {
		return fmt.Errorf("loading objects: %w", err)
	}

	kprobes := []struct {
		symbol string
		prog   *ebpf.Program
	}{
		{symbol: "blk_account_io_start", prog: p.objs.BiolatStart},
		{symbol: "blk_account_io_done", prog: p.objs.BiolatFinish},
	}
	for _, kp := range kprobes {
		l, err := link.Kprobe(kp.symbol, kp.prog, nil)
		if err != nil {
			return fmt.Errorf("link %s: %w", kp.symbol, err)
		}
		p.links = append(p.links, l)
	}
	return nil
}