core/stat/base/sliding_window_metric.go (231 lines of code) (raw):

// Copyright 1999-2020 Alibaba Group Holding Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package base import ( "reflect" "sync/atomic" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" "github.com/pkg/errors" ) // SlidingWindowMetric represents the sliding window metric wrapper. // It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket. // // SlidingWindowMetric is designed as a high-level, read-only statistic structure for functionalities of Sentinel type SlidingWindowMetric struct { bucketLengthInMs uint32 sampleCount uint32 intervalInMs uint32 real *BucketLeapArray } // NewSlidingWindowMetric creates a SlidingWindowMetric with given attributes. // The pointer to the internal statistic BucketLeapArray should be valid. func NewSlidingWindowMetric(sampleCount, intervalInMs uint32, real *BucketLeapArray) (*SlidingWindowMetric, error) { if real == nil { return nil, errors.New("nil BucketLeapArray") } if err := base.CheckValidityForReuseStatistic(sampleCount, intervalInMs, real.SampleCount(), real.IntervalInMs()); err != nil { return nil, err } bucketLengthInMs := intervalInMs / sampleCount return &SlidingWindowMetric{ bucketLengthInMs: bucketLengthInMs, sampleCount: sampleCount, intervalInMs: intervalInMs, real: real, }, nil } // getBucketStartRange returns start time range of the bucket for the provided time. // The actual time span is: [start, end + in.bucketTimeLength) func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) { curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs()) end = curBucketStartTime start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs()) return } func (m *SlidingWindowMetric) getIntervalInSecond() float64 { return float64(m.intervalInMs) / 1000.0 } func (m *SlidingWindowMetric) count(event base.MetricEvent, values []*BucketWrap) int64 { ret := int64(0) for _, ww := range values { mb := ww.Value.Load() if mb == nil { logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.count()") continue } counter, ok := mb.(*MetricBucket) if !ok { logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.count()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) continue } ret += counter.Get(event) } return ret } func (m *SlidingWindowMetric) GetSum(event base.MetricEvent) int64 { return m.getSumWithTime(util.CurrentTimeMillis(), event) } func (m *SlidingWindowMetric) getSumWithTime(now uint64, event base.MetricEvent) int64 { satisfiedBuckets := m.getSatisfiedBuckets(now) return m.count(event, satisfiedBuckets) } func (m *SlidingWindowMetric) GetQPS(event base.MetricEvent) float64 { return m.getQPSWithTime(util.CurrentTimeMillis(), event) } func (m *SlidingWindowMetric) GetPreviousQPS(event base.MetricEvent) float64 { return m.getQPSWithTime(util.CurrentTimeMillis()-uint64(m.bucketLengthInMs), event) } func (m *SlidingWindowMetric) getQPSWithTime(now uint64, event base.MetricEvent) float64 { return float64(m.getSumWithTime(now, event)) / m.getIntervalInSecond() } func (m *SlidingWindowMetric) getSatisfiedBuckets(now uint64) []*BucketWrap { start, end := m.getBucketStartRange(now) // Extracts the buckets of which the startTime is between [start, end] // which means the time view of the buckets is [firstStart, endStart+bucketLength) satisfiedBuckets := m.real.ValuesConditional(now, func(ws uint64) bool { return ws >= start && ws <= end }) return satisfiedBuckets } func (m *SlidingWindowMetric) GetMaxOfSingleBucket(event base.MetricEvent) int64 { now := util.CurrentTimeMillis() satisfiedBuckets := m.getSatisfiedBuckets(now) var curMax int64 = 0 for _, w := range satisfiedBuckets { mb := w.Value.Load() if mb == nil { logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.GetMaxOfSingleBucket()") continue } counter, ok := mb.(*MetricBucket) if !ok { logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.GetMaxOfSingleBucket()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) continue } v := counter.Get(event) if v > curMax { curMax = v } } return curMax } func (m *SlidingWindowMetric) MinRT() float64 { now := util.CurrentTimeMillis() satisfiedBuckets := m.getSatisfiedBuckets(now) minRt := base.DefaultStatisticMaxRt for _, w := range satisfiedBuckets { mb := w.Value.Load() if mb == nil { logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.MinRT()") continue } counter, ok := mb.(*MetricBucket) if !ok { logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.MinRT()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) continue } v := counter.MinRt() if v < minRt { minRt = v } } if minRt < 1 { minRt = 1 } return float64(minRt) } func (m *SlidingWindowMetric) MaxConcurrency() int32 { now := util.CurrentTimeMillis() satisfiedBuckets := m.getSatisfiedBuckets(now) maxConcurrency := int32(0) for _, w := range satisfiedBuckets { mb := w.Value.Load() if mb == nil { logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.MaxConcurrency()") continue } counter, ok := mb.(*MetricBucket) if !ok { logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) continue } v := counter.MaxConcurrency() if v > maxConcurrency { maxConcurrency = v } } return maxConcurrency } func (m *SlidingWindowMetric) AvgRT() float64 { return float64(m.GetSum(base.MetricEventRt)) / float64(m.GetSum(base.MetricEventComplete)) } // SecondMetricsOnCondition aggregates metric items by second on condition that // the startTime of the statistic buckets satisfies the time predicate. func (m *SlidingWindowMetric) SecondMetricsOnCondition(predicate base.TimePredicate) []*base.MetricItem { ws := m.real.ValuesConditional(util.CurrentTimeMillis(), predicate) // Aggregate second-level MetricItem (only for stable metrics) wm := make(map[uint64][]*BucketWrap, 8) for _, w := range ws { bucketStart := atomic.LoadUint64(&w.BucketStart) secStart := bucketStart - bucketStart%1000 if arr, hasData := wm[secStart]; hasData { wm[secStart] = append(arr, w) } else { wm[secStart] = []*BucketWrap{w} } } items := make([]*base.MetricItem, 0, 8) for ts, values := range wm { if len(values) == 0 { continue } if item := m.metricItemFromBuckets(ts, values); item != nil { items = append(items, item) } } return items } // metricItemFromBuckets aggregates multiple bucket wrappers (based on the same startTime in second) // to the single MetricItem. func (m *SlidingWindowMetric) metricItemFromBuckets(ts uint64, ws []*BucketWrap) *base.MetricItem { item := &base.MetricItem{Timestamp: ts} var allRt int64 = 0 for _, w := range ws { mi := w.Value.Load() if mi == nil { logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.metricItemFromBuckets()") return nil } mb, ok := mi.(*MetricBucket) if !ok { logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.metricItemFromBuckets()", "bucketStartTime", w.BucketStart, "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) return nil } item.PassQps += uint64(mb.Get(base.MetricEventPass)) item.BlockQps += uint64(mb.Get(base.MetricEventBlock)) item.ErrorQps += uint64(mb.Get(base.MetricEventError)) item.CompleteQps += uint64(mb.Get(base.MetricEventComplete)) mc := uint32(mb.MaxConcurrency()) if mc > item.Concurrency { item.Concurrency = mc } allRt += mb.Get(base.MetricEventRt) } if item.CompleteQps > 0 { item.AvgRt = uint64(allRt) / item.CompleteQps } else { item.AvgRt = uint64(allRt) } return item } func (m *SlidingWindowMetric) metricItemFromBucket(w *BucketWrap) *base.MetricItem { mi := w.Value.Load() if mi == nil { logging.Error(errors.New("nil BucketWrap"), "Current bucket value is nil in SlidingWindowMetric.metricItemFromBucket()") return nil } mb, ok := mi.(*MetricBucket) if !ok { logging.Error(errors.New("type assert failed"), "Fail to do type assert in SlidingWindowMetric.metricItemFromBucket()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).Name()) return nil } completeQps := mb.Get(base.MetricEventComplete) item := &base.MetricItem{ PassQps: uint64(mb.Get(base.MetricEventPass)), BlockQps: uint64(mb.Get(base.MetricEventBlock)), ErrorQps: uint64(mb.Get(base.MetricEventError)), CompleteQps: uint64(completeQps), Timestamp: w.BucketStart, } if completeQps > 0 { item.AvgRt = uint64(mb.Get(base.MetricEventRt) / completeQps) } else { item.AvgRt = uint64(mb.Get(base.MetricEventRt)) } return item }