pkg/profiling/task/network/analyze/layer7/protocols/http1/sampling.go (156 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package http1 import ( "fmt" "regexp" "strings" "time" lru "github.com/hashicorp/golang-lru" "github.com/apache/skywalking-rover/pkg/process/api" profiling "github.com/apache/skywalking-rover/pkg/profiling/task/base" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/base" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader" "github.com/apache/skywalking-rover/pkg/profiling/task/network/analyze/layer7/protocols/metrics" protocol "github.com/apache/skywalking-rover/pkg/tools/tracing" ) const ( TopNSize = 10 SamplingRuleCacheSize = 200 ) type Sampler struct { Error4xxTraces *metrics.TopN Error5xxTraces *metrics.TopN SlowTraces *metrics.TopN } func NewSampler() *Sampler { return &Sampler{ Error4xxTraces: metrics.NewTopN(TopNSize), Error5xxTraces: metrics.NewTopN(TopNSize), SlowTraces: metrics.NewTopN(TopNSize), } } func (s *Sampler) AppendMetrics(config *SamplingConfig, duration time.Duration, request *reader.Request, response *reader.Response) { if config == nil { return } tracingContext, err := protocol.AnalyzeTracingContext(func(key string) string { return request.Headers().Get(key) }) if err != nil { log.Warnf("analyze tracing context error: %v", err) return } if tracingContext == nil { return } uri := request.Original().RequestURI // remove the query parameters if i := strings.Index(uri, "?"); i > 0 { uri = uri[0:i] } // find out with url rule is match rule := config.findMatchesRule(uri) if rule == nil { return } statusCode := response.Original().StatusCode var traceType string var topN *metrics.TopN if rule.When5XX && statusCode >= 500 && statusCode < 600 { traceType = "status_5xx" topN = s.Error5xxTraces } else if rule.When4XX && statusCode >= 400 && statusCode < 500 { traceType = "status_4xx" topN = s.Error4xxTraces } else if rule.MinDuration != nil && int64(*rule.MinDuration) <= duration.Milliseconds() { traceType = "slow" topN = s.SlowTraces } else { return } trace := &Trace{ Trace: tracingContext, RequestURI: uri, Request: request, Response: response, Type: traceType, Settings: rule.Settings, TaskConfig: config.ProfilingSampling, } topN.AddRecord(trace, duration.Milliseconds()) } func (s *Sampler) BuildMetrics(process api.ProcessInterface, traffic *base.ProcessTraffic, metricsBuilder *base.MetricsBuilder) int { var count int count += s.SlowTraces.AppendData(process, traffic, metricsBuilder) count += s.Error4xxTraces.AppendData(process, traffic, metricsBuilder) count += s.Error5xxTraces.AppendData(process, traffic, metricsBuilder) return count } func (s *Sampler) MergeAndClean(other *Sampler) { s.SlowTraces.MergeAndClean(other.SlowTraces) s.Error4xxTraces.MergeAndClean(other.Error4xxTraces) s.Error5xxTraces.MergeAndClean(other.Error5xxTraces) } func (s *Sampler) String() string { return fmt.Sprintf("slow trace count: %d, 4xx error count: %d, 5xx error count: %d", s.SlowTraces.List.Len(), s.Error4xxTraces.List.Len(), s.Error5xxTraces.List.Len()) } type SamplingConfig struct { ProfilingSampling *profiling.HTTPSamplingConfig DefaultRule *profiling.NetworkSamplingRule URISamplings []*URISampling uriRuleCache *lru.Cache } type URISampling struct { URIMatcher *regexp.Regexp Rule *profiling.NetworkSamplingRule } func NewSamplingConfig(config *profiling.TaskConfig) *SamplingConfig { cache, err := lru.New(SamplingRuleCacheSize) if err != nil { log.Warnf("creating sampling cache config failure: %v", err) } return &SamplingConfig{ ProfilingSampling: &config.Network.ProtocolAnalyze.Sampling.HTTP, uriRuleCache: cache, } } func (s *SamplingConfig) UpdateRules(configs []*profiling.NetworkSamplingRule) { if len(configs) == 0 { return } for _, c := range configs { if c.URIRegex == nil { if s.DefaultRule != nil { log.Warnf("the default rule is already exists, so ignore it") continue } s.DefaultRule = c continue } uriPattern, err := regexp.Compile(*c.URIRegex) if err != nil { log.Warnf("parsing URI pattern failure, ignore this sampling config: %v", err) continue } s.URISamplings = append(s.URISamplings, &URISampling{ URIMatcher: uriPattern, Rule: c, }) } } func (s *SamplingConfig) findMatchesRule(uri string) *profiling.NetworkSamplingRule { // if cached then return if len(s.URISamplings) == 0 { return s.DefaultRule } value, ok := s.uriRuleCache.Get(uri) if ok { return value.(*profiling.NetworkSamplingRule) } result := s.DefaultRule for _, rule := range s.URISamplings { if !rule.URIMatcher.MatchString(uri) { continue } result = rule.Rule s.uriRuleCache.Add(uri, result) } return result }