pkg/profiling/task/network/analyze/base/context.go (319 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package base import ( "context" "encoding/json" "errors" "fmt" "net" "sync" "unsafe" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process/api" "github.com/apache/skywalking-rover/pkg/profiling/task/base" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events" "github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf" "github.com/cilium/ebpf" "github.com/sirupsen/logrus" cmap "github.com/orcaman/concurrent-map" "golang.org/x/sys/unix" ) type AnalyzerContext struct { // listening process map processes map[int32][]api.ProcessInterface // connection handler activeConnections cmap.ConcurrentMap // current activeConnections connections closedConnections []*ConnectionContext // closed connections' flushClosedEvents chan *events.SocketCloseEvent // connection have been closed, it is a queue to cache unknown active connections sockParseQueue chan *ConnectionContext // socket address parse queue // analyze listener list listeners []AnalyzeListener // close connection modify locker closedConnectionLocker sync.RWMutex } func NewAnalyzerContext(processes map[int32][]api.ProcessInterface) *AnalyzerContext { return &AnalyzerContext{ processes: processes, activeConnections: cmap.New(), closedConnections: make([]*ConnectionContext, 0), flushClosedEvents: make(chan *events.SocketCloseEvent, 5000), sockParseQueue: make(chan *ConnectionContext, 5000), listeners: make([]AnalyzeListener, 0), } } func (c *AnalyzerContext) Init(config *base.TaskConfig, moduleManager *module.Manager) error { for _, l := range c.listeners { if err := l.Init(config, moduleManager); err != nil { return err } } return nil } func (c *AnalyzerContext) AddListener(l AnalyzeListener) { c.listeners = append(c.listeners, l) } func (c *AnalyzerContext) GetAllConnectionWithContext() []*ConnectionContext { result := make([]*ConnectionContext, 0) result = append(result, c.flushClosedConnection()...) for _, con := range c.activeConnections.Items() { result = append(result, con.(*ConnectionContext)) } return result } func (c *AnalyzerContext) RegisterAllHandlers(ctx context.Context, bpfLoader *bpf.Loader) { // socket connect bpfLoader.ReadEventAsync(bpfLoader.SocketConnectionEventQueue, c.handleSocketConnectEvent, func() interface{} { return &events.SocketConnectEvent{} }) // socket close bpfLoader.ReadEventAsync(bpfLoader.SocketCloseEventQueue, c.handleSocketCloseEvent, func() interface{} { return &events.SocketCloseEvent{} }) for _, l := range c.listeners { l.RegisterBPFEvents(ctx, bpfLoader) } } func (c *AnalyzerContext) StartSocketAddressParser(ctx context.Context) { for i := 0; i < 2; i++ { go c.handleSocketParseQueue(ctx) } } func (c *AnalyzerContext) GetActiveConnection(conID, randomID uint64) *ConnectionContext { data, ok := c.activeConnections.Get(c.generateConnectionKey(conID, randomID)) if !ok { return nil } return data.(*ConnectionContext) } func (c *AnalyzerContext) UpdateExtensionConfig(config *base.ExtensionConfig) { for _, l := range c.listeners { l.UpdateExtensionConfig(config) } } func (c *AnalyzerContext) handleSocketParseQueue(ctx context.Context) { for { select { case cc := <-c.sockParseQueue: socket, err := ParseSocket(cc.LocalPid, cc.SocketFD) if err != nil { // if the remote port of connection is empty, then this connection not available basically if cc.RemotePort == 0 { log.Warnf("complete the socket error, pid: %d, fd: %d, error: %v", cc.LocalPid, cc.SocketFD, err) } continue } cc.LocalIP = socket.SrcIP cc.LocalPort = socket.SrcPort cc.RemoteIP = socket.DestIP cc.RemotePort = socket.DestPort case <-ctx.Done(): return } } } func (c *AnalyzerContext) handleSocketConnectEvent(data interface{}) { event := data.(*events.SocketConnectEvent) if log.Enable(logrus.DebugLevel) { marshal, _ := json.Marshal(event) log.Debugf("found connect event, json: %s", string(marshal)) } processes := c.processes[int32(event.Pid)] if len(processes) == 0 { log.Warnf("get process connect event, but this process is don't need to monitor, pid: %d", event.Pid) return } // build active connection information con := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, event.FD, processes, false) con.Role = event.Role if event.NeedComplete == 0 { con.RemotePort = uint16(event.RemoteAddrPort) con.LocalPort = uint16(event.LocalAddrPort) if event.SocketFamily == unix.AF_INET { con.LocalIP = parseAddressV4(event.LocalAddrV4) con.RemoteIP = parseAddressV4(event.RemoteAddrV4) } else { con.LocalIP = parseAddressV6(event.LocalAddrV6) con.RemoteIP = parseAddressV6(event.RemoteAddrV6) } } else { // if the remote address exists then setting it if event.RemoteAddrPort != 0 { con.RemotePort = uint16(event.RemoteAddrPort) if event.SocketFamily == unix.AF_INET { con.RemoteIP = parseAddressV4(event.RemoteAddrV4) } else { con.RemoteIP = parseAddressV6(event.RemoteAddrV6) } } c.sockParseQueue <- con } // notify the listeners for _, l := range c.listeners { l.ReceiveNewConnection(con, event) } // add to the context c.saveActiveConnection(con) } func (c *AnalyzerContext) handleSocketCloseEvent(data interface{}) { event := data.(*events.SocketCloseEvent) if log.Enable(logrus.DebugLevel) { marshal, _ := json.Marshal(event) log.Debugf("found close event: %s", string(marshal)) } // try to handle the socket close event if !c.socketClosedEvent0(event) { // is not in active connection, maybe it's not have been added to activate first // just add to the close queue, wait for the flush connection with interval c.flushClosedEvents <- event return } } func (c *AnalyzerContext) FlushAllMetrics(bpfLoader *bpf.Loader, metricsPrefix string) (*MetricsBuilder, error) { metricsBuilder := NewMetricsBuilder(metricsPrefix) err := c.flushMetrics0(bpfLoader, metricsBuilder) if err != nil { return nil, err } return metricsBuilder, nil } func (c *AnalyzerContext) flushMetrics0(bpfLoader *bpf.Loader, builder *MetricsBuilder) error { // handling the unfinished close event c.processCachedCloseEvents() // get all connections ccs := c.GetAllConnectionWithContext() if len(ccs) == 0 { return nil } // prepare to flush metrics err := c.prepareToFlushMetrics(ccs, bpfLoader) if err != nil { return fmt.Errorf("prepare to flush the connection metrics failure: %v", err) } // combine all connections analyzer := c.NewTrafficAnalyzer() traffics := analyzer.CombineConnectionToTraffics(ccs) // generate connections for _, l := range c.listeners { l.FlushMetrics(traffics, builder) } // after flush metrics for _, l := range c.listeners { l.PostFlushConnectionMetrics(ccs) } return nil } func (c *AnalyzerContext) prepareToFlushMetrics(ccs []*ConnectionContext, bpfLoader *bpf.Loader) error { var active *ActiveConnectionInBPF closedConnections := make([]string, 0) connectionWithBPFList := make([]*ConnectionWithBPF, 0) for _, cc := range ccs { active, closedConnections = c.lookupTheActiveConnectionInBPf(cc, bpfLoader, closedConnections) connectionWithBPFList = append(connectionWithBPFList, &ConnectionWithBPF{ Connection: cc, ActiveInBPF: active, }) } // delete closed connections if len(closedConnections) > 0 { c.deleteConnectionOnly(closedConnections) } // call the listeners for _, l := range c.listeners { err := l.PreFlushConnectionMetrics(connectionWithBPFList, bpfLoader) if err != nil { return err } } return nil } func (c *AnalyzerContext) lookupTheActiveConnectionInBPf(connection *ConnectionContext, bpfLoader *bpf.Loader, closedConnections []string) (active *ActiveConnectionInBPF, closedRef []string) { var activeConnection ActiveConnectionInBPF // if connection not closed, then load the basic stats from bpf map if !connection.ConnectionClosed { err := bpfLoader.ActiveConnectionMap.Lookup(connection.ConnectionID, &activeConnection) if err != nil { if errors.Is(err, ebpf.ErrKeyNotExist) { closedConnections = append(closedConnections, c.generateConnectionKey(connection.ConnectionID, connection.RandomID)) connection.ConnectionClosed = true } else { log.Warnf("lookup the active connection error, connection id: %d, error: %v", connection.ConnectionID, err) } return nil, closedConnections } if log.Enable(logrus.DebugLevel) { marshal, _ := json.Marshal(activeConnection) log.Debugf("found the active connection, conid: %d, data: %s", connection.ConnectionID, string(marshal)) } if connection.Role == events.ConnectionRoleUnknown && activeConnection.Role != events.ConnectionRoleUnknown { connection.Role = activeConnection.Role } if connection.Protocol == events.ConnectionProtocolUnknown && activeConnection.Protocol != events.ConnectionProtocolUnknown { connection.Protocol = activeConnection.Protocol } if !connection.IsSSL && activeConnection.IsSSL == 1 { connection.IsSSL = true } return &activeConnection, closedConnections } return nil, closedConnections } func (c *AnalyzerContext) deleteConnectionOnly(ccs []string) { for _, cc := range ccs { c.activeConnections.Remove(cc) } } func (c *AnalyzerContext) processCachedCloseEvents() { for len(c.flushClosedEvents) > 0 { event := <-c.flushClosedEvents if !c.socketClosedEvent0(event) { // if cannot the found the active connection, then just create a new closed connection context processes := c.processes[int32(event.Pid)] if len(processes) == 0 { continue } cc := c.NewConnectionContext(event.ConID, event.RandomID, event.Pid, event.SocketFD, processes, true) if event.SocketFamily == unix.AF_INET { cc.RemoteIP = parseAddressV4(event.RemoteAddrV4) cc.LocalIP = parseAddressV4(event.LocalAddrV4) } else if event.SocketFamily == unix.AF_INET6 { cc.RemoteIP = parseAddressV6(event.RemoteAddrV6) cc.LocalIP = parseAddressV6(event.LocalAddrV6) } else { continue } // append to the closed connection c.appendClosedConnection(c.combineClosedConnection(cc, event)) } } } func (c *AnalyzerContext) generateConnectionKey(conID, randomID uint64) string { return fmt.Sprintf("%d_%d", conID, randomID) } func (c *AnalyzerContext) socketClosedEvent0(event *events.SocketCloseEvent) bool { activeCon := c.foundAndDeleteConnection(event) if activeCon == nil { return false } // combine the connection data c.appendClosedConnection(c.combineClosedConnection(activeCon, event)) return true } func (c *AnalyzerContext) foundAndDeleteConnection(event *events.SocketCloseEvent) *ConnectionContext { conKey := c.generateConnectionKey(event.ConID, event.RandomID) val, exists := c.activeConnections.Pop(conKey) if !exists { return nil } return val.(*ConnectionContext) } func (c *AnalyzerContext) combineClosedConnection(active *ConnectionContext, closed *events.SocketCloseEvent) *ConnectionContext { active.ConnectionClosed = true if active.Role == events.ConnectionRoleUnknown && closed.Role != events.ConnectionRoleUnknown { active.Role = closed.Role } if active.Protocol == events.ConnectionProtocolUnknown && closed.Protocol != events.ConnectionProtocolUnknown { active.Protocol = closed.Protocol } if !active.IsSSL && closed.IsSSL == 1 { active.IsSSL = true } // notify listeners for _, l := range c.listeners { l.ReceiveCloseConnection(active, closed) } return active } func (c *AnalyzerContext) saveActiveConnection(con *ConnectionContext) { c.activeConnections.Set(c.generateConnectionKey(con.ConnectionID, con.RandomID), con) } func (c *AnalyzerContext) flushClosedConnection() []*ConnectionContext { c.closedConnectionLocker.Lock() defer c.closedConnectionLocker.Unlock() connections := c.closedConnections c.closedConnections = make([]*ConnectionContext, 0) return connections } func (c *AnalyzerContext) appendClosedConnection(con *ConnectionContext) { c.closedConnectionLocker.RLock() defer c.closedConnectionLocker.RUnlock() c.closedConnections = append(c.closedConnections, con) } func parseAddressV4(val uint32) string { return net.IP((*(*[net.IPv4len]byte)(unsafe.Pointer(&val)))[:]).String() } func parseAddressV6(val [16]uint8) string { return net.IP((*(*[net.IPv6len]byte)(unsafe.Pointer(&val)))[:]).String() }