func()

in pkg/exporter/probe/tracekernel/tracekernel.go [226:295]


// perfLoop reads records from p.perfReader in a loop, converts each
// kernel-latency sample into a probe.Event, updates the matching metric via
// p.updateMetrics and, when p.sink is non-nil, sends the event to it.
//
// The loop returns only when Read reports ringbuf.ErrClosed. Any other read
// error, a record reporting lost samples, a sample that cannot be decoded and
// a sample with an unknown direction are logged and skipped.
func (p *kernelLatencyProbe) perfLoop() {
	for {
		record, err := p.perfReader.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				log.Errorf("%s received signal, exiting..", probeName)
				return
			}
			log.Warnf("%s failed reading from reader, err: %v", probeName, err)
			continue
		}

		// A non-zero LostSamples means the record is a drop notification;
		// it is reported and not decoded.
		if record.LostSamples != 0 {
			log.Warnf("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
			continue
		}

		var event bpfInspKlEventT
		// The sample is only read, never written, so a Reader is sufficient.
		if err := binary.Read(bytes.NewReader(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Errorf("%s failed parsing event, err: %v", probeName, err)
			continue
		}

		netns := event.SkbMeta.Netns
		evt := &probe.Event{
			Timestamp: time.Now().UnixNano(),
			Labels:    probe.LegacyEventLabels(netns),
		}

		// The address fields are reinterpreted as 16-byte arrays for
		// GetAddrStr, and the ports are byte-swapped before printing.
		// NOTE(review): presumably the ports arrive in network byte order;
		// confirm against the BPF program.
		saddr := bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Saddr)))
		daddr := bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Daddr)))
		tuple := fmt.Sprintf("protocol=%s saddr=%s sport=%d daddr=%s dport=%d ",
			bpfutil.GetProtoStr(event.Tuple.L4Proto),
			saddr, bits.ReverseBytes16(event.Tuple.Sport),
			daddr, bits.ReverseBytes16(event.Tuple.Dport))

		/*
		   #define RX_KLATENCY 1
		   #define TX_KLATENCY 2
		*/
		switch event.Direction {
		case 1: // RX_KLATENCY
			evt.Type = RXKERNEL_SLOW
			latency := []string{fmt.Sprintf("latency:%s", bpfutil.GetHumanTimes(event.Latency))}
			// Skip the delta when the start timestamp is zero, as every
			// other stage below does; otherwise a zero Point1 would report
			// the raw Point2 value as the PREROUTING latency.
			if event.Point2 > event.Point1 && event.Point1 != 0 {
				latency = append(latency, fmt.Sprintf("PREROUTING:%s", bpfutil.GetHumanTimes(event.Point2-event.Point1)))
			}
			if event.Point3 > event.Point2 && event.Point2 != 0 {
				latency = append(latency, fmt.Sprintf("ROUTE:%s", bpfutil.GetHumanTimes(event.Point3-event.Point2)))
			}
			if event.Point4 > event.Point3 && event.Point3 != 0 {
				latency = append(latency, fmt.Sprintf("LOCAL_IN:%s", bpfutil.GetHumanTimes(event.Point4-event.Point3)))
			}
			evt.Message = fmt.Sprintf("%s %s", tuple, strings.Join(latency, " "))
			p.updateMetrics(netns, RXKERNEL_SLOW_METRIC)
		case 2: // TX_KLATENCY
			evt.Type = TXKERNEL_SLOW
			latency := []string{fmt.Sprintf("latency=%s", bpfutil.GetHumanTimes(event.Latency))}
			// The TX path measures Point1 -> Point3 -> Point4; Point2 is
			// not used here.
			if event.Point3 > event.Point1 && event.Point1 != 0 {
				latency = append(latency, fmt.Sprintf("LOCAL_OUT=%s", bpfutil.GetHumanTimes(event.Point3-event.Point1)))
			}
			if event.Point4 > event.Point3 && event.Point3 != 0 {
				latency = append(latency, fmt.Sprintf("POSTROUTING=%s", bpfutil.GetHumanTimes(event.Point4-event.Point3)))
			}
			evt.Message = fmt.Sprintf("%s %s", tuple, strings.Join(latency, " "))
			p.updateMetrics(netns, TXKERNEL_SLOW_METRIC)
		default:
			// Unknown direction: no metric is updated and nothing is sunk.
			log.Infof("%s failed parsing event %s, ignore", probeName, util.ToJSONString(evt))
			continue
		}

		if p.sink != nil {
			log.Debugf("%s sink event %s", probeName, util.ToJSONString(evt))
			// Plain channel send: this blocks the loop while the sink
			// cannot accept the event.
			p.sink <- evt
		}
	}
}