pkg/accesslog/common/connection.go (547 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package common import ( "context" "errors" "fmt" "sync" "time" "github.com/sirupsen/logrus" "github.com/apache/skywalking-rover/pkg/accesslog/bpf" "github.com/apache/skywalking-rover/pkg/accesslog/events" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/process/finders/kubernetes" "github.com/apache/skywalking-rover/pkg/tools" "github.com/apache/skywalking-rover/pkg/tools/enums" "github.com/apache/skywalking-rover/pkg/tools/host" "github.com/apache/skywalking-rover/pkg/tools/ip" "github.com/apache/skywalking-rover/pkg/tools/path" "github.com/cilium/ebpf" cmap "github.com/orcaman/concurrent-map" "k8s.io/apimachinery/pkg/util/cache" v32 "skywalking.apache.org/repo/goapi/collect/common/v3" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) const ( // clean the active connection in BPF interval cleanActiveConnectionInterval = time.Second * 20 // in case the reading the data from BPF queue is disordered, so add a delay time to delete the connection information connectionDeleteDelayTime = time.Second * 20 // the connection check exist time connectionCheckExistTime = time.Second * 30 ) type ConnectEventWithSocket struct { *events.SocketConnectEvent SocketPair *ip.SocketPair } type CloseEventWithNotify struct { *events.SocketCloseEvent } type ConnectionProcessFinishCallback func() type ConnectionProcessor interface { } type FlusherListener interface { // ReadyToFlushConnection notify which connection ready to flush ReadyToFlushConnection(connection *ConnectionInfo, getConnectionFromEvent events.Event) } type ProcessListener interface { OnNewProcessMonitoring(pid int32) OnProcessRemoved(pid int32) } type ConnectionManager struct { moduleMgr *module.Manager processOP process.Operator connections cmap.ConcurrentMap // localIPWithPid cache all local monitoring process bind IP address // for checking the remote address is local or not localIPWithPid map[string]int32 // monitoringProcesses management all monitoring processes monitoringProcesses map[int32][]api.ProcessInterface monitoringProcessLock sync.RWMutex // monitoring process map in BPF processMonitorMap *ebpf.Map activeConnectionMap *ebpf.Map monitorFilter MonitorFilter processors []ConnectionProcessor processListeners []ProcessListener flushListeners []FlusherListener connectTracker *ip.ConnTrack connectionProtocolBreakMap *cache.Expiring } func (c *ConnectionManager) RegisterProcessor(processor ConnectionProcessor) { c.processors = append(c.processors, processor) } func (c *ConnectionManager) AddProcessListener(listener ProcessListener) { c.processListeners = append(c.processListeners, listener) } func (c *ConnectionManager) RegisterNewFlushListener(listener FlusherListener) { c.flushListeners = append(c.flushListeners, listener) } type ConnectionInfo struct { ConnectionID uint64 RandomID uint64 RPCConnection *v3.AccessLogConnection MarkDeletable bool PID uint32 Socket *ip.SocketPair LastCheckExistTime time.Time DeleteAfter *time.Time ProtocolBreak bool } func NewConnectionManager(config *Config, moduleMgr *module.Manager, bpfLoader *bpf.Loader, filter MonitorFilter) *ConnectionManager { track, err := ip.NewConnTrack() if err != nil { log.Warnf("cannot create the connection tracker, %v", err) } mgr := &ConnectionManager{ moduleMgr: moduleMgr, processOP: moduleMgr.FindModule(process.ModuleName).(process.Operator), connections: cmap.New(), localIPWithPid: make(map[string]int32), monitoringProcesses: make(map[int32][]api.ProcessInterface), processMonitorMap: bpfLoader.ProcessMonitorControl, activeConnectionMap: bpfLoader.ActiveConnectionMap, monitorFilter: filter, flushListeners: make([]FlusherListener, 0), connectTracker: track, connectionProtocolBreakMap: cache.NewExpiring(), } return mgr } func (c *ConnectionManager) Start(ctx context.Context, accessLogContext *AccessLogContext) { c.processOP.AddListener(c) // starting to clean up the un-active connection in BPF go func() { ticker := time.NewTicker(cleanActiveConnectionInterval) for { select { case <-ticker.C: activeConnections := c.activeConnectionMap.Iterate() var conID uint64 var activateConn ActiveConnection for activeConnections.Next(&conID, &activateConn) { // if the connection is existed, then check the next one pid, fd := events.ParseConnectionID(conID) if c.checkProcessFDExist(pid, fd) { continue } // if the connection is not existed, then delete it if err := c.activeConnectionMap.Delete(conID); err != nil { if !errors.Is(err, ebpf.ErrKeyNotExist) { log.Warnf("failed to delete the active connection, pid: %d, fd: %d, connection ID: %d, random ID: %d, error: %v", pid, fd, conID, activateConn.RandomID, err) } continue } log.Debugf("deleted the active connection as not exist in file system, pid: %d, fd: %d, connection ID: %d, random ID: %d", pid, fd, conID, activateConn.RandomID) // building and send the close event wapperedEvent := c.OnConnectionClose(&events.SocketCloseEvent{ ConnectionID: conID, RandomID: activateConn.RandomID, StartTime: 0, EndTime: 0, PID: activateConn.PID, SocketFD: activateConn.SocketFD, Success: 0, }) accessLogContext.Queue.AppendKernelLog(NewKernelLogEvent(LogTypeClose, wapperedEvent)) } case <-ctx.Done(): return } } }() } func (c *ConnectionManager) checkProcessFDExist(pid, fd uint32) bool { return path.Exists(host.GetHostProcInHost(fmt.Sprintf("%d/fd/%d", pid, fd))) } func (c *ConnectionManager) Stop() { c.processOP.DeleteListener(c) } func (c *ConnectionManager) OnNewProcessExecuting(pid int32) { // if the process should not be monitoring, then delete in the map if !c.processOP.ShouldMonitor(pid) { c.updateMonitorStatusForProcess(pid, false) } } func (c *ConnectionManager) GetExcludeNamespaces() []string { return c.monitorFilter.ExcludeNamespaces() } func (c *ConnectionManager) Find(event events.Event) *ConnectionInfo { connectionKey := fmt.Sprintf("%d_%d", event.GetConnectionID(), event.GetRandomID()) data, exist := c.connections.Get(connectionKey) if exist { connection := data.(*ConnectionInfo) c.connectionPostHandle(connection, event) return connection } // is current is connected event, then getting the socket pair if e, socket := getSocketPairFromConnectEvent(event); e != nil && socket != nil { var localAddress, remoteAddress *v3.ConnectionAddress localPID, _ := events.ParseConnectionID(event.GetConnectionID()) localAddress = c.buildLocalAddress(localPID, socket.SrcPort, socket) remoteAddress = c.buildRemoteAddress(e, socket) if localAddress == nil || remoteAddress == nil { return nil } connection := c.buildConnection(e, socket, localAddress, remoteAddress, connectionKey) c.connections.Set(connectionKey, connection) if log.Enable(logrus.DebugLevel) { log.Debugf("building flushing connection, connection ID: %d, randomID: %d, role: %s, local: %s:%d, remote: %s:%d, "+ "local address: %s, remote address: %s, protocol: %s", e.GetConnectionID(), e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort, localAddress.String(), remoteAddress.String(), connection.RPCConnection.Protocol.String()) } c.connectionPostHandle(connection, event) return connection } return nil } func (c *ConnectionManager) buildRemoteAddress(e *events.SocketConnectEvent, socket *ip.SocketPair) *v3.ConnectionAddress { // if the remote address is local, then no needs to build the address(access log no need to send by communicate with self) if tools.IsLocalHostAddress(socket.DestIP) { return nil } // if the remote connection is need to use conntrack, then update the real peer address if socket.NeedConnTrack { if err := c.connectTracker.UpdateRealPeerAddress(socket); err != nil { log.Debugf("cannot update the real peer address, %v", err) } } // found local address with pid if pid, exist := c.localIPWithPid[socket.DestIP]; exist && pid != 0 { return c.buildLocalAddress(uint32(pid), socket.DestPort, socket) } log.Debugf("building the remote address to unknown, connection: %d-%d, role: %s, local: %s:%d, remote: %s:%d", e.GetConnectionID(), e.GetRandomID(), socket.Role, socket.SrcIP, socket.SrcPort, socket.DestIP, socket.DestPort) return c.buildAddressFromRemote(socket.DestIP, socket.DestPort) } func (c *ConnectionManager) connectionPostHandle(connection *ConnectionInfo, event events.Event) { if connection == nil { return } switch e := event.(type) { case *CloseEventWithNotify: connection.MarkDeletable = true case events.SocketDetail: tlsMode := connection.RPCConnection.TlsMode protocol := connection.RPCConnection.Protocol if e.GetSSL() == 1 && connection.RPCConnection.TlsMode == v3.AccessLogConnectionTLSMode_Plain { tlsMode = v3.AccessLogConnectionTLSMode_TLS } if !connection.ProtocolBreak && e.GetProtocol() != enums.ConnectionProtocolUnknown && connection.RPCConnection.Protocol == v3.AccessLogProtocolType_TCP { switch e.GetProtocol() { case enums.ConnectionProtocolHTTP: protocol = v3.AccessLogProtocolType_HTTP_1 case enums.ConnectionProtocolHTTP2: protocol = v3.AccessLogProtocolType_HTTP_2 } } if connection.ProtocolBreak && connection.RPCConnection.Protocol != v3.AccessLogProtocolType_TCP { protocol = v3.AccessLogProtocolType_TCP } c.rebuildRPCConnectionWithTLSModeAndProtocol(connection, tlsMode, protocol) } // notify all flush listeners the connection is ready to flush for _, flush := range c.flushListeners { flush.ReadyToFlushConnection(connection, event) } } // According to https://github.com/golang/protobuf/issues/1609 // if the message is modified during marshaling, it may cause the error when send the message to the backend // so, we need to clone the message and change it before sending it to the channel func (c *ConnectionManager) rebuildRPCConnectionWithTLSModeAndProtocol(connection *ConnectionInfo, tls v3.AccessLogConnectionTLSMode, protocol v3.AccessLogProtocolType) { original := connection.RPCConnection connection.RPCConnection = &v3.AccessLogConnection{ Local: original.Local, Remote: original.Remote, Role: original.Role, TlsMode: tls, Protocol: protocol, Attachment: original.Attachment, } } func (c *ConnectionManager) ProcessIsMonitor(pid uint32) bool { c.monitoringProcessLock.RLock() defer c.monitoringProcessLock.RUnlock() return len(c.monitoringProcesses[int32(pid)]) > 0 } func (c *ConnectionManager) ProcessIsDetectBy(pid uint32, detectType api.ProcessDetectType) bool { c.monitoringProcessLock.RLock() defer c.monitoringProcessLock.RUnlock() for _, p := range c.monitoringProcesses[int32(pid)] { if p.DetectType() == detectType { return true } } return false } func (c *ConnectionManager) buildConnection(event *events.SocketConnectEvent, socket *ip.SocketPair, local, remote *v3.ConnectionAddress, conKey string) *ConnectionInfo { var role v32.DetectPoint switch socket.Role { case enums.ConnectionRoleClient: role = v32.DetectPoint_client case enums.ConnectionRoleServer: role = v32.DetectPoint_server } connection := &v3.AccessLogConnection{ Local: local, Remote: remote, Role: role, TlsMode: v3.AccessLogConnectionTLSMode_Plain, Protocol: v3.AccessLogProtocolType_TCP, } val, exist := c.connectionProtocolBreakMap.Get(conKey) protocolBreak := false if exist { protocolBreak = val.(bool) c.connectionProtocolBreakMap.Delete(conKey) } return &ConnectionInfo{ ConnectionID: event.ConID, RandomID: event.RandomID, RPCConnection: connection, PID: event.PID, Socket: socket, LastCheckExistTime: time.Now(), ProtocolBreak: protocolBreak, } } func (c *ConnectionManager) buildLocalAddress(pid uint32, port uint16, socket *ip.SocketPair) *v3.ConnectionAddress { c.monitoringProcessLock.RLock() defer c.monitoringProcessLock.RUnlock() for _, pi := range c.monitoringProcesses[int32(pid)] { if pi.DetectType() == api.Kubernetes { entity := pi.Entity() podContainer := pi.DetectProcess().(*kubernetes.Process).PodContainer() return &v3.ConnectionAddress{ Address: &v3.ConnectionAddress_Kubernetes{ Kubernetes: &v3.KubernetesProcessAddress{ ServiceName: entity.ServiceName, PodName: podContainer.Pod.Name, ContainerName: podContainer.ContainerSpec.Name, ProcessName: entity.ProcessName, Port: int32(port), }, }, } } } return &v3.ConnectionAddress{ Address: &v3.ConnectionAddress_Ip{ Ip: &v3.IPAddress{ Host: socket.SrcIP, Port: int32(port), }, }, } } func (c *ConnectionManager) buildAddressFromRemote(ipHost string, port uint16) *v3.ConnectionAddress { return &v3.ConnectionAddress{ Address: &v3.ConnectionAddress_Ip{ Ip: &v3.IPAddress{ Host: ipHost, Port: int32(port), }, }, } } func (c *ConnectionManager) OnConnectionClose(event *events.SocketCloseEvent) *CloseEventWithNotify { return &CloseEventWithNotify{ SocketCloseEvent: event, } } func (c *ConnectionManager) AddNewProcess(pid int32, entities []api.ProcessInterface) { // filtering the namespace monitorProcesses := c.shouldMonitorProcesses(entities) if len(monitorProcesses) == 0 { c.RemoveProcess(pid, entities) return } c.monitoringProcessLock.Lock() defer c.monitoringProcessLock.Unlock() // adding monitoring process and IP addresses var entity *api.ProcessEntity if len(entities) > 0 { entity = entities[0].Entity() } log.Infof("adding monitoring process, pid: %d, entity: %v", pid, entity) if _, ok := c.monitoringProcesses[pid]; ok { log.Infof("the process %d already monitoring, so no needs to add again", pid) return } c.monitoringProcesses[pid] = monitorProcesses c.updateMonitorStatusForProcess(pid, true) for _, entity := range monitorProcesses { for _, host := range entity.ExposeHosts() { c.localIPWithPid[host] = pid } } c.printTotalAddressesWithPid("adding monitoring process") for _, l := range c.processListeners { l.OnNewProcessMonitoring(pid) } } func (c *ConnectionManager) rebuildLocalIPWithPID() { result := make(map[string]int32) for pid, entities := range c.monitoringProcesses { for _, entity := range entities { for _, host := range entity.ExposeHosts() { result[host] = pid } } } c.localIPWithPid = result } func (c *ConnectionManager) printTotalAddressesWithPid(prefix string) { if !log.Enable(logrus.DebugLevel) { return } log.Debugf("%s, print all local address with pid", prefix) log.Debugf("----------------------------") log.Debugf("total local address with pid: %d", len(c.localIPWithPid)) for k, v := range c.localIPWithPid { log.Debugf("local address: %s, pid: %d", k, v) } log.Debugf("----------------------------") } func (c *ConnectionManager) shouldMonitorProcesses(entities []api.ProcessInterface) []api.ProcessInterface { return c.monitorFilter.ShouldIncludeProcesses(entities) } func (c *ConnectionManager) checkConnectionIsExist(con *ConnectionInfo) bool { // skip the check if the check time is not reach if time.Since(con.LastCheckExistTime) < connectionCheckExistTime { return true } con.LastCheckExistTime = time.Now() var activateConn ActiveConnection if err := c.activeConnectionMap.Lookup(con.ConnectionID, &activateConn); err != nil { if errors.Is(err, ebpf.ErrKeyNotExist) { con.MarkDeletable = true return false } log.Warnf("cannot found the active connection: %d-%d, err: %v", con.ConnectionID, con.RandomID, err) return false } else if activateConn.RandomID != 0 && activateConn.RandomID != con.RandomID { log.Debugf("detect the connection: %d-%d is already closed(by difference random ID), so remove from the connection manager", con.ConnectionID, con.RandomID) con.MarkDeletable = true return false } return true } func (c *ConnectionManager) RemoveProcess(pid int32, entities []api.ProcessInterface) { c.monitoringProcessLock.Lock() defer c.monitoringProcessLock.Unlock() // delete monitoring process and IP addresses delete(c.monitoringProcesses, pid) c.updateMonitorStatusForProcess(pid, false) c.rebuildLocalIPWithPID() c.printTotalAddressesWithPid("remove monitoring process") for _, l := range c.processListeners { l.OnProcessRemoved(pid) } } func (c *ConnectionManager) RecheckAllProcesses(processes map[int32][]api.ProcessInterface) { shouldMonitoringProcesses := make(map[int32][]api.ProcessInterface) for pid, p := range processes { monitorProcesses := c.shouldMonitorProcesses(p) if len(monitorProcesses) == 0 { continue } shouldMonitoringProcesses[pid] = monitorProcesses } // checking the monitoring process c.monitoringProcesses = shouldMonitoringProcesses // for-each the existing monitored map, it should not be monitored, then remote it iterate := c.processMonitorMap.Iterate() processInBPF := make(map[int32]bool) var pid uint32 var monitor uint32 for iterate.Next(&pid, &monitor) { processInBPF[int32(pid)] = true } c.monitoringProcessLock.RLock() defer c.monitoringProcessLock.RUnlock() // make sure BPF and user space are consistent for pid := range processInBPF { if _, ok := c.monitoringProcesses[pid]; !ok { c.updateMonitorStatusForProcess(pid, false) for _, l := range c.processListeners { l.OnProcessRemoved(pid) } } } for pid := range c.monitoringProcesses { if _, ok := processInBPF[pid]; !ok { c.updateMonitorStatusForProcess(pid, true) for _, l := range c.processListeners { l.OnNewProcessMonitoring(pid) } } } // update all IP addresses c.rebuildLocalIPWithPID() } func (c *ConnectionManager) updateMonitorStatusForProcess(pid int32, monitor bool) { var err error if monitor { err = c.processMonitorMap.Update(pid, uint32(1), ebpf.UpdateAny) } else { err = c.processMonitorMap.Delete(pid) } if err != nil { if !monitor && errors.Is(err, ebpf.ErrKeyNotExist) { return } log.Warnf("failed to update the process %d monitor status to %t: %v", pid, monitor, err) } else { log.Debugf("update the process %d monitor status to %t", pid, monitor) } } // OnBuildConnectionLogFinished notify the connection log build finished func (c *ConnectionManager) OnBuildConnectionLogFinished() { // delete all connections which marked as deletable // all deletable connection events been sent deletableConnections := make(map[string]bool) now := time.Now() c.connections.IterCb(func(key string, v interface{}) { con, ok := v.(*ConnectionInfo) if !ok || con == nil { return } // already mark as deletable or process not monitoring shouldDelete := con.MarkDeletable || !c.ProcessIsMonitor(con.PID) if !shouldDelete { shouldDelete = !c.checkConnectionIsExist(con) } if shouldDelete && con.DeleteAfter == nil { deleteAfterTime := now.Add(connectionDeleteDelayTime) con.DeleteAfter = &deleteAfterTime log.Debugf("detected the connection has mark as deletable, so add a delay timer, connection ID: %d, random ID: %d", con.ConnectionID, con.RandomID) } if shouldDelete && now.After(*con.DeleteAfter) { deletableConnections[key] = true } }) for key := range deletableConnections { log.Debugf("deleting the connection in manager: %s", key) c.connections.Remove(key) } } func (c *ConnectionManager) SkipAllDataAnalyzeAndDowngradeProtocol(conID, ranID uint64) { // setting connection protocol is break connectionKey := fmt.Sprintf("%d_%d", conID, ranID) data, exist := c.connections.Get(connectionKey) if exist { connection := data.(*ConnectionInfo) connection.ProtocolBreak = true } else { // setting to the protocol break map for encase the runner not starting building logs c.connectionProtocolBreakMap.Set(connectionKey, true, time.Minute) } // setting the connection skip data upload var activateConn ActiveConnection if err := c.activeConnectionMap.Lookup(conID, &activateConn); err != nil { if errors.Is(err, ebpf.ErrKeyNotExist) { return } log.Warnf("cannot found the active connection: %d-%d, err: %v", conID, ranID, err) return } if activateConn.RandomID != ranID { // make sure the connection is the same return } activateConn.SkipDataUpload = 1 if err := c.activeConnectionMap.Update(conID, activateConn, ebpf.UpdateAny); err != nil { log.Warnf("failed to update the active connection: %d-%d", conID, ranID) } } func getSocketPairFromConnectEvent(event events.Event) (*events.SocketConnectEvent, *ip.SocketPair) { if e, ok := event.(*ConnectEventWithSocket); ok { return e.SocketConnectEvent, e.SocketPair } return nil, nil } type ActiveConnection struct { RandomID uint64 PID uint32 SocketFD uint32 Role uint32 SocketFamily uint32 Protocol uint8 SSL uint8 SkipDataUpload uint8 // PAD for make sure it have same size when marshal data to the BPF PAD0 uint8 PAD1 uint32 }