pkg/profiling/task/network/analyze/layer7/protocols/protocols.go (96 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package protocols
import (
"github.com/apache/skywalking-rover/pkg/logger"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"golang.org/x/net/context"
)
var (
	// log is the logger used by this package.
	log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols")

	// registerProtocols holds the constructor of every protocol analyzer; it is filled in init.
	registerProtocols []func() protocol.Protocol

	// defaultInstances holds one instance created from each registered constructor;
	// it is filled in init and read by NewProtocolMetrics.
	defaultInstances []protocol.Protocol
)
func init() {
// register all protocol analyzers
registerProtocols = make([]func() protocol.Protocol, 0)
registerProtocols = append(registerProtocols, http1.NewHTTP1Analyzer)
defaultInstances = make([]protocol.Protocol, 0)
for _, p := range registerProtocols {
defaultInstances = append(defaultInstances, p())
}
}
// Analyzer dispatches socket events to the protocol analyzer that is
// registered for the protocol of the event.
type Analyzer struct {
// ctx is handed to the protocol analyzers together with socket data and detail events
ctx protocol.Context
// protocols maps a connection protocol to the analyzer handling it
protocols map[enums.ConnectionProtocol]*protocol.ProtocolAnalyzer
}
// NewAnalyzer creates an Analyzer holding one protocol analyzer for every
// registered protocol. Each protocol instance is initialized with config
// before it is wrapped into its analyzer.
func NewAnalyzer(ctx protocol.Context, config *profiling.TaskConfig) *Analyzer {
	analyzers := make(map[enums.ConnectionProtocol]*protocol.ProtocolAnalyzer, len(registerProtocols))
	for _, newProtocol := range registerProtocols {
		instance := newProtocol()
		instance.Init(config)
		wrapped := protocol.NewProtocolAnalyzer(ctx, instance, config)
		// keyed by the protocol the instance reports for itself
		analyzers[instance.Protocol()] = wrapped
	}
	return &Analyzer{ctx: ctx, protocols: analyzers}
}
// Start starts every protocol analyzer held by a, passing ctx to each of them.
func (a *Analyzer) Start(ctx context.Context) {
	for _, protocolAnalyzer := range a.protocols {
		protocolAnalyzer.Start(ctx)
	}
}
// ReceiveSocketDataEvent hands the socket data event to the analyzer registered
// for the protocol of the event. When no analyzer is registered for that
// protocol, a warning is logged and the event is dropped.
func (a *Analyzer) ReceiveSocketDataEvent(event *events.SocketDataUploadEvent) {
	if handler := a.protocols[event.Protocol()]; handler != nil {
		handler.ReceiveSocketData(a.ctx, event)
		return
	}

	// no analyzer is registered for the protocol of this event
	log.Warnf("could not found any protocol to handle socket data, connection id: %s, protocol: %s(%d)",
		event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol()), event.Protocol())
}
// ReceiveSocketDetail hands the socket detail event to the analyzer registered
// for the protocol of the event. When no analyzer is registered for that
// protocol, a warning is logged and the event is dropped.
func (a *Analyzer) ReceiveSocketDetail(event *events.SocketDetailEvent) {
	if handler := a.protocols[event.Protocol]; handler != nil {
		handler.ReceiveSocketDetail(a.ctx, event)
		return
	}

	// no analyzer is registered for the protocol of this event
	log.Warnf("could not found any protocol to handle socket detail, connection id: %s, protocol: %s(%d)",
		event.GenerateConnectionID(), enums.ConnectionProtocolString(event.Protocol), event.Protocol)
}
// UpdateExtensionConfig passes the extension config to every protocol analyzer held by a.
func (a *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) {
	for _, protocolAnalyzer := range a.protocols {
		protocolAnalyzer.UpdateExtensionConfig(config)
	}
}
// ReceiveSocketClose forwards the socket close event to every protocol
// analyzer held by a; no lookup by protocol is performed for close events.
func (a *Analyzer) ReceiveSocketClose(event *events.SocketCloseEvent) {
	for _, protocolAnalyzer := range a.protocols {
		protocolAnalyzer.ReceiveSocketCloseEvent(event)
	}
}
// ProtocolMetrics groups the metrics of all protocols, one entry per protocol.
type ProtocolMetrics struct {
// data maps a connection protocol to the metrics kept for it
data map[enums.ConnectionProtocol]protocol.Metrics
}
// NewProtocolMetrics creates a ProtocolMetrics holding freshly generated
// metrics for each of the default protocol instances.
func NewProtocolMetrics() *ProtocolMetrics {
	result := &ProtocolMetrics{
		data: make(map[enums.ConnectionProtocol]protocol.Metrics, len(defaultInstances)),
	}
	for _, instance := range defaultInstances {
		result.data[instance.Protocol()] = instance.GenerateMetrics()
	}
	return result
}
// GetProtocolMetrics returns the metrics stored for protocol p. When nothing
// is stored for p, the zero value of protocol.Metrics is returned.
func (m *ProtocolMetrics) GetProtocolMetrics(p enums.ConnectionProtocol) protocol.Metrics {
	stored := m.data[p]
	return stored
}
// MergeMetricsFromConnection merges, protocol by protocol, the metrics found
// in data into the metrics held by m.
//
// data must hold a *ProtocolMetrics: the type assertion is unchecked and
// panics for any other dynamic type.
func (m *ProtocolMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) {
	source := data.(*ProtocolMetrics)
	for proto, target := range m.data {
		// look up the counterpart of the same protocol in the source metrics
		target.MergeMetricsFromConnection(connection, source.data[proto])
	}
}
// FlushMetrics asks the metrics of every protocol held by m to flush itself
// for the given traffic into metricsBuilder. The protocols are visited in map
// iteration order, which Go does not define.
func (m *ProtocolMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
	for _, protocolMetrics := range m.data {
		protocolMetrics.FlushMetrics(traffic, metricsBuilder)
	}
}