pkg/exporter/probe/nlconntrack/conntrackevents.go (140 lines of code) (raw):
package nlconntrack
import (
"context"
"fmt"
"net"
"strconv"
"time"
log "github.com/sirupsen/logrus"
"github.com/vishvananda/netns"
"github.com/alibaba/kubeskoop/pkg/exporter/probe"
"github.com/alibaba/kubeskoop/pkg/exporter/bpfutil"
"github.com/alibaba/kubeskoop/pkg/exporter/nettop"
"github.com/mdlayher/netlink"
"github.com/ti-mo/conntrack"
"github.com/ti-mo/netfilter"
)
const (
ConntrackNew = "ConntrackNew"
ConntrackUpdate = "ConntrackUpdate"
ConntrackDestroy = "ConntrackDestroy"
ConntrackExpNew = "ConntrackExpNew"
ConntrackExpDestroy = "ConntrackExpDestroy"
ConntrackUnknow = "ConntrackUnknow"
)
var (
probeName = "conntrack"
)
func eventProbeCreator(sink chan<- *probe.Event) (probe.EventProbe, error) {
p := &conntrackEventProbe{
sink: sink,
}
return probe.NewEventProbe(probeName, p), nil
}
type conntrackEventProbe struct {
sink chan<- *probe.Event
conns map[int]chan struct{}
done chan struct{}
}
func (p *conntrackEventProbe) Start(ctx context.Context) error {
go func() {
ticker := time.NewTicker(10 * time.Second)
select {
case <-ticker.C:
log.Infof("%s: start update netns list", probeName)
ets := nettop.GetAllUniqueNetnsEntity()
for _, et := range ets {
if et == nil {
log.Infof("%s: skip empty entity", probeName)
continue
}
nsHandle, err := et.OpenNsHandle()
if err != nil {
log.Infof("%s: failed get netns fd, skip netns fd, err: %v", probeName, err)
continue
}
if nsHandle == 0 {
log.Infof("%s: invalid nsfd(0), skip empty netns fd", probeName)
continue
}
if _, ok := p.conns[et.GetNetns()]; !ok {
ctrch := make(chan struct{})
go func() {
err = p.startCtListen(ctx, ctrch, nsHandle, et.GetNetns())
if err != nil {
log.Infof("%s: failed start worker, err: %v", probeName, err)
return
}
}()
p.conns[et.GetNetns()] = ctrch
log.Infof("%s: start worker finished", probeName)
}
}
case <-p.done:
return
}
}()
return nil
}
func (p *conntrackEventProbe) Stop(_ context.Context) error {
close(p.done)
for _, conn := range p.conns {
close(conn)
}
return nil
}
func (p *conntrackEventProbe) startCtListen(_ context.Context, ctrch <-chan struct{}, nsHandle netns.NsHandle, nsinum int) error {
c, err := conntrack.Dial(&netlink.Config{
NetNS: int(nsHandle),
})
defer nsHandle.Close()
if err != nil {
log.Infof("%s: failed start conntrack dial, err: %v", probeName, err)
return err
}
log.Infof("%s: start conntrack listen in netns %d", probeName, nsHandle)
evCh := make(chan conntrack.Event, 1024)
errCh, err := c.Listen(evCh, 4, append(netfilter.GroupsCT, netfilter.GroupsCTExp...))
if err != nil {
log.Infof("%s: failed start conntrack listen, err: %v", probeName, err)
return err
}
for {
select {
case <-ctrch:
log.Infof("%s: conntrack event listen stop", probeName)
return nil
case err = <-errCh:
log.Infof("%s: conntrack event listen stop, err: %v", probeName, err)
return err
case event := <-evCh:
p.sink <- vanishEvent(event, nsinum)
log.Infof("%s: conntrack event listen got event: %s", probeName, event.String())
}
}
}
var eventTypeMapping = map[uint8]probe.EventType{
uint8(conntrack.EventNew): ConntrackNew,
uint8(conntrack.EventUpdate): ConntrackUpdate,
uint8(conntrack.EventDestroy): ConntrackDestroy,
uint8(conntrack.EventExpNew): ConntrackExpNew,
uint8(conntrack.EventExpDestroy): ConntrackExpDestroy,
uint8(conntrack.EventUnknown): ConntrackUnknow,
}
func vanishEvent(evt conntrack.Event, nsinum int) *probe.Event {
rawStr := fmt.Sprintf("Proto = %s Replied = %t ", bpfutil.GetProtoStr(evt.Flow.TupleOrig.Proto.Protocol), evt.Flow.Status.SeenReply())
if evt.Flow.TupleOrig.Proto.Protocol == 6 && evt.Flow.ProtoInfo.TCP != nil {
rawStr += fmt.Sprintf("State = %s ", bpfutil.GetTCPState(evt.Flow.ProtoInfo.TCP.State))
}
rawStr += fmt.Sprintf("Src = %s, Dst = %s", net.JoinHostPort(evt.Flow.TupleOrig.IP.SourceAddress.String(), strconv.Itoa(int(evt.Flow.TupleOrig.Proto.SourcePort))),
net.JoinHostPort(evt.Flow.TupleOrig.IP.DestinationAddress.String(), strconv.Itoa(int(evt.Flow.TupleOrig.Proto.DestinationPort))))
return &probe.Event{
Timestamp: time.Now().UnixNano(),
Type: eventTypeMapping[uint8(evt.Type)],
Labels: probe.EventMetaByNetNS(nsinum),
Message: rawStr,
}
}
func init() {
probe.MustRegisterEventProbe(probeName, eventProbeCreator)
}