pkg/accesslog/sender/sender.go (187 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package sender import ( "container/list" "context" "fmt" "sync" "time" "github.com/apache/skywalking-rover/pkg/accesslog/common" "github.com/apache/skywalking-rover/pkg/core" "github.com/apache/skywalking-rover/pkg/logger" "github.com/apache/skywalking-rover/pkg/module" "github.com/apache/skywalking-rover/pkg/process" "github.com/apache/skywalking-rover/pkg/tools/host" "github.com/sirupsen/logrus" v32 "skywalking.apache.org/repo/goapi/collect/common/v3" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) var log = logger.GetLogger("accesslog", "sender") // GRPCSender Async to sending the access log to the backend type GRPCSender struct { logs *list.List notify chan bool mutex sync.Mutex ctx context.Context mgr *module.Manager connectionMgr *common.ConnectionManager alsClient v3.EBPFAccessLogServiceClient clusterName string } // NewGRPCSender creates a new GRPCSender func NewGRPCSender(mgr *module.Manager, connectionMgr *common.ConnectionManager) *GRPCSender { return &GRPCSender{ logs: list.New(), notify: make(chan bool, 1), mgr: mgr, connectionMgr: connectionMgr, clusterName: mgr.FindModule(core.ModuleName).(core.Operator).ClusterName(), alsClient: v3.NewEBPFAccessLogServiceClient(mgr.FindModule(core.ModuleName).(core.Operator). BackendOperator().GetConnection()), } } func (g *GRPCSender) Start(ctx context.Context) { g.ctx = ctx go func() { for { select { case <-g.notify: if count, err := g.handleLogs(); err != nil { log.Warnf("sending access log error, lost %d logs, error: %v", count, err) } case <-ctx.Done(): return } } }() } func (g *GRPCSender) NewBatch() *BatchLogs { return &BatchLogs{ logs: make(map[*common.ConnectionInfo]*ConnectionLogs), } } func (g *GRPCSender) AddBatch(batch *BatchLogs) { // split logs splitLogs := batch.splitBatchLogs() // append the resend logs g.mutex.Lock() defer g.mutex.Unlock() for _, l := range splitLogs { g.logs.PushBack(l) } // notify the sender select { case g.notify <- true: default: } } func (g *GRPCSender) handleLogs() (int, error) { for { // pop logs logs := g.popLogs() if logs == nil { return 0, nil } // send logs now := time.Now() if err := g.sendLogs(logs); err != nil { return len(logs.logs), err } log.Infof("sending access log success, connection count: %d, use time: %s", logs.ConnectionCount(), time.Since(now).String()) } } func (g *GRPCSender) sendLogs(batch *BatchLogs) error { timeout, cancelFunc := context.WithTimeout(g.ctx, time.Second*20) defer cancelFunc() streaming, err := g.alsClient.Collect(timeout) if err != nil { return err } firstLog := true firstConnection := true var sendError error for connection, logs := range batch.logs { if len(logs.kernels) == 0 && len(logs.protocols) == 0 { continue } if log.Enable(logrus.DebugLevel) { log.Debugf("ready to sending access log with connection, connection ID: %d, random ID: %d, "+ "local: %s, remote: %s, role: %s, contains ztunnel address: %t, kernel logs count: %d, protocol log count: %d", connection.ConnectionID, connection.RandomID, connection.RPCConnection.Local, connection.RPCConnection.Remote, connection.RPCConnection.Role, connection.RPCConnection.Attachment != nil, len(logs.kernels), len(logs.protocols)) } if len(logs.kernels) > 0 { sendError = g.sendLogToTheStream(streaming, g.buildAccessLogMessage(firstLog, firstConnection, connection, logs.kernels, nil)) firstLog, firstConnection = false, false } for _, protocolLog := range logs.protocols { sendError = g.sendLogToTheStream(streaming, g.buildAccessLogMessage(firstLog, firstConnection, connection, protocolLog.kernels, protocolLog.protocol)) firstLog, firstConnection = false, false } if sendError != nil { g.closeStream(streaming) return fmt.Errorf("sending access log error: %v", sendError) } firstConnection = true } g.closeStream(streaming) return nil } func (g *GRPCSender) closeStream(s v3.EBPFAccessLogService_CollectClient) { if _, err := s.CloseAndRecv(); err != nil { log.Warnf("closing the access log streaming error: %v", err) } } func (g *GRPCSender) sendLogToTheStream(streaming v3.EBPFAccessLogService_CollectClient, logMsg *v3.EBPFAccessLogMessage) error { if err := streaming.Send(logMsg); err != nil { return err } return nil } func (g *GRPCSender) buildAccessLogMessage(firstLog, firstConnection bool, conn *common.ConnectionInfo, kernelLogs []*v3.AccessLogKernelLog, protocolLog *v3.AccessLogProtocolLogs) *v3.EBPFAccessLogMessage { var rpcCon *v3.AccessLogConnection if firstConnection { rpcCon = conn.RPCConnection } return &v3.EBPFAccessLogMessage{ Node: g.BuildNodeInfo(firstLog), Connection: rpcCon, KernelLogs: kernelLogs, ProtocolLog: protocolLog, } } func (g *GRPCSender) BuildNodeInfo(needs bool) *v3.EBPFAccessLogNodeInfo { if !needs { return nil } netInterfaces := make([]*v3.EBPFAccessLogNodeNetInterface, 0) for i, n := range host.AllNetworkInterfaces() { netInterfaces = append(netInterfaces, &v3.EBPFAccessLogNodeNetInterface{ Index: int32(i), Mtu: int32(n.MTU), Name: n.Name, }) } return &v3.EBPFAccessLogNodeInfo{ Name: g.mgr.FindModule(process.ModuleName).(process.K8sOperator).NodeName(), NetInterfaces: netInterfaces, BootTime: g.convertTimeToInstant(host.BootTime), ClusterName: g.clusterName, Policy: &v3.EBPFAccessLogPolicy{ ExcludeNamespaces: g.connectionMgr.GetExcludeNamespaces(), }, } } func (g *GRPCSender) convertTimeToInstant(t time.Time) *v32.Instant { return &v32.Instant{ Seconds: t.Unix(), Nanos: int32(t.Nanosecond()), } } func (g *GRPCSender) popLogs() *BatchLogs { g.mutex.Lock() defer g.mutex.Unlock() if g.logs.Len() == 0 { return nil } e := g.logs.Front() logs := e.Value.(*BatchLogs) g.logs.Remove(e) return logs }