pkg/profiling/task/network/analyze/layer7/protocols/base/analyzer.go (200 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package base
import (
"context"
"fmt"
"sync"
"time"
cmap "github.com/orcaman/concurrent-map"
"github.com/apache/skywalking-rover/pkg/logger"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
)
const (
batchReadMinCount = 1000
)
var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "base")
// ProtocolAnalyzer handler all socket data for each protocol
type ProtocolAnalyzer struct {
protocolContext Context
protocol Protocol
config *profiling.TaskConfig
connections cmap.ConcurrentMap // connections with concurrent key: connection id+random id, value: *connectionInfo
analyzeLocker sync.Mutex
receiveEventCount int
}
func NewProtocolAnalyzer(protocolContext Context, p Protocol, config *profiling.TaskConfig) *ProtocolAnalyzer {
return &ProtocolAnalyzer{
protocolContext: protocolContext,
protocol: p,
config: config,
connections: cmap.New(),
}
}
func (a *ProtocolAnalyzer) Start(ctx context.Context) {
duration, _ := time.ParseDuration(a.config.Network.ReportInterval)
timeTicker := time.NewTicker(duration)
go func() {
for {
select {
case <-timeTicker.C:
// process event with interval
a.processEvents()
case <-ctx.Done():
timeTicker.Stop()
return
}
}
}()
// if the protocol defined the events expire time, then check events interval
expireDuration := a.protocol.PackageMaxExpireDuration()
if expireDuration.Milliseconds() > 0 {
expireTicker := time.NewTicker(expireDuration)
go func() {
for {
select {
case <-expireTicker.C:
a.processExpireEvents(expireDuration)
case <-ctx.Done():
expireTicker.Stop()
return
}
}
}()
}
}
func (a *ProtocolAnalyzer) ReceiveSocketDetail(ctx Context, event *events.SocketDetailEvent) {
connectionID := event.GenerateConnectionID()
connection := a.getConnection(ctx, event.ConnectionID, event.RandomID)
log.Debugf("receive detail from connection: %s, dataid: %d", connectionID, event.DataID())
connection.buffer.AppendDetailEvent(event)
}
func (a *ProtocolAnalyzer) ReceiveSocketData(ctx Context, event *events.SocketDataUploadEvent) {
connectionID := event.GenerateConnectionID()
connection := a.getConnection(ctx, event.ConnectionID, event.RandomID)
log.Debugf("receive data from connection: %s, dataid: %d, sequence: %d, finished: %d, have reduce after chunk: %t, "+
"direction: %s, size: %d, total size: %d",
connectionID, event.DataID(), event.DataSequence(), event.Finished, event.HaveReduceDataAfterChunk(),
event.Direction().String(), event.DataLen, event.TotalSize0)
// insert to the event list
connection.buffer.AppendDataEvent(event)
// process the events if reach the receiver counter
a.receiveEventCount++
if a.receiveEventCount >= batchReadMinCount {
a.processEvents()
}
a.receiveEventCount = 0
}
func (a *ProtocolAnalyzer) getConnection(ctx Context, connectionID, randomID uint64) *connectionInfo {
conKey := a.generateConnectionInfoKey(connectionID, randomID)
connection, _ := a.connections.Get(conKey)
if connection == nil {
connection = newConnectionInfo(a.protocol, ctx, connectionID, randomID)
a.connections.Set(conKey, connection)
}
info := connection.(*connectionInfo)
info.checkConnectionMetrics(ctx)
return info
}
// processEvents means analyze the protocol in each connection
func (a *ProtocolAnalyzer) processEvents() {
// it could be triggered by interval or reach counter
// if any trigger bean locked, the other one just ignore process
if !a.analyzeLocker.TryLock() {
return
}
defer a.analyzeLocker.Unlock()
closedConnections := make([]string, 0)
a.connections.IterCb(func(conKey string, con interface{}) {
info := con.(*connectionInfo)
a.processConnectionEvents(info)
// if the connection already closed and not contains any buffer data, then delete the connection
if info.closed && info.buffer.DataLength() == 0 {
closedConnections = append(closedConnections, conKey)
}
})
for _, conKey := range closedConnections {
a.connections.Remove(conKey)
}
}
// processExpireEvents delete the expired events
func (a *ProtocolAnalyzer) processExpireEvents(expireDuration time.Duration) {
// the expiry must be mutual exclusion with events processor
a.analyzeLocker.Lock()
defer a.analyzeLocker.Unlock()
a.connections.IterCb(func(_ string, con interface{}) {
a.processConnectionExpireEvents(con.(*connectionInfo), expireDuration)
})
}
func (a *ProtocolAnalyzer) processConnectionEvents(connection *connectionInfo) {
// reset the status for prepare reading
metrics := connection.metrics
connectionID := connection.connectionID
randomID := connection.randomID
connection.buffer.ResetForLoopReading()
// loop to read the protocol data
for {
// reset the status of reading
if !connection.buffer.PrepareForReading() {
log.Debugf("prepare finsihed: reduce data event size: %d", connection.buffer.DataLength())
return
}
result := a.protocol.ParseProtocol(connectionID, randomID, metrics, connection.buffer)
finishReading := false
switch result {
case enums.ParseResultSuccess:
finishReading = connection.buffer.RemoveReadElements(true)
case enums.ParseResultSkipPackage:
finishReading = connection.buffer.SkipCurrentElement()
}
if finishReading {
log.Debugf("reading finsihed: reduce data event size: %d", connection.buffer.DataLength())
break
}
}
}
func (a *ProtocolAnalyzer) processConnectionExpireEvents(connection *connectionInfo, expireDuration time.Duration) {
if c := connection.buffer.DeleteExpireEvents(expireDuration); c > 0 {
log.Debugf("total removed %d expired events for %s protocol", c, enums.ConnectionProtocolString(a.protocol.Protocol()))
}
}
func (a *ProtocolAnalyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
a.protocol.UpdateExtensionConfig(config)
}
func (a *ProtocolAnalyzer) ReceiveSocketCloseEvent(event *events.SocketCloseEvent) {
con, _ := a.connections.Get(a.generateConnectionInfoKey(event.ConID, event.RandomID))
if con == nil {
return
}
con.(*connectionInfo).closed = true
}
func (a *ProtocolAnalyzer) generateConnectionInfoKey(connectionID, randomID uint64) string {
return fmt.Sprintf("%d_%d", connectionID, randomID)
}
type connectionInfo struct {
connectionID, randomID uint64
connectionProtocol enums.ConnectionProtocol
buffer *buffer.Buffer
metrics Metrics
metricsFromConnection bool
closed bool
}
func newConnectionInfo(p Protocol, connectionContext Context, connectionID, randomID uint64) *connectionInfo {
fromConnection := false
var connectionMetrics Metrics
con := connectionContext.QueryConnection(connectionID, randomID)
// if connection not exists, then cached it into the analyzer context
if con == nil {
connectionMetrics = p.GenerateMetrics()
} else {
connectionMetrics = connectionContext.QueryProtocolMetrics(con.Metrics, p.Protocol())
fromConnection = true
}
return &connectionInfo{
connectionID: connectionID,
randomID: randomID,
connectionProtocol: p.Protocol(),
buffer: buffer.NewBuffer(),
metrics: connectionMetrics,
metricsFromConnection: fromConnection,
}
}
func (c *connectionInfo) checkConnectionMetrics(protocolContext Context) {
if c.metricsFromConnection {
return
}
connection := protocolContext.QueryConnection(c.connectionID, c.randomID)
if connection == nil {
return
}
// merge the temporary metrics into the connection metrics
connectionMetrics := protocolContext.QueryProtocolMetrics(connection.Metrics, c.connectionProtocol)
connectionMetrics.MergeMetricsFromConnection(connection, c.metrics)
c.metrics = connectionMetrics
c.metricsFromConnection = true
}