plugins/core/sampler.go (108 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package core import ( "fmt" "math/rand" "strconv" "sync" "time" "github.com/apache/skywalking-go/plugins/core/reporter" ) type Sampler interface { IsSampled(operation string) (sampled bool) } type ConstSampler struct { decision bool } // NewConstSampler creates a ConstSampler. func NewConstSampler(sample bool) *ConstSampler { s := &ConstSampler{ decision: sample, } return s } // IsSampled implements IsSampled() of Sampler. func (s *ConstSampler) IsSampled(operation string) bool { return s.decision } // RandomSampler Use sync.Pool to implement concurrent-safe for randomizer. type RandomSampler struct { samplingRate float64 threshold int pool sync.Pool } // IsSampled implements IsSampled() of Sampler. func (s *RandomSampler) IsSampled(_ string) bool { return s.threshold > s.generateRandomNumber() } func (s *RandomSampler) init() { s.threshold = int(s.samplingRate * 100) s.pool.New = s.newRand } func (s *RandomSampler) generateRandomNumber() int { r := s.getRandomizer() defer s.returnRandomizer(r) return r.Intn(100) } func (s *RandomSampler) returnRandomizer(r *rand.Rand) { s.pool.Put(r) } func (s *RandomSampler) getRandomizer() *rand.Rand { var r *rand.Rand generator := s.pool.Get() if generator == nil { generator = s.newRand() } r, ok := generator.(*rand.Rand) if !ok { r = s.newRand().(*rand.Rand) // it must be *rand.Rand } return r } func (s *RandomSampler) newRand() interface{} { return rand.New(rand.NewSource(time.Now().UnixNano())) } func NewRandomSampler(samplingRate float64) *RandomSampler { s := &RandomSampler{ samplingRate: samplingRate, } s.init() return s } type DynamicSampler struct { currentRate float64 defaultRate float64 sampler Sampler } // IsSampled implements IsSampled() of Sampler. func (s *DynamicSampler) IsSampled(operation string) bool { return s.sampler.IsSampled(operation) } func (s *DynamicSampler) Key() string { return "agent.sample_rate" } func (s *DynamicSampler) Notify(eventType reporter.AgentConfigEventType, newValue string) { if eventType == reporter.DELETED { newValue = fmt.Sprintf("%f", s.defaultRate) } samplingRate, err := strconv.ParseFloat(newValue, 64) if err != nil { return } // change Sampler var sampler Sampler if samplingRate <= 0 { sampler = NewConstSampler(false) } else if samplingRate >= 1.0 { sampler = NewConstSampler(true) } else { sampler = NewRandomSampler(samplingRate) } s.sampler = sampler s.currentRate = samplingRate } func (s *DynamicSampler) Value() string { return fmt.Sprintf("%f", s.currentRate) } func NewDynamicSampler(samplingRate float64, tracer *Tracer) *DynamicSampler { s := &DynamicSampler{ currentRate: samplingRate, defaultRate: samplingRate, } s.Notify(reporter.MODIFY, fmt.Sprintf("%f", samplingRate)) // append watcher tracer.cdsWatchers = append(tracer.cdsWatchers, s) return s }