pkg/exporter/probe/flow/flow.go (429 lines of code) (raw):

package flow import ( "context" "fmt" "strings" "sync" "syscall" "github.com/prometheus/client_golang/prometheus" "github.com/alibaba/kubeskoop/pkg/exporter/probe" "github.com/alibaba/kubeskoop/pkg/exporter/bpfutil" "github.com/cilium/ebpf" "github.com/cilium/ebpf/rlimit" "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" "golang.org/x/sys/unix" ) //go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target=${GOARCH} -cc clang -cflags $BPF_CFLAGS bpf ../../../../bpf/flow.c -- -I../../../../bpf/headers -D__TARGET_ARCH_${GOARCH} type direction int const ( ingress direction = 0 egress direction = 1 metricsBytes = "bytes" metricsPackets = "packets" ClsactQdisc = "clsact" featureSwitchEnableFlowPort = 0 ) var ( probeName = "flow" ) func init() { probe.MustRegisterMetricsProbe(probeName, metricsProbeCreator) } type flowArgs struct { Dev string `mapstructure:"interfaceName"` EnablePortInLabel bool `mapstructure:"enablePortInLabel"` } func getDefaultRouteDevice() (netlink.Link, error) { filter := &netlink.Route{ Dst: nil, } routers, err := netlink.RouteListFiltered(syscall.AF_INET, filter, netlink.RT_FILTER_DST) if err != nil { return nil, err } if len(routers) == 0 { return nil, fmt.Errorf("no default route found") } if len(routers) > 1 { return nil, fmt.Errorf("multi default route found") } link, err := netlink.LinkByIndex(routers[0].LinkIndex) if err != nil { return nil, err } return link, nil } type linkFlowHelper interface { start() error stop() error } type dynamicLinkFlowHelper struct { bpfObjs *bpfObjects pattern string done chan struct{} flows map[int]*ebpfFlow lock sync.Mutex } func (h *dynamicLinkFlowHelper) tryStartLinkFlow(link netlink.Link) { log.Infof("flow: try start flow on nic %s, index %d", link.Attrs().Name, link.Attrs().Index) if _, ok := h.flows[link.Attrs().Index]; ok { log.Warnf("new interface(%s) index %d already exists, skip process", link.Attrs().Name, link.Attrs().Index) return } flow := &ebpfFlow{ dev: link, bpfObjs: h.bpfObjs, } if err := flow.start(); err != nil { log.Errorf("failed start flow on dev %s", link.Attrs().Name) return } h.flows[link.Attrs().Index] = flow } func (h *dynamicLinkFlowHelper) tryStopLinkFlow(name string, index int) { log.Infof("flow: try stop flow on nic %s, index %d", name, index) flow, ok := h.flows[index] if !ok { log.Warnf("deleted interface index %d not exists, skip process", index) return } _ = flow.stop() delete(h.flows, index) } func (h *dynamicLinkFlowHelper) start() error { h.done = make(chan struct{}) ch := make(chan netlink.LinkUpdate) links, err := netlink.LinkList() if err != nil { return fmt.Errorf("%s error list link, err: %w", probeName, err) } for _, link := range links { if !strings.HasPrefix(link.Attrs().Name, h.pattern) { continue } h.tryStartLinkFlow(link) } go func() { if err := netlink.LinkSubscribe(ch, h.done); err != nil { log.Errorf("%s error watch link change, err: %v", probeName, err) close(h.done) } }() go func() { h.lock.Lock() defer h.lock.Unlock() for { select { case change := <-ch: if !strings.HasSuffix(change.Attrs().Name, h.pattern) { break } switch change.Header.Type { case syscall.RTM_NEWLINK: link, err := netlink.LinkByIndex(int(change.Index)) if err != nil { log.Errorf("failed get new created link by index %d, name %s, err: %v", change.Index, change.Attrs().Name, err) break } h.tryStartLinkFlow(link) case syscall.RTM_DELLINK: h.tryStopLinkFlow(change.Attrs().Name, int(change.Index)) } case <-h.done: return } } }() return nil } func (h *dynamicLinkFlowHelper) stop() error { close(h.done) h.lock.Lock() defer h.lock.Unlock() var first error for _, flow := range h.flows { if err := flow.stop(); err != nil { if first == nil { first = err } } } return first } func metricsProbeCreator(args flowArgs) (probe.MetricsProbe, error) { p := &metricsProbe{ enablePort: args.EnablePortInLabel, } if args.Dev == "" { log.Infof("flow: auto detect network device with default route") dev, err := getDefaultRouteDevice() if err != nil { return nil, fmt.Errorf("fail detect default route dev, err: %w", err) } log.Infof("flow: default network device %s", dev.Attrs().Name) p.helper = &ebpfFlow{ dev: dev, bpfObjs: &p.bpfObjs, } } else { pattern := strings.TrimSuffix(args.Dev, "*") if pattern != args.Dev { log.Infof("flow: network device pattern %s", pattern) p.helper = &dynamicLinkFlowHelper{ bpfObjs: &p.bpfObjs, pattern: pattern, done: make(chan struct{}), flows: make(map[int]*ebpfFlow), } } else { link, err := netlink.LinkByName(pattern) if err != nil { return nil, fmt.Errorf("cannot get network interface by name %s, err: %w", pattern, err) } log.Infof("flow: network device %s", pattern) p.helper = &ebpfFlow{ bpfObjs: &p.bpfObjs, dev: link, } } } opts := probe.BatchMetricsOpts{ Namespace: probe.MetricsNamespace, Subsystem: probeName, VariableLabels: probe.TupleMetricsLabels, SingleMetricsOpts: []probe.SingleMetricsOpts{ {Name: metricsBytes, ValueType: prometheus.CounterValue}, {Name: metricsPackets, ValueType: prometheus.CounterValue}, }, } batchMetrics := probe.NewBatchMetrics(opts, p.collectOnce) return probe.NewMetricsProbe(probeName, p, batchMetrics), nil } type metricsProbe struct { enablePort bool bpfObjs bpfObjects helper linkFlowHelper } func (p *metricsProbe) Start(_ context.Context) error { if err := p.loadBPF(); err != nil { var verifierError *ebpf.VerifierError log.Error("failed load ebpf program", err) if errors.As(err, &verifierError) { log.Warn("detail", strings.Join(verifierError.Log, "\n")) } return err } return p.helper.start() } func (p *metricsProbe) Stop(_ context.Context) error { if err := p.helper.stop(); err != nil { return err } return p.bpfObjs.Close() } func toProbeTuple(t *bpfFlowTuple4) *probe.Tuple { return &probe.Tuple{ Protocol: t.Proto, Src: bpfutil.GetV4AddrStr(t.Src), Dst: bpfutil.GetV4AddrStr(t.Dst), Sport: t.Sport, Dport: t.Dport, } } func (p *metricsProbe) collectOnce(emit probe.Emit) error { var values []bpfFlowMetrics var key bpfFlowTuple4 iterator := p.bpfObjs.bpfMaps.InspFlow4Metrics.Iterate() for iterator.Next(&key, &values) { if err := iterator.Err(); err != nil { return fmt.Errorf("failed read bpfmap, err: %w", err) } var val bpfFlowMetrics for i := 0; i < len(values); i++ { val.Bytes += values[i].Bytes val.Packets += values[i].Packets } tuple := toProbeTuple(&key) labels := probe.BuildTupleMetricsLabels(tuple) emit("bytes", labels, float64(val.Bytes)) emit("packets", labels, float64(val.Packets)) } return nil } func (p *metricsProbe) loadBPF() error { if err := rlimit.RemoveMemlock(); err != nil { return fmt.Errorf("remove limit failed: %s", err.Error()) } opts := ebpf.CollectionOptions{ Programs: ebpf.ProgramOptions{ KernelTypes: bpfutil.LoadBTFSpecOrNil(), }, } if err := loadBpfObjects(&p.bpfObjs, &opts); err != nil { return fmt.Errorf("loading objects: %s", err.Error()) } if p.enablePort { if err := bpfutil.UpdateFeatureSwitch(p.bpfObjs.InspFlowFeatureSwitch, featureSwitchEnableFlowPort, 1); err != nil { return fmt.Errorf("failed set flow feature switch: %w", err) } } return nil } type ebpfFlow struct { cleanQdisc bool dev netlink.Link bpfObjs *bpfObjects } func (f *ebpfFlow) start() error { err := f.attachBPF() if err != nil { log.Errorf("%s failed attach ebpf to dev %s, cleanup", probeName, f.dev) f.cleanup() } return err } func (f *ebpfFlow) stop() error { f.cleanup() return nil } func (f *ebpfFlow) cleanup() { clean := func(dir direction) { filter, err := f.getFlowFilter(dir) if err != nil { log.Errorf("%s cannot list ingress filter for dev %s: %v", probeName, f.dev.Attrs().Name, err) return } if err := netlink.FilterDel(filter); err != nil { log.Errorf("%s cannot delete ingress filter for dev %s: %v", probeName, f.dev.Attrs().Name, err) } } clean(ingress) clean(egress) if f.cleanQdisc { _ = netlink.QdiscDel(clsact(f.dev)) } } func directionName(dir direction) string { switch dir { case ingress: return "ingress" case egress: return "egress" } return "" } func filterParent(dir direction) uint32 { switch dir { case ingress: return netlink.HANDLE_MIN_INGRESS case egress: return netlink.HANDLE_MIN_EGRESS } return 0 } func filterName(dev string, dir direction) string { directionName := directionName(dir) return fmt.Sprintf("kubeskoop-flow-%s-%s", dev, directionName) } func (f *ebpfFlow) getFlowFilter(direction direction) (*netlink.BpfFilter, error) { filterParent := filterParent(direction) filterName := filterName(f.dev.Attrs().Name, direction) filters, err := netlink.FilterList(f.dev, filterParent) if err != nil { return nil, fmt.Errorf("failed list filters: %w", err) } for _, filter := range filters { if filter.Type() != "bpf" { continue } f, ok := filter.(*netlink.BpfFilter) if !ok { continue } if f.Name != filterName { continue } return f, nil } return nil, nil } func (f *ebpfFlow) setupTCFilter(link netlink.Link) error { var ( err error ) if f.cleanQdisc, err = ensureClsactQdisc(link); err != nil { return fmt.Errorf("failed replace qdics clsact for dev %s: %w", link.Attrs().Name, err) } setup := func(dir direction) error { filterParent := filterParent(dir) filterName := filterName(f.dev.Attrs().Name, dir) directionName := directionName(dir) var prog *ebpf.Program switch dir { case ingress: prog = f.bpfObjs.bpfPrograms.TcIngress case egress: prog = f.bpfObjs.bpfPrograms.TcEgress } filter := &netlink.BpfFilter{ FilterAttrs: netlink.FilterAttrs{ LinkIndex: link.Attrs().Index, Parent: filterParent, Protocol: unix.ETH_P_IP, Priority: 0xffff, }, Fd: prog.FD(), Name: filterName, DirectAction: true, } oldFilter, err := f.getFlowFilter(dir) if err != nil { return fmt.Errorf("failed list %s filter for dev %s: %w", directionName, link.Attrs().Name, err) } if oldFilter != nil { filter.Handle = oldFilter.Handle } if err := netlink.FilterReplace(filter); err != nil { return fmt.Errorf("failed replace %s filter for dev %s: %w", directionName, link.Attrs().Name, err) } return nil } if err = setup(ingress); err != nil { return err } if err = setup(egress); err != nil { return err } return nil } func (f *ebpfFlow) attachBPF() error { if err := f.setupTCFilter(f.dev); err != nil { return fmt.Errorf("failed replace %s qdisc with clsact, err: %v", f.dev, err) } return nil } func clsact(link netlink.Link) netlink.Qdisc { attrs := netlink.QdiscAttrs{ LinkIndex: link.Attrs().Index, Handle: netlink.MakeHandle(0xffff, 0), Parent: netlink.HANDLE_CLSACT, } return &netlink.GenericQdisc{ QdiscAttrs: attrs, QdiscType: ClsactQdisc, } } func ensureClsactQdisc(link netlink.Link) (bool, error) { qdicsList, err := netlink.QdiscList(link) if err != nil { return false, err } for _, q := range qdicsList { if q.Attrs().Parent == netlink.HANDLE_CLSACT && q.Type() == ClsactQdisc { log.Infof("got a old clsact, not create") return false, nil } } return true, netlink.QdiscAdd(clsact(link)) }