pkg/accesslog/sender/logs.go (73 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package sender import ( "github.com/apache/skywalking-rover/pkg/accesslog/common" v3 "skywalking.apache.org/repo/goapi/collect/ebpf/accesslog/v3" ) var maxLogsPerSend = 10_000 type BatchLogs struct { logs map[*common.ConnectionInfo]*ConnectionLogs } func newBatchLogs() *BatchLogs { return &BatchLogs{ logs: make(map[*common.ConnectionInfo]*ConnectionLogs), } } func (l *BatchLogs) ConnectionCount() int { return len(l.logs) } func (l *BatchLogs) AppendKernelLog(connection *common.ConnectionInfo, log *v3.AccessLogKernelLog) { logs, ok := l.logs[connection] if !ok { logs = newConnectionLogs() l.logs[connection] = logs } logs.kernels = append(logs.kernels, log) } func (l *BatchLogs) AppendProtocolLog(connection *common.ConnectionInfo, kernels []*v3.AccessLogKernelLog, protocols *v3.AccessLogProtocolLogs) { logs, ok := l.logs[connection] if !ok { logs = newConnectionLogs() l.logs[connection] = logs } logs.protocols = append(logs.protocols, &ConnectionProtocolLog{ kernels: kernels, protocol: protocols, }) } func (l *BatchLogs) splitBatchLogs() []*BatchLogs { logsCount := len(l.logs) if logsCount == 0 { return nil } splitCount := logsCount / maxLogsPerSend if logsCount%maxLogsPerSend != 0 { splitCount++ } result := make([]*BatchLogs, 0, splitCount) // split the connections by maxLogsPerSend currentCount := 0 var currentBatch *BatchLogs for connection, logs := range l.logs { if currentCount%maxLogsPerSend == 0 { currentBatch = newBatchLogs() result = append(result, currentBatch) currentCount = 0 } currentBatch.logs[connection] = logs currentCount++ } return result } type ConnectionLogs struct { kernels []*v3.AccessLogKernelLog protocols []*ConnectionProtocolLog } type ConnectionProtocolLog struct { kernels []*v3.AccessLogKernelLog protocol *v3.AccessLogProtocolLogs } func newConnectionLogs() *ConnectionLogs { return &ConnectionLogs{ kernels: make([]*v3.AccessLogKernelLog, 0), protocols: make([]*ConnectionProtocolLog, 0), } }