pkg/profiling/task/network/analyze/layer7/events.go (66 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package layer7
import (
"context"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
analyzeBase "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/bpf"
"github.com/apache/skywalking-rover/pkg/tools/btf"
)
func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profiling.TaskConfig) {
l.socketDataQueue = btf.NewEventQueue("socket data resolver", parallels, queueSize, func(num int) btf.PartitionContext {
return NewSocketDataPartitionContext(l, config)
})
}
func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) {
// socket buffer data
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDataUploadEvent{Buffer: *buffer.BorrowNewBuffer()}
}, func(data interface{}) int {
return int(data.(*analyzeBase.SocketDataUploadEvent).ConnectionID)
})
// socket detail
l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDetailDataQueue, l.protocolPerCPUBuffer, 1, func() interface{} {
return &analyzeBase.SocketDetailEvent{}
}, func(data interface{}) int {
return int(data.(*analyzeBase.SocketDetailEvent).ConnectionID)
})
l.socketDataQueue.Start(ctx, bpfLoader.Linker)
}
func (l *Listener) handleProfilingExtensionConfig(config *profiling.ExtensionConfig) {
if l.socketDataQueue == nil {
return
}
for _, p := range l.socketDataQueue.PartitionContexts() {
ctx := p.(*SocketDataPartitionContext)
ctx.analyzer.UpdateExtensionConfig(config)
}
}
func (l *Listener) handleConnectionClose(event *analyzeBase.SocketCloseEvent) {
if l.socketDataQueue == nil {
return
}
for _, p := range l.socketDataQueue.PartitionContexts() {
ctx := p.(*SocketDataPartitionContext)
ctx.analyzer.ReceiveSocketClose(event)
}
}
type SocketDataPartitionContext struct {
analyzer *protocols.Analyzer
}
func NewSocketDataPartitionContext(l base.Context, config *profiling.TaskConfig) *SocketDataPartitionContext {
return &SocketDataPartitionContext{
analyzer: protocols.NewAnalyzer(l, config),
}
}
func (p *SocketDataPartitionContext) Start(ctx context.Context) {
p.analyzer.Start(ctx)
}
func (p *SocketDataPartitionContext) Consume(data interface{}) {
switch v := data.(type) {
case *analyzeBase.SocketDetailEvent:
p.analyzer.ReceiveSocketDetail(v)
case *analyzeBase.SocketDataUploadEvent:
p.analyzer.ReceiveSocketDataEvent(v)
}
}