pkg/exporter/probe/tracetcpretrans/tcpretrans.go (250 lines of code) (raw):
package tracetcpretrans
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"syscall"
"time"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
log "github.com/sirupsen/logrus"
"strings"
"sync"
"unsafe"
"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
)
//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS -type insp_tcpretrans_event_t -type tuple -type addr bpf ../../../../bpf/tcpretrans.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH}
// nolint
const (
retransTotal = "total"
retransFast = "fast"
TCPRetrans = "TCPRetrans"
)
var (
probeName = "tcpretrans"
ignoreSymbolList = map[string]struct{}{}
_tcpRetransProbe = &tcpRetransProbe{}
)
func init() {
var err error
_tcpRetransProbe.cache, err = lru.New[probe.Tuple, *Counter](102400)
if err != nil {
panic(fmt.Sprintf("cannot create lru cache for packetloss probe:%v", err))
}
probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator)
probe.MustRegisterEventProbe(probeName, eventProbeCreator)
}
func metricsProbeCreator() (probe.MetricsProbe, error) {
p := &metricsProbe{}
opts := probe.BatchMetricsOpts{
Namespace: probe.MetricsNamespace,
Subsystem: probeName,
VariableLabels: probe.TupleMetricsLabels,
SingleMetricsOpts: []probe.SingleMetricsOpts{
{Name: retransTotal, ValueType: prometheus.CounterValue},
//{Name: retransFast, ValueType: prometheus.CounterValue},
},
}
batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce)
return probe.NewMetricsProbe(probeName, p, batchMetrics), nil
}
func eventProbeCreator(sink chan<- *probe.Event, _ map[string]interface{}) (probe.EventProbe, error) {
p := &eventProbe{
sink: sink,
}
return probe.NewEventProbe(probeName, p), nil
}
type probeConfig struct {
}
type metricsProbe struct {
}
func (p *metricsProbe) Start(_ context.Context) error {
return _tcpRetransProbe.start(probe.ProbeTypeMetrics, &probeConfig{})
}
func (p *metricsProbe) Stop(_ context.Context) error {
return _tcpRetransProbe.stop(probe.ProbeTypeMetrics)
}
func (p *metricsProbe) collectOnce(emit probe.Emit) error {
keys := _tcpRetransProbe.cache.Keys()
for _, tuple := range keys {
counter, ok := _tcpRetransProbe.cache.Get(tuple)
if !ok || counter == nil {
continue
}
labels := probe.BuildTupleMetricsLabels(&tuple)
emit(retransTotal, labels, float64(counter.Total))
//emit(retransFast, labels, float64(counter.Fast))
}
return nil
}
type eventProbe struct {
sink chan<- *probe.Event
}
func (e *eventProbe) Start(_ context.Context) error {
err := _tcpRetransProbe.start(probe.ProbeTypeEvent, &probeConfig{})
if err != nil {
return err
}
_tcpRetransProbe.sink = e.sink
return nil
}
func (e *eventProbe) Stop(_ context.Context) error {
return _tcpRetransProbe.stop(probe.ProbeTypeEvent)
}
type Counter struct {
Total uint32
Fast uint32
}
type tcpRetransProbe struct {
objs bpfObjects
links []link.Link
sink chan<- *probe.Event
probeConfig [probe.ProbeTypeCount]*probeConfig
lock sync.Mutex
perfReader *perf.Reader
cache *lru.Cache[probe.Tuple, *Counter]
}
func (p *tcpRetransProbe) probeCount() int {
var ret int
for _, cfg := range p.probeConfig {
if cfg != nil {
ret++
}
}
return ret
}
func (p *tcpRetransProbe) stop(probeType probe.Type) error {
p.lock.Lock()
defer p.lock.Unlock()
if p.probeConfig[probeType] == nil {
return fmt.Errorf("probe %s never start", probeType)
}
p.probeConfig[probeType] = nil
if p.probeCount() == 0 {
p.cleanup()
}
return nil
}
func (p *tcpRetransProbe) cleanup() {
if p.perfReader != nil {
p.perfReader.Close()
p.perfReader = nil
}
for _, link := range p.links {
link.Close()
}
p.links = nil
p.objs.Close()
}
func (p *tcpRetransProbe) start(probeType probe.Type, cfg *probeConfig) (err error) {
p.lock.Lock()
defer p.lock.Unlock()
if p.probeConfig[probeType] != nil {
return fmt.Errorf("%s(%s) has already started", probeName, probeType)
}
p.probeConfig[probeType] = cfg
if err := p.reinstallBPFLocked(); err != nil {
return fmt.Errorf("%s failed install ebpf: %w", probeName, err)
}
return nil
}
func (p *tcpRetransProbe) reinstallBPFLocked() (err error) {
p.cleanup()
defer func() {
if err != nil {
p.cleanup()
}
}()
if err = p.loadAndAttachBPF(); err != nil {
return fmt.Errorf("%s failed load and attach bpf, err: %w", probeName, err)
}
p.perfReader, err = perf.NewReader(p.objs.bpfMaps.InspTcpRetransEvent, int(unsafe.Sizeof(bpfInspTcpretransEventT{})))
if err != nil {
return fmt.Errorf("%s error create perf reader, err: %w", probeName, err)
}
go p.perfLoop()
return nil
}
func (p *tcpRetransProbe) loadAndAttachBPF() error {
// Allow the current process to lock memory for eBPF resources.
if err := rlimit.RemoveMemlock(); err != nil {
return fmt.Errorf("remove limit failed: %s", err.Error())
}
opts := ebpf.CollectionOptions{}
opts.Programs = ebpf.ProgramOptions{
KernelTypes: bpfutil.LoadBTFSpecOrNil(),
}
// Load pre-compiled programs and maps into the kernel.
if err := loadBpfObjects(&p.objs, &opts); err != nil {
return fmt.Errorf("loading objects: %s", err.Error())
}
pl, err := link.Tracepoint("tcp", "tcp_retransmit_skb", p.objs.bpfPrograms.Tcpretrans, &link.TracepointOptions{})
if err != nil {
return fmt.Errorf("link raw tracepoint tcp_retransmit_skb failed: %s", err.Error())
}
p.links = append(p.links, pl)
return nil
}
func toProbeTuple(t *bpfTuple) *probe.Tuple {
t.L3Proto = syscall.ETH_P_IPV6
return &probe.Tuple{
Protocol: t.L4Proto,
Src: bpfutil.GetAddrStr(t.L3Proto, t.Saddr.V6addr),
Dst: bpfutil.GetAddrStr(t.L3Proto, t.Daddr.V6addr),
Sport: t.Sport,
Dport: t.Dport,
}
}
func (p *tcpRetransProbe) perfLoop() {
for {
anotherLoop:
record, err := p.perfReader.Read()
if err != nil {
if errors.Is(err, ringbuf.ErrClosed) {
log.Infof("%s received signal, exiting..", probeName)
return
}
log.Errorf("%s failed reading from reader, err: %v", probeName, err)
continue
}
if record.LostSamples != 0 {
log.Warnf("%s perf event ring buffer full, drop: %d", probeName, record.LostSamples)
continue
}
var event bpfInspTcpretransEventT
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.NativeEndian, &event); err != nil {
log.Errorf("%s failed parsing event, err: %v", probeName, err)
continue
}
tuple := toProbeTuple(&event.Tuple)
v, ok := p.cache.Get(*tuple)
if !ok {
v = &Counter{}
p.cache.Add(*tuple, v)
}
v.Total++
evt := &probe.Event{
Timestamp: time.Now().UnixNano(),
Type: TCPRetrans,
Labels: probe.BuildTupleEventLabels(tuple),
}
//TODO add trigger to enable/disable stack
stacks, err := bpfutil.GetSymsByStack(uint32(event.StackId), p.objs.InspTcpRetransStack)
if err != nil {
log.Warnf("%s failed get sym by stack, err: %v", probeName, err)
continue
}
var strs []string
for _, sym := range stacks {
if _, ok := ignoreSymbolList[sym.GetName()]; ok {
goto anotherLoop
}
strs = append(strs, sym.GetExpr())
}
evt.Message = strings.Join(strs, "\n")
if p.sink != nil {
p.sink <- evt
}
}
}