pkg/accesslog/collector/connection.go (262 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package collector import ( "context" "encoding/binary" "fmt" "net" "os" "github.com/docker/go-units" "github.com/sirupsen/logrus" "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/accesslog/events" "github.com/apache/skywalking-rover/pkg/accesslog/forwarder" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/tools" "github.com/apache/skywalking-rover/pkg/tools/btf" "github.com/apache/skywalking-rover/pkg/tools/enums" "github.com/apache/skywalking-rover/pkg/tools/ip" "github.com/cilium/ebpf" "github.com/cilium/ebpf/link" "golang.org/x/sys/unix" ) var connectionLogger = logger.GetLogger("access_log", "collector", "connection") var connectionCollectInstance = NewConnectionCollector() type ConnectCollector struct { eventQueue *btf.EventQueue } func NewConnectionCollector() *ConnectCollector { return &ConnectCollector{} } func (c *ConnectCollector) Start(m *module.Manager, ctx *common.AccessLogContext) error { perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ConnectionAnalyze.PerCPUBufferSize) if err != nil { return err } if int(perCPUBufferSize) < os.Getpagesize() { return fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize()) } if ctx.Config.ConnectionAnalyze.ParseParallels < 1 { return fmt.Errorf("the parallels cannot be small than 1") } if ctx.Config.ConnectionAnalyze.AnalyzeParallels < 1 { return fmt.Errorf("the parallels cannot be small than 1") } if ctx.Config.ConnectionAnalyze.QueueSize < 1 { return fmt.Errorf("the queue size be small than 1") } c.eventQueue = btf.NewEventQueue("connection resolver", ctx.Config.ConnectionAnalyze.AnalyzeParallels, ctx.Config.ConnectionAnalyze.QueueSize, func(num int) btf.PartitionContext { return NewConnectionPartitionContext(ctx, m.FindModule(process.ModuleName).(process.K8sOperator)) }) c.eventQueue.RegisterReceiver(ctx.BPF.SocketConnectionEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { return &events.SocketConnectEvent{} }, func(data interface{}) int { return int(data.(*events.SocketConnectEvent).ConID) }) c.eventQueue.RegisterReceiver(ctx.BPF.SocketCloseEventQueue, int(perCPUBufferSize), ctx.Config.ConnectionAnalyze.ParseParallels, func() interface{} { return &events.SocketCloseEvent{} }, func(data interface{}) int { return int(data.(*events.SocketCloseEvent).ConnectionID) }) c.eventQueue.Start(ctx.RuntimeContext, ctx.BPF.Linker) ctx.BPF.AddTracePoint("syscalls", "sys_enter_connect", ctx.BPF.TracepointEnterConnect) ctx.BPF.AddTracePoint("syscalls", "sys_exit_connect", ctx.BPF.TracepointExitConnect) ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept", ctx.BPF.TracepointEnterAccept) ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept", ctx.BPF.TracepointExitAccept) ctx.BPF.AddTracePoint("syscalls", "sys_enter_accept4", ctx.BPF.TracepointEnterAccept) ctx.BPF.AddTracePoint("syscalls", "sys_exit_accept4", ctx.BPF.TracepointExitAccept) ctx.BPF.AddTracePoint("syscalls", "sys_enter_close", ctx.BPF.TracepointEnterClose) ctx.BPF.AddTracePoint("syscalls", "sys_exit_close", ctx.BPF.TracepointExitClose) ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{ "tcp_connect": ctx.BPF.TcpConnect, }) ctx.BPF.AddLink(link.Kretprobe, map[string]*ebpf.Program{ "sock_alloc": ctx.BPF.SockAllocRet, }) ctx.BPF.AddLink(link.Kprobe, map[string]*ebpf.Program{ "ip4_datagram_connect": ctx.BPF.Ip4UdpDatagramConnect, }) _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{ "__nf_conntrack_hash_insert": ctx.BPF.NfConntrackHashInsert, }) _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{ "nf_confirm": ctx.BPF.NfConfirm, }) _ = ctx.BPF.AddLinkOrError(link.Kprobe, map[string]*ebpf.Program{ "ctnetlink_fill_info": ctx.BPF.NfCtnetlinkFillInfo, }) return nil } func (c *ConnectCollector) Stop() { } type ConnectionPartitionContext struct { context *common.AccessLogContext k8sOperator process.K8sOperator } func NewConnectionPartitionContext(ctx *common.AccessLogContext, k8sOperator process.K8sOperator) *ConnectionPartitionContext { return &ConnectionPartitionContext{ context: ctx, k8sOperator: k8sOperator, } } func (c *ConnectionPartitionContext) Start(ctx context.Context) { } func (c *ConnectionPartitionContext) Consume(data interface{}) { switch event := data.(type) { case *events.SocketConnectEvent: connectionLogger.Debugf("receive connect event, connection ID: %d, randomID: %d, "+ "pid: %d, fd: %d, role: %s: func: %s, family: %d, success: %d, conntrack exist: %t", event.ConID, event.RandomID, event.PID, event.SocketFD, enums.ConnectionRole(event.Role), enums.SocketFunctionName(event.FuncName), event.SocketFamily, event.ConnectSuccess, event.ConnTrackUpstreamPort != 0) socketPair := c.BuildSocketFromConnectEvent(event) if socketPair == nil { connectionLogger.Debugf("cannot found the socket paire from connect event, connection ID: %d, randomID: %d", event.ConID, event.RandomID) return } connectionLogger.Debugf("build socket pair success, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d", event.ConID, event.RandomID, socketPair.Role, socketPair.SrcIP, socketPair.SrcPort, socketPair.DestIP, socketPair.DestPort) forwarder.SendConnectEvent(c.context, event, socketPair) case *events.SocketCloseEvent: connectionLogger.Debugf("receive close event, connection ID: %d, randomID: %d, pid: %d, fd: %d", event.ConnectionID, event.RandomID, event.PID, event.SocketFD) wapperedEvent := c.context.ConnectionMgr.OnConnectionClose(event) forwarder.SendCloseEvent(c.context, wapperedEvent) } } func (c *ConnectionPartitionContext) FixSocketFamilyIfNeed(event *events.SocketConnectEvent, result *ip.SocketPair) { if result == nil { return } if parseIP := net.ParseIP(result.SrcIP); parseIP != nil { var actual uint32 if parseIP.To4() != nil { actual = unix.AF_INET } else { actual = unix.AF_INET6 } if result.Family != actual { connectionLogger.Debugf("fix the socket family from %d to %d, connection ID: %d, randomID: %d", result.Family, actual, event.ConID, event.RandomID) result.Family = actual } } } func (c *ConnectionPartitionContext) BuildSocketFromConnectEvent(event *events.SocketConnectEvent) *ip.SocketPair { if event.SocketFamily != unix.AF_INET && event.SocketFamily != unix.AF_INET6 && event.SocketFamily != enums.SocketFamilyUnknown { // if not ipv4, ipv6 or unknown, ignore return nil } pair := c.BuildSocketPair(event) if pair != nil && pair.IsValid() { connectionLogger.Debugf("found the connection from the connect event is valid, connection ID: %d, randomID: %d", event.ConID, event.RandomID) return pair } // if only the local port not success, maybe the upstream port is not open, so it could be continued if c.IsOnlyLocalPortEmpty(pair) { event.ConnectSuccess = 0 connectionLogger.Debugf("the connection from the connect event is only the local port is empty, connection ID: %d, randomID: %d", event.ConID, event.RandomID) return pair } pair, err := ip.ParseSocket(event.PID, event.SocketFD) if err != nil { connectionLogger.Debugf("cannot found the socket, pid: %d, socket FD: %d, error: %v", event.PID, event.SocketFD, err) return nil } connectionLogger.Debugf("found the connection from the socket, connection ID: %d, randomID: %d", event.ConID, event.RandomID) pair.Role = enums.ConnectionRole(event.Role) c.FixSocketFamilyIfNeed(event, pair) c.CheckNeedConntrack(event, pair) return pair } func (c *ConnectionPartitionContext) IsOnlyLocalPortEmpty(socketPair *ip.SocketPair) bool { if socketPair == nil { return false } port := socketPair.SrcPort defer func() { socketPair.SrcPort = port }() socketPair.SrcPort = 1 return socketPair.IsValid() } func (c *ConnectionPartitionContext) BuildSocketPair(event *events.SocketConnectEvent) *ip.SocketPair { var result *ip.SocketPair haveConnTrack := false if event.SocketFamily == unix.AF_INET { result = &ip.SocketPair{ Family: uint32(event.SocketFamily), Role: enums.ConnectionRole(event.Role), SrcIP: ip.ParseIPV4(event.LocalAddrV4), SrcPort: uint16(event.LocalAddrPort), } if event.ConnTrackUpstreamIPl != 0 && event.ConnTrackUpstreamPort != 0 { haveConnTrack = true result.DestIP = ip.ParseIPV4(uint32(event.ConnTrackUpstreamIPl)) result.DestPort = uint16(event.ConnTrackUpstreamPort) if connectionLogger.Enable(logrus.DebugLevel) { connectionLogger.Debugf("found the connection from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: %s:%d", event.ConID, event.RandomID, ip.ParseIPV4(event.RemoteAddrV4), uint16(event.RemoteAddrPort), result.DestIP, result.DestPort) } } else { result.DestIP = ip.ParseIPV4(event.RemoteAddrV4) result.DestPort = uint16(event.RemoteAddrPort) } } else if event.SocketFamily == unix.AF_INET6 { result = &ip.SocketPair{ Family: uint32(event.SocketFamily), Role: enums.ConnectionRole(event.Role), SrcIP: ip.ParseIPV6(event.LocalAddrV6), SrcPort: uint16(event.LocalAddrPort), } if event.ConnTrackUpstreamIPl != 0 && event.ConnTrackUpstreamPort != 0 { haveConnTrack = true if event.ConnTrackUpstreamIPh != 0 { var ipv6 [16]uint8 binary.BigEndian.PutUint64(ipv6[0:8], event.ConnTrackUpstreamIPh) binary.BigEndian.PutUint64(ipv6[8:16], event.ConnTrackUpstreamIPl) result.DestIP = ip.ParseIPV6(ipv6) } else { result.DestIP = ip.ParseIPV4(uint32(event.ConnTrackUpstreamIPl)) } result.DestPort = uint16(event.ConnTrackUpstreamPort) if connectionLogger.Enable(logrus.DebugLevel) { connectionLogger.Debugf("found the connection from the conntrack, connection ID: %d, randomID: %d, original: %s:%d, conntrack: %s:%d", event.ConID, event.RandomID, ip.ParseIPV6(event.RemoteAddrV6), uint16(event.RemoteAddrPort), result.DestIP, result.DestPort) } } else { result.DestIP = ip.ParseIPV6(event.RemoteAddrV6) result.DestPort = uint16(event.RemoteAddrPort) } } if haveConnTrack { return result } c.FixSocketFamilyIfNeed(event, result) c.CheckNeedConntrack(event, result) return result } func (c *ConnectionPartitionContext) CheckNeedConntrack(event *events.SocketConnectEvent, socket *ip.SocketPair) { if socket == nil || !socket.IsValid() || tools.IsLocalHostAddress(socket.DestIP) || event.FuncName == enums.SocketFunctionNameAccept || // accept event don't need to update the remote address !c.context.ConnectionMgr.ProcessIsDetectBy(event.PID, api.Kubernetes) { // only the k8s process need to update the remote address from conntrack return } isPodIP, err := c.k8sOperator.IsPodIP(socket.DestIP) if err != nil { connectionLogger.Warnf("cannot found the pod IP, connection ID: %d, randomID: %d, error: %v", event.ConID, event.RandomID, err) } if isPodIP { connectionLogger.Debugf("detect the remote IP is pod IP, connection ID: %d, randomID: %d, remote: %s", event.ConID, event.RandomID, socket.DestIP) return } // update to the socket need to update the remote address from conntrack socket.NeedConnTrack = true }