// pkg/profiling/task/network/analyze/layer7/protocols/http1/metrics.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package http1
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/apache/skywalking-rover/pkg/process/api"
profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/events"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader"
"github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics"
"github.com/apache/skywalking-rover/pkg/tools"
"github.com/apache/skywalking-rover/pkg/tools/buffer"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
protocol "github.com/apache/skywalking-rover/pkg/tools/tracing"
"github.com/docker/go-units"
commonv3 "skywalking.apache.org/repo/goapi/collect/common/v3"
v3 "skywalking.apache.org/repo/goapi/collect/language/agent/v3"
logv3 "skywalking.apache.org/repo/goapi/collect/logging/v3"
)
var (
	// transport direction labels used to name sampled HTTP span attached
	// events ("HTTP Request Sampling" / "HTTP Response Sampling") and their
	// lower-cased "data_type" tag in appendHTTPEvent.
	transportRequest = "Request"
	transportResponse = "Response"
)
// URIMetrics aggregates HTTP/1 metrics for a single URI (or for the whole
// process when the URI dimension is not used): request/status counters,
// package size and latency distributions, and a sampler that keeps
// slow/error traces for log-based reporting.
type URIMetrics struct {
	RequestCounter *metrics.Counter               // total request count
	StatusCounter map[int]*metrics.Counter        // request count per HTTP status code, created lazily
	AvgRequestPackageSize *metrics.AvgCounter     // average request size in bytes
	AvgResponsePackageSize *metrics.AvgCounter    // average response size in bytes
	ReqPackageSizeHistogram *metrics.Histogram    // request size distribution
	RespPackageSizeHistogram *metrics.Histogram   // response size distribution
	avgDuration *metrics.AvgCounter               // average request-response latency in milliseconds
	durationHistogram *metrics.Histogram          // latency distribution in milliseconds
	sampler *Sampler                              // collects sampled traces (slow / status-based)
}
// NewHTTP1URIMetrics builds a URIMetrics value with every counter,
// histogram and the trace sampler initialized and ready to accept samples.
func NewHTTP1URIMetrics() *URIMetrics {
	m := &URIMetrics{
		RequestCounter: metrics.NewCounter(),
		StatusCounter:  make(map[int]*metrics.Counter),
	}
	m.AvgRequestPackageSize = metrics.NewAvgCounter()
	m.AvgResponsePackageSize = metrics.NewAvgCounter()
	m.ReqPackageSizeHistogram = metrics.NewHistogram(PackageSizeHistogramBuckets)
	m.RespPackageSizeHistogram = metrics.NewHistogram(PackageSizeHistogramBuckets)
	m.avgDuration = metrics.NewAvgCounter()
	m.durationHistogram = metrics.NewHistogram(DurationHistogramBuckets)
	m.sampler = NewSampler()
	return m
}
// Append records one finished request/response pair into the URI-level
// metrics and forwards the pair to the sampler for trace collection.
func (u *URIMetrics) Append(sampleConfig *SamplingConfig, req *reader.Request, resp *reader.Response) {
	u.RequestCounter.Increase()

	// per-status-code counter, created lazily on first sight of the code
	code := resp.Original().StatusCode
	counter, exist := u.StatusCounter[code]
	if !exist {
		counter = metrics.NewCounter()
		u.StatusCounter[code] = counter
	}
	counter.Increase()

	// package size metrics (bytes)
	reqSize := float64(req.ContentTotalSize())
	respSize := float64(resp.ContentTotalSize())
	u.AvgRequestPackageSize.Increase(reqSize)
	u.ReqPackageSizeHistogram.Increase(reqSize)
	u.AvgResponsePackageSize.Increase(respSize)
	u.RespPackageSizeHistogram.Increase(respSize)

	// latency metrics (milliseconds)
	duration := time.Duration(resp.EndTime() - req.StartTime())
	ms := float64(duration.Milliseconds())
	u.avgDuration.Increase(ms)
	u.durationHistogram.Increase(ms)

	u.sampler.AppendMetrics(sampleConfig, duration, req, resp)
}
// appendMetrics converts the collected URI-level metrics into meter data and
// appends them — plus any sampled trace logs — to the metrics builder.
// It returns the total number of generated meters and logs.
// When durationOnly is true only the duration avg/histogram meters are built.
func (u *URIMetrics) appendMetrics(traffic *base.ProcessTraffic,
	local api.ProcessInterface, url string, metricsBuilder *base.MetricsBuilder, durationOnly bool) int {
	collections := make([]*v3.MeterData, 0)
	role, labels := metricsBuilder.BuildBasicMeterLabels(traffic, local)
	prefix := metricsBuilder.MetricPrefix()
	// duration meters are named by the connection role (client/server)
	collections = u.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_avg", role.String()), labels, url,
		traffic, u.avgDuration)
	collections = u.buildMetrics(collections, prefix, fmt.Sprintf("%s_duration_histogram", role.String()), labels, url,
		traffic, u.durationHistogram)
	if durationOnly {
		return len(collections)
	}
	collections = u.buildMetrics(collections, prefix, "request_counter", labels, url, traffic, u.RequestCounter)
	for status, counter := range u.StatusCounter {
		// copy the base labels so the per-status "code" label does not leak
		// into the shared label slice used by the other meters
		statusLabels := make([]*v3.Label, len(labels))
		copy(statusLabels, labels)
		statusLabels = append(statusLabels, &v3.Label{Name: "code", Value: fmt.Sprintf("%d", status)})
		collections = u.buildMetrics(collections, prefix, "response_status_counter", statusLabels, url, traffic, counter)
	}
	collections = u.buildMetrics(collections, prefix, "request_package_size_avg", labels, url, traffic, u.AvgRequestPackageSize)
	collections = u.buildMetrics(collections, prefix, "response_package_size_avg", labels, url, traffic, u.AvgResponsePackageSize)
	collections = u.buildMetrics(collections, prefix, "request_package_size_histogram", labels, url, traffic, u.ReqPackageSizeHistogram)
	collections = u.buildMetrics(collections, prefix, "response_package_size_histogram", labels, url, traffic, u.RespPackageSizeHistogram)
	metricsBuilder.AppendMetrics(local.Entity().ServiceName, local.Entity().InstanceName, collections)
	// the sampler emits its sampled traces as log records
	logsCount := u.sampler.BuildMetrics(local, traffic, metricsBuilder)
	return len(collections) + logsCount
}
// buildMetrics appends the meter data of a single metric to the collection.
// The meter name is "<prefix>http1_<name>", or "<prefix>http1_url_<name>"
// with an extra "url" label when a URI dimension is provided.
func (u *URIMetrics) buildMetrics(collection []*v3.MeterData, prefix, name string, basicLabels []*v3.Label,
	url string, _ *base.ProcessTraffic, data metrics.Metrics) []*v3.MeterData {
	labels := basicLabels
	var meterName string
	if url != "" {
		// copy before appending: appending directly to basicLabels could write
		// the "url" label into the caller's shared backing array and corrupt
		// the label slice reused by sibling meters
		labels = make([]*v3.Label, len(basicLabels), len(basicLabels)+1)
		copy(labels, basicLabels)
		labels = append(labels, &v3.Label{Name: "url", Value: url})
		meterName = fmt.Sprintf("%shttp1_%s_%s", prefix, "url", name)
	} else {
		meterName = fmt.Sprintf("%shttp1_%s", prefix, name)
	}
	return data.AppendMeter(collection, meterName, labels)
}
// MergeAndClean folds every metric value from other into u and resets the
// metrics of other, so other can continue collecting a fresh window.
func (u *URIMetrics) MergeAndClean(other *URIMetrics) {
	u.RequestCounter.MergeAndClean(other.RequestCounter)
	u.AvgRequestPackageSize.MergeAndClean(other.AvgRequestPackageSize)
	u.AvgResponsePackageSize.MergeAndClean(other.AvgResponsePackageSize)
	u.ReqPackageSizeHistogram.MergeAndClean(other.ReqPackageSizeHistogram)
	u.RespPackageSizeHistogram.MergeAndClean(other.RespPackageSizeHistogram)
	u.avgDuration.MergeAndClean(other.avgDuration)
	u.durationHistogram.MergeAndClean(other.durationHistogram)

	// status counters are keyed by HTTP status code; create missing targets
	for code, src := range other.StatusCounter {
		dst, found := u.StatusCounter[code]
		if !found {
			dst = metrics.NewCounter()
			u.StatusCounter[code] = dst
		}
		dst.MergeAndClean(src)
	}

	u.sampler.MergeAndClean(other.sampler)
}
// String renders a human-readable summary of the collected URI metrics.
func (u *URIMetrics) String() string {
	const layout = "request count: %d, avg request size: %f, avg response size: %f, avg duration: %f, response counters: %v, sampler: %s"
	return fmt.Sprintf(layout,
		u.RequestCounter.Get(),
		u.AvgRequestPackageSize.Calculate(),
		u.AvgResponsePackageSize.Calculate(),
		u.avgDuration.Calculate(),
		u.StatusCounter,
		u.sampler.String())
}
// Trace is one sampled HTTP/1 request/response exchange together with the
// distributed tracing context extracted from its headers; it is flushed as a
// sampled-trace log record plus span attached events.
type Trace struct {
	Trace protocol.Context                            // tracing context (trace id, segment, span) parsed from the request
	RequestURI string                                 // URI of the sampled request
	Request *reader.Request                           // parsed HTTP request
	Response *reader.Response                         // parsed HTTP response
	Type string                                       // sampling reason (e.g. slow / status based), reported as "reason"
	Settings *profiling.NetworkDataCollectingSettings // what raw content to attach (request/response, size limits)
	TaskConfig *profiling.HTTPSamplingConfig          // task-level defaults such as body encodings
}
// Flush emits this sampled trace as a NET_PROFILING_SAMPLED_TRACE log record
// bound to the originating trace id, then appends the captured HTTP contents
// and syscall details as span attached events.
// duration is the request-response latency forwarded into the log body.
func (h *Trace) Flush(duration int64, process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
	logData := &logv3.LogData{}
	logData.Service = process.Entity().ServiceName
	logData.ServiceInstance = process.Entity().InstanceName
	logData.Layer = process.Entity().Layer
	// LOG_KIND tag lets the OAP recognize this as a sampled network trace
	logData.Tags = &logv3.LogTags{Data: make([]*commonv3.KeyStringValuePair, 0)}
	logData.Tags.Data = append(logData.Tags.Data, &commonv3.KeyStringValuePair{Key: "LOG_KIND", Value: "NET_PROFILING_SAMPLED_TRACE"})
	// trace common
	traceContext := &logv3.TraceContext{}
	traceContext.TraceId = h.Trace.TraceID()
	logData.TraceContext = traceContext
	// body
	logBody := &logv3.LogDataBody{Type: "json"}
	body := &SamplingTraceLogBody{
		Latency: duration,
		TraceProvider: h.Trace.Provider().Name,
		DetectPoint: traffic.Role.String(),
		Component: enums.ConnectionProtocolString(traffic.Protocol),
		SSL: traffic.IsSSL,
		URI: h.RequestURI,
		Reason: h.Type,
		Status: h.Response.Original().StatusCode,
	}
	// the local process fills the side matching the connection role; the
	// opposite side is resolved from the traffic's remote information
	if traffic.Role == enums.ConnectionRoleClient {
		body.ClientProcess = &SamplingTraceLogProcess{ProcessID: process.ID()}
		body.ServerProcess = NewHTTP1SampledTraceLogRemoteProcess(traffic, process)
	} else {
		body.ServerProcess = &SamplingTraceLogProcess{ProcessID: process.ID()}
		body.ClientProcess = NewHTTP1SampledTraceLogRemoteProcess(traffic, process)
	}
	bodyJSON, err := json.Marshal(body)
	if err != nil {
		log.Warnf("format the slow trace log body failure: %v", err)
		return
	}
	logBody.Content = &logv3.LogDataBody_Json{Json: &logv3.JSONLog{Json: string(bodyJSON)}}
	logData.Body = logBody
	metricsBuilder.AppendLogs(process.Entity().ServiceName, logData)
	// append full http content and syscall
	h.AppendHTTPEvents(process, traffic, metricsBuilder)
}
// AppendHTTPEvents attaches the sampled HTTP request/response contents and
// their related syscall details to the metrics builder as span attached
// events, honoring the data collecting settings of the profiling task.
func (h *Trace) AppendHTTPEvents(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) {
	attaches := make([]*v3.SpanAttachedEvent, 0)
	if settings := h.Settings; settings != nil {
		if settings.RequireCompleteRequest {
			attaches = h.appendHTTPEvent(attaches, process, traffic, transportRequest,
				h.Request.MessageOpt, h.TaskConfig.DefaultRequestEncoding, settings.MaxRequestSize)
			attaches = h.appendSyscallEvents(attaches, process, traffic, h.Request.MessageOpt)
		}
		if settings.RequireCompleteResponse {
			attaches = h.appendHTTPEvent(attaches, process, traffic, transportResponse,
				h.Response.MessageOpt, h.TaskConfig.DefaultResponseEncoding, settings.MaxResponseSize)
			attaches = h.appendSyscallEvents(attaches, process, traffic, h.Response.MessageOpt)
		}
	}
	metricsBuilder.AppendSpanAttachedEvents(attaches)
}
// appendHTTPEvent builds one span attached event that carries the readable
// HTTP content (request or response, selected by tp) of this sampled trace
// and returns the extended event slice. On content transform failure the
// event is skipped and the slice is returned unchanged.
func (h *Trace) appendHTTPEvent(attaches []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
	tp string, message *reader.MessageOpt, defaultBodyEncoding string, maxSize int32) []*v3.SpanAttachedEvent {
	// decode/truncate the raw payload into a human-readable string
	content, err := message.TransformReadableContent(defaultBodyEncoding, int(maxSize))
	if err != nil {
		// fixed typo: "erorr" -> "error"
		log.Warnf("transform http %s error: %v", tp, err)
		return attaches
	}
	event := &v3.SpanAttachedEvent{}
	event.StartTime = host.TimeToInstant(message.StartTime())
	event.EndTime = host.TimeToInstant(message.EndTime())
	event.Event = fmt.Sprintf("HTTP %s Sampling", tp)
	event.Tags = make([]*commonv3.KeyStringValuePair, 0)
	event.Tags = append(event.Tags,
		// content data
		&commonv3.KeyStringValuePair{Key: "data_size", Value: units.BytesSize(float64(message.ContentTotalSize()))},
		&commonv3.KeyStringValuePair{Key: "data_content", Value: content},
		&commonv3.KeyStringValuePair{Key: "data_direction", Value: message.Direction().String()},
		&commonv3.KeyStringValuePair{Key: "data_type", Value: strings.ToLower(tp)},
		// connection
		&commonv3.KeyStringValuePair{Key: "connection_role", Value: traffic.Role.String()},
		// entity
		&commonv3.KeyStringValuePair{Key: "service_name", Value: process.Entity().ServiceName},
		&commonv3.KeyStringValuePair{Key: "service_instance_name", Value: process.Entity().InstanceName},
		&commonv3.KeyStringValuePair{Key: "process_name", Value: process.Entity().ProcessName},
	)
	event.Summary = make([]*commonv3.KeyIntValuePair, 0)
	// bind the event to the span that produced this sampled traffic
	event.TraceContext = &v3.SpanAttachedEvent_SpanReference{
		TraceId: h.Trace.TraceID(),
		TraceSegmentId: h.Trace.TraceSegmentID(),
		SpanId: h.Trace.SpanID(),
		Type: h.Trace.Provider().SpanAttachType,
	}
	return append(attaches, event)
}
// appendSyscallEvents attaches one event per socket detail (syscall) that
// transported the sampled message, covering both header and body buffers.
// Details already reported from the header buffer are skipped when they show
// up again in the body buffer (matched by data id).
func (h *Trace) appendSyscallEvents(attachEvents []*v3.SpanAttachedEvent, process api.ProcessInterface, traffic *base.ProcessTraffic,
	message *reader.MessageOpt) []*v3.SpanAttachedEvent {
	seen := make(map[uint64]struct{})
	for elem := message.HeaderBuffer().BuildDetails().Front(); elem != nil; elem = elem.Next() {
		detail := elem.Value.(*events.SocketDetailEvent)
		seen[detail.DataID()] = struct{}{}
		attachEvents = h.appendPerDetailEvent(attachEvents, process, traffic, detail, message.HeaderBuffer())
	}
	for elem := message.BodyBuffer().BuildDetails().Front(); elem != nil; elem = elem.Next() {
		detail := elem.Value.(*events.SocketDetailEvent)
		if _, reported := seen[detail.DataID()]; reported {
			continue
		}
		attachEvents = h.appendPerDetailEvent(attachEvents, process, traffic, detail, message.BodyBuffer())
	}
	return attachEvents
}
// appendPerDetailEvent builds one span attached event for a single socket
// detail (syscall) and returns the extended event slice. If no data buffer
// matches the detail's data id, the slice is returned unchanged.
func (h *Trace) appendPerDetailEvent(attaches []*v3.SpanAttachedEvent, process api.ProcessInterface, _ *base.ProcessTraffic,
	detail *events.SocketDetailEvent, buf *buffer.Buffer) []*v3.SpanAttachedEvent {
	event := &v3.SpanAttachedEvent{}
	dataBuffer := buf.FindFirstDataBuffer(detail.DataID())
	if dataBuffer == nil {
		return attaches
	}
	event.StartTime = host.TimeToInstant(dataBuffer.StartTime())
	event.EndTime = host.TimeToInstant(dataBuffer.EndTime())
	event.Event = fmt.Sprintf("Syscall %s", detail.FuncName.String())
	event.Tags = make([]*commonv3.KeyStringValuePair, 0)
	event.Tags = append(event.Tags,
		// content data
		&commonv3.KeyStringValuePair{Key: "package_size", Value: units.BytesSize(float64(detail.TotalPackageSize))},
		&commonv3.KeyStringValuePair{Key: "package_count", Value: fmt.Sprintf("%d", detail.PackageCount)},
		&commonv3.KeyStringValuePair{Key: "network_name", Value: host.NetworkName(int(detail.IfIndex))},
		&commonv3.KeyStringValuePair{Key: "network_index", Value: fmt.Sprintf("%d", detail.IfIndex)},
		// entity
		&commonv3.KeyStringValuePair{Key: "service_name", Value: process.Entity().ServiceName},
		&commonv3.KeyStringValuePair{Key: "service_instance_name", Value: process.Entity().InstanceName},
		&commonv3.KeyStringValuePair{Key: "process_name", Value: process.Entity().ProcessName},
	)
	// guard RTTCount as well: RTTTime > 0 alone does not rule out a zero
	// count, which would panic with an integer division by zero
	if detail.RTTTime > 0 && detail.RTTCount > 0 {
		event.Tags = append(event.Tags,
			&commonv3.KeyStringValuePair{Key: "avg_rtt_time", Value: fmt.Sprintf("%dns", int(detail.RTTTime)/int(detail.RTTCount))})
	}
	event.Summary = make([]*commonv3.KeyIntValuePair, 0)
	// bind the event to the span that produced this sampled traffic
	event.TraceContext = &v3.SpanAttachedEvent_SpanReference{
		TraceId: h.Trace.TraceID(),
		TraceSegmentId: h.Trace.TraceSegmentID(),
		SpanId: h.Trace.SpanID(),
		Type: h.Trace.Provider().SpanAttachType,
	}
	return append(attaches, event)
}
// SamplingTraceLogBody is the JSON payload of a NET_PROFILING_SAMPLED_TRACE
// log record describing one sampled HTTP/1 exchange.
type SamplingTraceLogBody struct {
	URI string `json:"uri"`                                 // request URI
	Reason string `json:"reason"`                           // why this exchange was sampled
	Latency int64 `json:"latency"`                          // request-response latency
	TraceProvider string `json:"trace_provider"`            // tracing system the context was extracted from
	ClientProcess *SamplingTraceLogProcess `json:"client_process"` // client side of the connection
	ServerProcess *SamplingTraceLogProcess `json:"server_process"` // server side of the connection
	DetectPoint string `json:"detect_point"`                // role of the local process (client/server)
	Component string `json:"component"`                     // connection protocol name
	SSL bool `json:"ssl"`                                   // whether the connection is TLS
	Status int `json:"status"`                              // HTTP response status code
}
// SamplingTraceLogProcess identifies one side of a sampled connection.
// Exactly one of the fields is expected to be meaningful: a known process id,
// a "local address" marker, or a raw remote ip:port.
type SamplingTraceLogProcess struct {
	ProcessID string `json:"process_id"` // monitored process id, when resolvable
	Local bool `json:"local"`            // true when the peer is a local (loopback) address
	Address string `json:"address"`      // "ip:port" fallback for unknown remote peers
}
// NewHTTP1SampledTraceLogRemoteProcess resolves the remote side of the
// sampled traffic into a loggable process reference: a known remote process
// in the same service instance, a "local" marker for loopback addresses, or
// the raw remote address otherwise.
func NewHTTP1SampledTraceLogRemoteProcess(traffic *base.ProcessTraffic, local api.ProcessInterface) *SamplingTraceLogProcess {
	// prefer a concrete remote process, but only match within the same
	// service instance as the local process
	for _, remote := range traffic.RemoteProcesses {
		sameService := local.Entity().ServiceName == remote.Entity().ServiceName
		sameInstance := local.Entity().InstanceName == remote.Entity().InstanceName
		if sameService && sameInstance {
			return &SamplingTraceLogProcess{ProcessID: remote.ID()}
		}
	}
	if tools.IsLocalHostAddress(traffic.RemoteIP) || traffic.Analyzer.IsLocalAddressInCache(traffic.RemoteIP) {
		return &SamplingTraceLogProcess{Local: true}
	}
	return &SamplingTraceLogProcess{Address: fmt.Sprintf("%s:%d", traffic.RemoteIP, traffic.RemotePort)}
}