pkg/exporter/probe/tracetcpreset/tracetcpreset.go (155 lines of code) (raw):

package tracetcpreset import ( "bytes" "context" "encoding/binary" "errors" "fmt" "math/bits" "syscall" "time" "unsafe" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/util" log "github.com/sirupsen/logrus" "github.com/alibaba/kubeskoop/pkg/exporter/bpfutil" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "github.com/cilium/ebpf/perf" "github.com/cilium/ebpf/ringbuf" "github.com/cilium/ebpf/rlimit" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_tcpreset_event_t bpf ../../../../bpf/tcpreset.c -- -I../../../../bpf/headers -I ../../../../bpf/headers/${GOARCH} -D__TARGET_ARCH_${GOARCH} // nolint const ( TCPRESET_NOSOCK = "TCPRESET_NOSOCK" TCPRESET_ACTIVE = "TCPRESET_ACTIVE" TCPRESET_PROCESS = "TCPRESET_PROCESS" TCPRESET_RECEIVE = "TCPRESET_RECEIVE" ) var ( probeName = "tcpreset" ) func init() { probe.MustRegisterEventProbe(probeName, eventProbeCreator) } func eventProbeCreator(sink chan<- *probe.Event) (probe.EventProbe, error) { p := &tcpResetProbe{ sink: sink, } return probe.NewEventProbe(probeName, p), nil } type tcpResetProbe struct { sink chan<- *probe.Event objs bpfObjects links []link.Link perfReader *perf.Reader } func (p *tcpResetProbe) Start(_ context.Context) (err error) { err = p.loadAndAttachBPF() if err != nil { log.Errorf("%s failed load and attach bpf, err: %v", probeName, err) _ = p.cleanup() return } p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspTcpresetEvents, int(unsafe.Sizeof(bpfInspTcpresetEventT{}))) if err != nil { log.Errorf("%s failed create new perf reader, err: %v", probeName, err) _ = p.cleanup() return } go p.perfLoop() return } func (p *tcpResetProbe) perfLoop() { for { record, err := p.perfReader.Read() if err != nil { if errors.Is(err, ringbuf.ErrClosed) { log.Infof("%s received signal, exiting..", probeName) return } log.Errorf("%s failed reading from reader, err: %v", probeName, err) continue } if record.LostSamples != 0 { log.Infof("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples) continue } var event bpfInspTcpresetEventT if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil { log.Infof("%s failed parsing event, err: %v", probeName, err) continue } /* #define RESET_NOSOCK 1 #define RESET_ACTIVE 2 #define RESET_PROCESS 4 #define RESET_RECEIVE 8 */ var eventType probe.EventType switch event.Type { case 1: eventType = TCPRESET_NOSOCK case 2: eventType = TCPRESET_ACTIVE case 4: eventType = TCPRESET_PROCESS case 8: eventType = TCPRESET_RECEIVE default: log.Infof("%s got invalid perf event type %d, data: %s", probeName, event.Type, util.ToJSONString(event)) continue } if event.Tuple.L3Proto == syscall.ETH_P_IPV6 { log.Infof("%s ignore event of ipv6 proto", probeName) continue } evt := &probe.Event{ Timestamp: time.Now().UnixNano(), Type: eventType, Labels: probe.LegacyEventLabels(event.SkbMeta.Netns), } tuple := fmt.Sprintf("protocol=%s saddr=%s sport=%d daddr=%s dport=%d ", bpfutil.GetProtoStr(event.Tuple.L4Proto), bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Saddr))), bits.ReverseBytes16(event.Tuple.Sport), bpfutil.GetAddrStr(event.Tuple.L3Proto, *(*[16]byte)(unsafe.Pointer(&event.Tuple.Daddr))), bits.ReverseBytes16(event.Tuple.Dport)) stateStr := bpfutil.GetSkcStateStr(event.State) evt.Message = fmt.Sprintf("%s state:%s ", tuple, stateStr) if p.sink != nil { log.Debugf("%s sink event: %s", probeName, util.ToJSONString(evt)) p.sink <- evt } } } func (p *tcpResetProbe) Stop(_ context.Context) error { return p.cleanup() } func (p *tcpResetProbe) cleanup() error { if p.perfReader != nil { p.perfReader.Close() } for _, link := range p.links { link.Close() } p.links = nil p.objs.Close() return nil } func (p *tcpResetProbe) loadAndAttachBPF() error { // 准备动作 if err := rlimit.RemoveMemlock(); err != nil { return err } opts := ebpf.CollectionOptions{} // 获取btf信息 opts.Programs = ebpf.ProgramOptions{ KernelTypes: bpfutil.LoadBTFSpecOrNil(), } // 获取Loaded的程序/map的fd信息 if err := loadBpfObjects(&p.objs, &opts); err != nil { return fmt.Errorf("loading objects: %v", err) } progsend, err := link.Kprobe("tcp_v4_send_reset", p.objs.TraceSendreset, &link.KprobeOptions{}) if err != nil { return fmt.Errorf("link tcp_v4_send_reset: %s", err.Error()) } p.links = append(p.links, progsend) progactive, err := link.Kprobe("tcp_send_active_reset", p.objs.TraceSendactive, &link.KprobeOptions{}) if err != nil { return fmt.Errorf("link tcp_send_active_reset: %s", err.Error()) } p.links = append(p.links, progactive) kprecv, err := link.Tracepoint("tcp", "tcp_receive_reset", p.objs.InspRstrx, nil) if err != nil { return err } p.links = append(p.links, kprecv) return nil }