// pkg/accesslog/collector/protocols/queue.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package protocols
import (
"context"
"errors"
"fmt"
"os"
"sort"
"strconv"
"sync"
"time"

"github.com/cilium/ebpf"
"github.com/docker/go-units"
cmap "github.com/orcaman/concurrent-map"

"github.com/apache/skywalking-rover/pkg/accesslog/common"
"github.com/apache/skywalking-rover/pkg/accesslog/events"
"github.com/apache/skywalking-rover/pkg/accesslog/forwarder"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/btf"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
)
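// maxBufferExpireDuration defines how long unprocessed socket data and detail events
// may stay buffered for a connection before they are dropped as expired.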
var maxBufferExpireDuration = time.Minute
var log = logger.GetLogger("accesslog", "collector", "protocols")
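// AnalyzeQueue receives socket detail and socket data upload events from the BPF queues,
// partitions them by connection ID, and hands them to the protocol analyzers.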
type AnalyzeQueue struct {
context *common.AccessLogContext
eventQueue *btf.EventQueue
perCPUBuffer int64
detailSupplier func() events.SocketDetail
supportAnalyzers func(ctx *common.AccessLogContext) []Protocol
}
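// NewAnalyzeQueue validates the protocol analyze configuration (per-CPU buffer size,
// parallelism, and queue size) and builds a queue with the default socket detail supplier
// and the default HTTP/1.x and HTTP/2 analyzers.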
func NewAnalyzeQueue(ctx *common.AccessLogContext) (*AnalyzeQueue, error) {
perCPUBufferSize, err := units.RAMInBytes(ctx.Config.ProtocolAnalyze.PerCPUBufferSize)
if err != nil {
return nil, err
}
if int(perCPUBufferSize) < os.Getpagesize() {
return nil, fmt.Errorf("the cpu buffer must bigger than %dB", os.Getpagesize())
}
if ctx.Config.ProtocolAnalyze.AnalyzeParallels < 1 {
return nil, fmt.Errorf("the analyze parallels cannot be smaller than 1")
}
if ctx.Config.ProtocolAnalyze.ParseParallels < 1 {
return nil, fmt.Errorf("the parse parallels cannot be smaller than 1")
}
if ctx.Config.ProtocolAnalyze.QueueSize < 1 {
return nil, fmt.Errorf("the queue size cannot be smaller than 1")
}
return &AnalyzeQueue{
context: ctx,
perCPUBuffer: perCPUBufferSize,
detailSupplier: func() events.SocketDetail {
return &events.SocketDetailEvent{}
},
supportAnalyzers: func(ctx *common.AccessLogContext) []Protocol {
return []Protocol{
NewHTTP1Analyzer(ctx, nil),
NewHTTP2Analyzer(ctx, nil),
}
},
}, nil
}
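// Start builds the underlying event queue, registers the receivers for the socket detail
// and socket data upload BPF queues (both partitioned by connection ID), and starts consuming.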
func (q *AnalyzeQueue) Start(ctx context.Context) {
q.eventQueue = btf.NewEventQueue("socket data analyzer",
q.context.Config.ProtocolAnalyze.AnalyzeParallels, q.context.Config.ProtocolAnalyze.QueueSize,
func(num int) btf.PartitionContext {
return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context))
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
return q.detailSupplier()
}, func(data interface{}) int {
return int(data.(events.SocketDetail).GetConnectionID())
})
q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer),
q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} {
return &events.SocketDataUploadEvent{Buffer: *buffer.BorrowNewBuffer()}
}, func(data interface{}) int {
return int(data.(*events.SocketDataUploadEvent).ConnectionID)
})
q.eventQueue.Start(ctx, q.context.BPF.Linker)
}
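// ChangeDetailSupplier replaces the factory that allocates socket detail events.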
func (q *AnalyzeQueue) ChangeDetailSupplier(supplier func() events.SocketDetail) {
q.detailSupplier = supplier
}
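// ChangeSupportAnalyzers replaces the factory that creates the protocol analyzers for each partition.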
func (q *AnalyzeQueue) ChangeSupportAnalyzers(protocols func(ctx *common.AccessLogContext) []Protocol) {
q.supportAnalyzers = protocols
}
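// PartitionContext holds the per-connection state of one partition of the analyze queue
// and drives the periodic flush and expiry of buffered events.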
type PartitionContext struct {
context *common.AccessLogContext
protocolMgr *ProtocolManager
connections cmap.ConcurrentMap
partitionNum int
analyzeLocker sync.Mutex
}
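// newPartitionConnection initializes the per-connection buffers and protocol state,
// then registers the first observed protocol.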
func newPartitionConnection(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
connection := &PartitionConnection{
connectionID: conID,
randomID: randomID,
dataBuffers: make(map[enums.ConnectionProtocol]*buffer.Buffer),
protocol: make(map[enums.ConnectionProtocol]uint64),
protocolAnalyzer: make(map[enums.ConnectionProtocol]Protocol),
protocolMetrics: make(map[enums.ConnectionProtocol]ProtocolMetrics),
lastCheckCloseTime: time.Now(),
}
connection.appendProtocolIfNeed(protocolMgr, conID, randomID, protocol, currentDataID)
return connection
}
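// appendProtocolIfNeed registers the protocol on this connection on first sight (buffer,
// analyzer, and metrics); otherwise it only keeps the smallest data ID observed for the protocol.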
func (p *PartitionConnection) appendProtocolIfNeed(protocolMgr *ProtocolManager, conID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) {
if minDataID, exist := p.protocol[protocol]; !exist {
analyzer := protocolMgr.GetProtocol(protocol)
p.protocol[protocol] = currentDataID
p.dataBuffers[protocol] = buffer.NewBuffer()
p.protocolAnalyzer[protocol] = analyzer
p.protocolMetrics[protocol] = analyzer.GenerateConnection(conID, randomID)
} else if currentDataID < minDataID {
p.protocol[protocol] = currentDataID
}
}
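// NewPartitionContext creates the state of a single partition and registers it
// with the connection manager as a processor.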
func NewPartitionContext(ctx *common.AccessLogContext, num int, protocols []Protocol) *PartitionContext {
pc := &PartitionContext{
context: ctx,
protocolMgr: NewProtocolManager(protocols),
connections: cmap.New(),
partitionNum: num,
}
ctx.ConnectionMgr.RegisterProcessor(pc)
return pc
}
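// Start launches two background loops: one flushes the buffered events on the configured
// flush period, the other removes expired events every maxBufferExpireDuration.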
func (p *PartitionContext) Start(ctx context.Context) {
// process the buffered events on the configured flush interval
flushDuration, _ := time.ParseDuration(p.context.Config.Flush.Period)
timeTicker := time.NewTicker(flushDuration)
go func() {
for {
select {
case <-timeTicker.C:
// process event with interval
p.ProcessEvents()
case <-ctx.Done():
timeTicker.Stop()
return
}
}
}()
// delete the expired events
expireTicker := time.NewTicker(maxBufferExpireDuration)
go func() {
for {
select {
case <-expireTicker.C:
p.ProcessExpireEvents()
case <-ctx.Done():
expireTicker.Stop()
return
}
}
}()
}
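// Consume handles one queue element: detail events with an unknown protocol are forwarded
// directly without protocol analysis, everything else is appended to its connection for later analysis.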
func (p *PartitionContext) Consume(data interface{}) {
switch event := data.(type) {
case events.SocketDetail:
pid, _ := events.ParseConnectionID(event.GetConnectionID())
log.Debugf("receive the socket detail event, connection ID: %d, random ID: %d, pid: %d, data id: %d, "+
"function name: %s, package count: %d, package size: %d, ssl: %d, protocol: %d",
event.GetConnectionID(), event.GetRandomID(), pid, event.DataID(), event.GetFunctionName(),
event.GetL4PackageCount(), event.GetL4TotalPackageSize(), event.GetSSL(), event.GetProtocol())
if event.GetProtocol() == enums.ConnectionProtocolUnknown {
// if the connection protocol is unknown, we just need to add this into the kernel log
forwarder.SendTransferNoProtocolEvent(p.context, event)
return
}
connection := p.GetConnectionContext(event.GetConnectionID(), event.GetRandomID(), event.GetProtocol(), event.DataID())
connection.AppendDetail(p.context, event)
case *events.SocketDataUploadEvent:
pid, _ := events.ParseConnectionID(event.ConnectionID)
log.Debugf("receive the socket data event, connection ID: %d, random ID: %d, pid: %d, prev data id: %d, "+
"data id: %d, sequence: %d, finished: %t, protocol: %d, size: %d",
event.ConnectionID, event.RandomID, pid, event.PrevDataID0, event.DataID0, event.Sequence0, event.IsFinished(),
event.Protocol0, event.BufferLen())
connection := p.GetConnectionContext(event.ConnectionID, event.RandomID, event.Protocol0, event.DataID0)
connection.AppendData(event)
}
}
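// GetConnectionContext returns the state for the given connection/random ID pair,
// creating it when needed and making sure the protocol is registered on it.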
func (p *PartitionContext) GetConnectionContext(connectionID, randomID uint64,
protocol enums.ConnectionProtocol, currentDataID uint64) *PartitionConnection {
conKey := p.buildConnectionKey(connectionID, randomID)
conn, exist := p.connections.Get(conKey)
if exist {
connection := conn.(*PartitionConnection)
connection.appendProtocolIfNeed(p.protocolMgr, connectionID, randomID, protocol, currentDataID)
return connection
}
result := newPartitionConnection(p.protocolMgr, connectionID, randomID, protocol, currentDataID)
p.connections.Set(conKey, result)
return result
}
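// buildConnectionKey builds the map key "<connectionID>_<randomID>" with strconv
// instead of fmt.Sprintf to reduce allocations.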
func (p *PartitionContext) buildConnectionKey(conID, ranID uint64) string {
buf := make([]byte, 0, 42) // up to 20 digits + '_' + up to 20 digits, with one byte to spare
buf = strconv.AppendUint(buf, conID, 10)
buf = append(buf, '_')
buf = strconv.AppendUint(buf, ranID, 10)
return string(buf)
}
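// ProcessEvents analyzes the buffered data of every connection in this partition and then
// removes connections that are closed and have no buffered data left.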
func (p *PartitionContext) ProcessEvents() {
// it could be triggered by the flush interval or by reaching the counter threshold
// if one trigger already holds the lock, the other one simply skips this round
if !p.analyzeLocker.TryLock() {
return
}
defer p.analyzeLocker.Unlock()
closedConnections := make([]string, 0)
p.connections.IterCb(func(conKey string, con interface{}) {
info := con.(*PartitionConnection)
p.processConnectionEvents(info)
// if the connection is already closed and no buffered data remains, then delete the connection
var bufLen = 0
for _, buf := range info.dataBuffers {
bufLen += buf.DataLength()
}
if bufLen > 0 {
return
}
if !info.closed {
p.checkTheConnectionIsAlreadyClose(info)
}
if info.closed {
closedConnections = append(closedConnections, conKey)
log.Debugf("detect the connection is already closed, then notify to the callback, connection ID: %d, random ID: %d, partition number: %d",
info.connectionID, info.randomID, p.partitionNum)
}
})
for _, conKey := range closedConnections {
p.connections.Remove(conKey)
}
}
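// checkTheConnectionIsAlreadyClose looks up the BPF active connection map (at most once every
// 30 seconds per connection) to detect whether the connection has been closed or its slot
// has been reused by a different random ID.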
func (p *PartitionContext) checkTheConnectionIsAlreadyClose(con *PartitionConnection) {
if time.Since(con.lastCheckCloseTime) <= time.Second*30 {
return
}
con.lastCheckCloseTime = time.Now()
var activateConn common.ActiveConnection
if err := p.context.BPF.ActiveConnectionMap.Lookup(con.connectionID, &activateConn); err != nil {
if errors.Is(err, ebpf.ErrKeyNotExist) {
con.closed = true
log.Debugf("detect the connection: %d-%d is already closed(by key not exist), so remove from the activate connection",
con.connectionID, con.randomID)
return
}
log.Warnf("cannot found the active connection: %d-%d, err: %v", con.connectionID, con.randomID, err)
return
} else if activateConn.RandomID != 0 && activateConn.RandomID != con.randomID {
log.Debugf("detect the connection: %d-%d is already closed(by difference random ID), so remove from the activate connection",
con.connectionID, con.randomID)
con.closed = true
}
}
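// ProcessExpireEvents drops buffered events older than maxBufferExpireDuration from every
// connection; it holds the analyze lock so it never runs concurrently with ProcessEvents.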
func (p *PartitionContext) ProcessExpireEvents() {
// the expiry must be mutually exclusive with the events processor
p.analyzeLocker.Lock()
defer p.analyzeLocker.Unlock()
p.connections.IterCb(func(_ string, con interface{}) {
p.processConnectionExpireEvents(con.(*PartitionConnection))
})
}
func (p *PartitionContext) processConnectionExpireEvents(connection *PartitionConnection) {
for _, buf := range connection.dataBuffers {
if c := buf.DeleteExpireEvents(maxBufferExpireDuration); c > 0 {
log.Debugf("total removed %d expired socket data events from connection ID: %d, random ID: %d", c,
connection.connectionID, connection.randomID)
}
}
}
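// processConnectionEvents runs the protocol analyzers ordered by their minimal data ID;
// when an analyzer reports a protocol break, the connection is downgraded: further analysis
// is skipped and the buffered details are forwarded without protocol data.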
func (p *PartitionContext) processConnectionEvents(connection *PartitionConnection) {
if connection.skipAllDataAnalyze {
return
}
helper := &AnalyzeHelper{}
// since the socket data/detail events arrive unsorted, rover needs to analyze the protocols in order of their minimal data ID to preserve ordering
sortedProtocols := make([]enums.ConnectionProtocol, 0, len(connection.protocol))
for protocol := range connection.protocol {
sortedProtocols = append(sortedProtocols, protocol)
}
sort.Slice(sortedProtocols, func(i, j int) bool {
return connection.protocol[sortedProtocols[i]] < connection.protocol[sortedProtocols[j]]
})
for _, protocol := range sortedProtocols {
if err := connection.protocolAnalyzer[protocol].Analyze(connection, helper); err != nil {
log.Warnf("failed to analyze the %s protocol data: %v", enums.ConnectionProtocolString(protocol), err)
}
}
if helper.ProtocolBreak {
// notify the connection manager to skip analyzing all data (just send the details)
connection.skipAllDataAnalyze = true
p.context.ConnectionMgr.SkipAllDataAnalyzeAndDowngradeProtocol(connection.connectionID, connection.randomID)
for _, buf := range connection.dataBuffers {
for e := buf.BuildDetails().Front(); e != nil; e = e.Next() {
forwarder.SendTransferNoProtocolEvent(p.context, e.Value.(events.SocketDetail))
}
buf.Clean()
}
}
}