pkg/profiling/task/network/analyze/layer7/protocols/http1/analyzer.go (193 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package http1 import ( "container/list" "encoding/json" "sync" "time" "github.com/apache/skywalking-rover/pkg/logger" profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base" protocol "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/base" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader" "github.com/apache/skywalking-rover/pkg/tools/buffer" "github.com/apache/skywalking-rover/pkg/tools/enums" "github.com/sirupsen/logrus" ) var log = logger.GetLogger("profiling", "task", "network", "layer7", "protocols", "http1") var PackageSizeHistogramBuckets = []float64{ // 0.25KB, 0.5KB, 1KB, 1.5KB, 2KB, 3KB, 5KB, 8KB, 10KB, 15KB, 20KB, 35KB, 50KB, 75KB, 100KB, 200KB, 500KB 256, 512, 1048, 1536, 2048, 3072, 5120, 8192, 10240, 15360, 20480, 35840, 51200, 76800, 102400, 204800, 512000, // 800KB, 1M, 1.5M, 2M, 5M, 10M, 20M, 50M 819200, 1048576, 1572864, 2097152, 5242880, 10485760, 20971520, 52428800, } var DurationHistogramBuckets = []float64{ // unit ms 1, 2, 5, 10, 15, 20, 25, 30, 40, 45, 50, 60, 65, 70, 80, 90, 100, 110, 130, 150, 170, 200, 230, 260, 290, 330, 380, 430, 480, 500, 600, 700, 800, 900, 1000, 1100, 1300, 1500, 1800, 2000, 5000, 10000, 15000, 20000, 30000, } type Analyzer struct { // cache connection metrics if the connect event not receive or process cache map[string]*ConnectionMetrics reader *reader.Reader sampleConfig *SamplingConfig } type ConnectionMetrics struct { // halfData all data event(request/response) not finished halfData *list.List clientMetrics *URIMetrics serverMetrics *URIMetrics metricsLocker sync.RWMutex } func NewHTTP1Analyzer() protocol.Protocol { return &Analyzer{ cache: make(map[string]*ConnectionMetrics), reader: reader.NewReader(), } } func (h *Analyzer) Protocol() enums.ConnectionProtocol { return enums.ConnectionProtocolHTTP } func (h *Analyzer) GenerateMetrics() protocol.Metrics { return &ConnectionMetrics{ halfData: list.New(), clientMetrics: NewHTTP1URIMetrics(), serverMetrics: NewHTTP1URIMetrics(), } } func (h *Analyzer) Init(config *profiling.TaskConfig) { h.sampleConfig = NewSamplingConfig(config) } func (h *Analyzer) ParseProtocol(connectionID, randomID uint64, metrics protocol.Metrics, buf *buffer.Buffer) enums.ParseResult { connectionMetrics := metrics.(*ConnectionMetrics) messageType, err := h.reader.IdentityMessageType(buf) if err != nil { return enums.ParseResultSkipPackage } var result enums.ParseResult switch messageType { case reader.MessageTypeRequest: result, err = h.handleRequest(connectionMetrics, buf) case reader.MessageTypeResponse: result, err = h.handleResponse(connectionID, randomID, connectionMetrics, buf) case reader.MessageTypeUnknown: return enums.ParseResultSkipPackage } log.Debugf("readed message, messageType: %v, buf: %p, data id: %d, "+ "connection ID: %d, random ID: %d, metrics : %p, handle result: %d", messageType, buf, buf.Position().DataID(), connectionID, randomID, metrics, result) if err != nil { log.Warnf("reading %v error: %v", messageType, err) return enums.ParseResultSkipPackage } else if result != enums.ParseResultSuccess { return result } return enums.ParseResultSuccess } func (h *Analyzer) handleRequest(metrics *ConnectionMetrics, buf *buffer.Buffer) (enums.ParseResult, error) { // parsing request req, r, err := h.reader.ReadRequest(buf, true) if err != nil { return enums.ParseResultSkipPackage, err } if r != enums.ParseResultSuccess { return r, nil } metrics.AppendRequestToList(req) return enums.ParseResultSuccess, nil } func (h *Analyzer) handleResponse(connectionID, randomID uint64, metrics *ConnectionMetrics, buf *buffer.Buffer) (enums.ParseResult, error) { // find the first request firstElement := metrics.halfData.Front() if firstElement == nil { log.Debugf("cannot found request for response, skip response, connection ID: %d, random ID: %d, "+ "current data id: %d", connectionID, randomID, buf.Position().DataID()) return enums.ParseResultSkipPackage, nil } request := metrics.halfData.Remove(firstElement).(*reader.Request) // parsing request response, r, err := h.reader.ReadResponse(request, buf, true) if err != nil { return enums.ParseResultSkipPackage, err } else if r != enums.ParseResultSuccess { return r, nil } // lock append metrics with read locker metrics.metricsLocker.RLock() defer metrics.metricsLocker.RUnlock() // append metrics data := metrics.clientMetrics side := enums.ConnectionRoleClient if request.Direction() == enums.SocketDataDirectionIngress { // if receive the request, that's mean is server side data = metrics.serverMetrics side = enums.ConnectionRoleServer } data.Append(h.sampleConfig, request, response) if log.Enable(logrus.DebugLevel) { metricsJSON, _ := json.Marshal(data) log.Debugf("generated metrics, connection id: %d, random id: %d, side: %s, metrisc: %s, metrics pointer: %p", connectionID, randomID, side.String(), string(metricsJSON), metrics) } return enums.ParseResultSuccess, nil } func (h *Analyzer) UpdateExtensionConfig(config *profiling.ExtensionConfig) { if config == nil { return } h.sampleConfig.UpdateRules(config.NetworkSamplings) } func (h *Analyzer) PackageMaxExpireDuration() time.Duration { return time.Minute } func (m *ConnectionMetrics) AppendRequestToList(req *reader.Request) { if m.halfData.Len() == 0 { m.halfData.PushFront(req) return } if m.halfData.Back().Value.(*reader.Request).MinDataID() < req.MinDataID() { m.halfData.PushBack(req) return } beenAdded := false for element := m.halfData.Front(); element != nil; element = element.Next() { existEvent := element.Value.(*reader.Request) if existEvent.MinDataID() > req.MinDataID() { m.halfData.InsertBefore(req, element) beenAdded = true break } } if !beenAdded { m.halfData.PushBack(req) } } func (m *ConnectionMetrics) MergeMetricsFromConnection(connection *base.ConnectionContext, data base.ConnectionMetrics) { other := data.(*ConnectionMetrics) other.metricsLocker.Lock() defer other.metricsLocker.Unlock() if other.halfData != nil { for element := other.halfData.Front(); element != nil; element = element.Next() { m.AppendRequestToList(element.Value.(*reader.Request)) } } m.clientMetrics.MergeAndClean(other.clientMetrics) m.serverMetrics.MergeAndClean(other.serverMetrics) if log.Enable(logrus.DebugLevel) { clientMetrics, _ := json.Marshal(m.clientMetrics) serverMetrics, _ := json.Marshal(m.serverMetrics) log.Debugf("combine metrics: conid: %d_%d, porinters: %p-%p, client side metrics: %s, server side metrics: %s", connection.ConnectionID, connection.RandomID, m, other, clientMetrics, serverMetrics) } } func (m *ConnectionMetrics) FlushMetrics(traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) { for _, p := range traffic.LocalProcesses { // if the remote process is profiling, then used the client side localMetrics := m.clientMetrics remoteMetrics := m.serverMetrics if traffic.Role == enums.ConnectionRoleServer { localMetrics = m.serverMetrics remoteMetrics = m.clientMetrics } metricsCount := localMetrics.appendMetrics(traffic, p, "", metricsBuilder, false) if traffic.RemoteProcessIsProfiling() { metricsCount += remoteMetrics.appendMetrics(traffic, p, "", metricsBuilder, true) } if metricsCount <= 0 { continue } if log.Enable(logrus.DebugLevel) { // if remote process is profiling, then the metrics data need to be cut half log.Debugf("flush HTTP1 metrics(%s): %s, remote process is profiling: %t, client(%s), server(%s)", traffic.Role.String(), traffic.GenerateConnectionInfo(), traffic.RemoteProcessIsProfiling(), m.clientMetrics.String(), m.serverMetrics.String()) } } }