in pkg/exporter/probe/tracetcpreset/tracetcpreset.go [77:142]
// perfLoop drains p.perfReader until the reader reports that it was closed.
// Every record is decoded into a bpfInspTcpresetEventT, translated to one of
// the TCPRESET_* event types and, when p.sink is set, forwarded as a
// probe.Event. Records that report lost samples, fail to decode, carry an
// unknown reset kind, or describe IPv6 traffic are logged and skipped.
func (p *tcpResetProbe) perfLoop() {
	// Reset kinds carried in the decoded event's Type field. The values come
	// from the RESET_* #define list this loop was written against
	// (presumably the BPF program's header — confirm there).
	const (
		resetNoSock  = 1 // RESET_NOSOCK
		resetActive  = 2 // RESET_ACTIVE
		resetProcess = 4 // RESET_PROCESS
		resetReceive = 8 // RESET_RECEIVE
	)

	for {
		rec, readErr := p.perfReader.Read()
		if errors.Is(readErr, ringbuf.ErrClosed) {
			// A closed reader is the only condition that ends the loop.
			log.Infof("%s received signal, exiting..", probeName)
			return
		}
		if readErr != nil {
			log.Errorf("%s failed reading from reader, err: %v", probeName, readErr)
			continue
		}

		// A record with a non-zero lost-sample count is only reported,
		// never decoded.
		if rec.LostSamples != 0 {
			log.Infof("%s perf event ring buffer full, drop: %d", probeName, rec.LostSamples)
			continue
		}

		var raw bpfInspTcpresetEventT
		decodeErr := binary.Read(bytes.NewReader(rec.RawSample), binary.LittleEndian, &raw)
		if decodeErr != nil {
			log.Infof("%s failed parsing event, err: %v", probeName, decodeErr)
			continue
		}

		var kind probe.EventType
		switch raw.Type {
		case resetNoSock:
			kind = TCPRESET_NOSOCK
		case resetActive:
			kind = TCPRESET_ACTIVE
		case resetProcess:
			kind = TCPRESET_PROCESS
		case resetReceive:
			kind = TCPRESET_RECEIVE
		default:
			log.Infof("%s got invalid perf event type %d, data: %s", probeName, raw.Type, util.ToJSONString(raw))
			continue
		}

		if raw.Tuple.L3Proto == syscall.ETH_P_IPV6 {
			log.Infof("%s ignore event of ipv6 proto", probeName)
			continue
		}

		now := time.Now().UnixNano()
		labels := probe.LegacyEventLabels(raw.SkbMeta.Netns)

		// Addresses are reinterpreted as 16-byte arrays for bpfutil.GetAddrStr;
		// ports are byte-swapped before being printed.
		l3 := raw.Tuple.L3Proto
		protoStr := bpfutil.GetProtoStr(raw.Tuple.L4Proto)
		srcStr := bpfutil.GetAddrStr(l3, *(*[16]byte)(unsafe.Pointer(&raw.Tuple.Saddr)))
		srcPort := bits.ReverseBytes16(raw.Tuple.Sport)
		dstStr := bpfutil.GetAddrStr(l3, *(*[16]byte)(unsafe.Pointer(&raw.Tuple.Daddr)))
		dstPort := bits.ReverseBytes16(raw.Tuple.Dport)
		connDesc := fmt.Sprintf("protocol=%s saddr=%s sport=%d daddr=%s dport=%d ", protoStr, srcStr, srcPort, dstStr, dstPort)

		skState := bpfutil.GetSkcStateStr(raw.State)

		out := &probe.Event{
			Timestamp: now,
			Type:      kind,
			Labels:    labels,
			Message:   fmt.Sprintf("%s state:%s ", connDesc, skState),
		}

		// Without a sink there is nowhere to deliver the event.
		if p.sink == nil {
			continue
		}
		log.Debugf("%s sink event: %s", probeName, util.ToJSONString(out))
		p.sink <- out
	}
}