in pkg/exporter/probe/tracekernel/tracekernel.go [226:295]
// perfLoop drains p.perfReader until the reader is closed, turning every
// kernel-latency sample into a probe.Event plus a metric update.
//
// For each record it:
//   - logs and skips read errors other than ringbuf.ErrClosed, as well as
//     records that report lost samples;
//   - decodes the raw sample as a little-endian bpfInspKlEventT;
//   - builds a message from the flow tuple, the total latency and the time
//     spent between consecutive trace points that were actually recorded;
//   - calls p.updateMetrics with the RX or TX slow-path metric for the netns;
//   - sends the event to p.sink when a sink is configured.
//
// The loop returns only when Read reports ringbuf.ErrClosed.
func (p *kernelLatencyProbe) perfLoop() {
	// Direction values emitted by the BPF program:
	//   #define RX_KLATENCY 1
	//   #define TX_KLATENCY 2
	const (
		rxKLatency = 1
		txKLatency = 2
	)

	for {
		record, err := p.perfReader.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				log.Errorf("%s received signal, exiting..", probeName)
				return
			}
			log.Warnf("%s failed reading from reader, err: %v", probeName, err)
			continue
		}
		if record.LostSamples != 0 {
			log.Warnf("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
			continue
		}

		var event bpfInspKlEventT
		if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Errorf("%s failed parsing event, err: %v", probeName, err)
			continue
		}

		netns := event.SkbMeta.Netns
		evt := &probe.Event{
			Timestamp: time.Now().UnixNano(),
			Labels:    probe.LegacyEventLabels(netns),
		}

		// The address fields are reinterpreted as 16 raw bytes for formatting.
		// NOTE(review): this assumes Saddr/Daddr are at least 16 bytes wide and
		// that the ports arrive byte-swapped relative to the host — confirm
		// against the BPF struct definition.
		saddr := bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Saddr)))
		daddr := bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Daddr)))
		tuple := fmt.Sprintf("protocol=%s saddr=%s sport=%d daddr=%s dport=%d ",
			bpfutil.GetProtoStr(event.Tuple.L4Proto),
			saddr, bits.ReverseBytes16(event.Tuple.Sport),
			daddr, bits.ReverseBytes16(event.Tuple.Dport))

		// A stage is reported only when both of its trace points were recorded
		// (the earlier one is non-zero) and they are strictly ordered; otherwise
		// the subtraction would yield a raw timestamp or wrap around.
		switch event.Direction {
		case rxKLatency:
			evt.Type = RXKERNEL_SLOW
			latency := []string{fmt.Sprintf("latency:%s", bpfutil.GetHumanTimes(event.Latency))}
			if event.Point2 > event.Point1 && event.Point1 != 0 {
				latency = append(latency, fmt.Sprintf("PREROUTING:%s", bpfutil.GetHumanTimes(event.Point2-event.Point1)))
			}
			if event.Point3 > event.Point2 && event.Point2 != 0 {
				latency = append(latency, fmt.Sprintf("ROUTE:%s", bpfutil.GetHumanTimes(event.Point3-event.Point2)))
			}
			if event.Point4 > event.Point3 && event.Point3 != 0 {
				latency = append(latency, fmt.Sprintf("LOCAL_IN:%s", bpfutil.GetHumanTimes(event.Point4-event.Point3)))
			}
			evt.Message = fmt.Sprintf("%s %s", tuple, strings.Join(latency, " "))
			p.updateMetrics(netns, RXKERNEL_SLOW_METRIC)
		case txKLatency:
			evt.Type = TXKERNEL_SLOW
			latency := []string{fmt.Sprintf("latency=%s", bpfutil.GetHumanTimes(event.Latency))}
			if event.Point3 > event.Point1 && event.Point1 != 0 {
				latency = append(latency, fmt.Sprintf("LOCAL_OUT=%s", bpfutil.GetHumanTimes(event.Point3-event.Point1)))
			}
			if event.Point4 > event.Point3 && event.Point3 != 0 {
				latency = append(latency, fmt.Sprintf("POSTROUTING=%s", bpfutil.GetHumanTimes(event.Point4-event.Point3)))
			}
			evt.Message = fmt.Sprintf("%s %s", tuple, strings.Join(latency, " "))
			p.updateMetrics(netns, TXKERNEL_SLOW_METRIC)
		default:
			// Unknown direction: nothing meaningful to report, drop the sample.
			log.Infof("%s failed parsing event %s, ignore", probeName, util.ToJSONString(evt))
			continue
		}

		if p.sink != nil {
			log.Debugf("%s sink event %s", probeName, util.ToJSONString(evt))
			// Channel send: blocks this loop until the sink can accept the event.
			p.sink <- evt
		}
	}
}