core/stat/base/bucket_leap_array.go (176 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package base
import (
"reflect"
"sync/atomic"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)
// BucketLeapArray is the sliding window implementation based on LeapArray (as the sliding window infrastructure)
// and MetricBucket (as the data type). Each MetricBucket records the statistical
// metrics of one minimum time unit (i.e. the bucket time span).
type BucketLeapArray struct {
// data is the underlying LeapArray; it holds the window parameters
// (bucket length, sample count, interval) and the bucket array.
data LeapArray
// dataType is a label for the bucket payload type; NewBucketLeapArray sets it to "MetricBucket".
dataType string
}
// NewEmptyBucket creates a new bucket value by calling NewMetricBucket and
// hands it back as an untyped interface{}.
func (bla *BucketLeapArray) NewEmptyBucket() interface{} {
	bucket := NewMetricBucket()
	return bucket
}
// ResetBucketTo points bw at a new window start and resets its bucket in place.
//
// It atomically stores startTime into bw.BucketStart, then calls reset on the
// *MetricBucket currently held in bw.Value, and finally returns the same bw.
// The type assertion is unchecked, so this panics if bw.Value does not hold a
// *MetricBucket.
func (bla *BucketLeapArray) ResetBucketTo(bw *BucketWrap, startTime uint64) *BucketWrap {
	atomic.StoreUint64(&bw.BucketStart, startTime)
	bw.Value.Load().(*MetricBucket).reset()
	return bw
}
// NewBucketLeapArray creates a BucketLeapArray with the given attributes.
//
// sampleCount is the number of buckets and intervalInMs is the total time span
// of the sliding window; the length of a single bucket is
// intervalInMs / sampleCount. Both values must be positive and must satisfy
// intervalInMs%sampleCount == 0.
//
// No validation happens here, so it must be done before calling
// NewBucketLeapArray. In particular, a sampleCount of 0 makes the integer
// division below panic.
func NewBucketLeapArray(sampleCount uint32, intervalInMs uint32) *BucketLeapArray {
	// TODO: also check params here.
	span := intervalInMs / sampleCount
	leapArray := &BucketLeapArray{
		dataType: "MetricBucket",
		data: LeapArray{
			intervalInMs:     intervalInMs,
			sampleCount:      sampleCount,
			bucketLengthInMs: span,
			array:            nil,
		},
	}
	// The bucket array is attached afterwards because its constructor takes the
	// new instance as its third argument (presumably as the bucket generator).
	leapArray.data.array = NewAtomicBucketWrapArray(int(sampleCount), span, leapArray)
	return leapArray
}
// SampleCount returns the sampleCount stored in the underlying LeapArray.
func (bla *BucketLeapArray) SampleCount() uint32 {
	n := bla.data.sampleCount
	return n
}
// IntervalInMs returns the intervalInMs stored in the underlying LeapArray.
func (bla *BucketLeapArray) IntervalInMs() uint32 {
	interval := bla.data.intervalInMs
	return interval
}
// BucketLengthInMs returns the bucketLengthInMs stored in the underlying LeapArray.
func (bla *BucketLeapArray) BucketLengthInMs() uint32 {
	length := bla.data.bucketLengthInMs
	return length
}
// DataType returns the data type label of this leap array.
func (bla *BucketLeapArray) DataType() string {
	label := bla.dataType
	return label
}
// GetIntervalInSecond returns the value of IntervalInMs converted from
// milliseconds to seconds.
func (bla *BucketLeapArray) GetIntervalInSecond() float64 {
	const millisPerSecond = 1000.0
	intervalMs := float64(bla.IntervalInMs())
	return intervalMs / millisPerSecond
}
// AddCount adds count to the given event, using the timestamp reported by
// util.CurrentTimeMillis to select the bucket.
func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
	// NOTE(review): the original author flagged that this might panic — unconfirmed.
	now := util.CurrentTimeMillis()
	bla.addCountWithTime(now, event, count)
}
// addCountWithTime adds count to event in the bucket resolved for now.
// If currentBucketWithTime yields no bucket (nil), the update is dropped.
func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
	if bucket := bla.currentBucketWithTime(now); bucket != nil {
		bucket.Add(event, count)
	}
}
// UpdateConcurrency forwards concurrency to the bucket selected by the
// timestamp reported by util.CurrentTimeMillis.
func (bla *BucketLeapArray) UpdateConcurrency(concurrency int32) {
	now := util.CurrentTimeMillis()
	bla.updateConcurrencyWithTime(now, concurrency)
}
// updateConcurrencyWithTime passes concurrency to the bucket resolved for now.
// If currentBucketWithTime yields no bucket (nil), the update is dropped.
func (bla *BucketLeapArray) updateConcurrencyWithTime(now uint64, concurrency int32) {
	if bucket := bla.currentBucketWithTime(now); bucket != nil {
		bucket.UpdateConcurrency(concurrency)
	}
}
// currentBucketWithTime resolves the *MetricBucket for the timestamp now.
//
// It asks the underlying LeapArray for the bucket wrapper of now and then
// unwraps the value stored in that wrapper. Every failure is logged and
// reported to the caller as a nil result:
//   - the LeapArray lookup returns an error,
//   - the returned wrapper is nil,
//   - the wrapper's atomic value is nil,
//   - the stored value is not a *MetricBucket.
func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
	curBucket, err := bla.data.currentBucketOfTime(now, bla)
	if err != nil {
		logging.Error(err, "Failed to get current bucket in BucketLeapArray.currentBucketWithTime()", "now", now)
		return nil
	}
	if curBucket == nil {
		logging.Error(errors.New("current bucket is nil"), "Nil curBucket in BucketLeapArray.currentBucketWithTime()")
		return nil
	}
	mb := curBucket.Value.Load()
	if mb == nil {
		logging.Error(errors.New("nil bucket"), "Current bucket atomic Value is nil in BucketLeapArray.currentBucketWithTime()")
		return nil
	}
	b, ok := mb.(*MetricBucket)
	if !ok {
		// reflect.Type.Name() is empty for pointer and other non-defined types,
		// which would leave "actualType" blank; String() always yields a
		// readable representation such as "*base.MetricBucket".
		logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.currentBucketWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).String())
		return nil
	}
	return b
}
// Count returns the sum count for the given MetricEvent within all valid
// (non-expired) buckets, evaluated at the timestamp reported by
// util.CurrentTimeMillis.
func (bla *BucketLeapArray) Count(event base.MetricEvent) int64 {
	// NOTE(review): the original author flagged that this might panic — unconfirmed.
	now := util.CurrentTimeMillis()
	return bla.CountWithTime(now, event)
}
// CountWithTime returns the sum of event over the buckets that the underlying
// LeapArray reports for the timestamp now (via valuesWithTime).
//
// Before summing, the current bucket of now is looked up on the LeapArray; a
// failure of that lookup is logged but does not abort the count. Buckets whose
// stored value is nil or is not a *MetricBucket are logged and skipped.
func (bla *BucketLeapArray) CountWithTime(now uint64, event base.MetricEvent) int64 {
	if _, err := bla.data.currentBucketOfTime(now, bla); err != nil {
		logging.Error(err, "Failed to get current bucket in BucketLeapArray.CountWithTime()", "now", now)
	}
	count := int64(0)
	for _, ww := range bla.data.valuesWithTime(now) {
		mb := ww.Value.Load()
		if mb == nil {
			logging.Error(errors.New("current bucket is nil"), "Failed to load current bucket in BucketLeapArray.CountWithTime()")
			continue
		}
		b, ok := mb.(*MetricBucket)
		if !ok {
			// reflect.Type.Name() is empty for pointer and other non-defined
			// types; String() keeps the "actualType" field informative.
			logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.CountWithTime()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).String())
			continue
		}
		count += b.Get(event)
	}
	return count
}
// Values returns all valid (non-expired) buckets for the timestamp now.
// A failure to refresh the current bucket is logged and otherwise ignored.
func (bla *BucketLeapArray) Values(now uint64) []*BucketWrap {
	// Refresh current bucket if necessary.
	if _, refreshErr := bla.data.currentBucketOfTime(now, bla); refreshErr != nil {
		logging.Error(refreshErr, "Failed to refresh current bucket in BucketLeapArray.Values()", "now", now)
	}
	buckets := bla.data.valuesWithTime(now)
	return buckets
}
// ValuesConditional delegates to the underlying LeapArray's ValuesConditional
// with the same now and predicate and returns its result unchanged.
func (bla *BucketLeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {
	matched := bla.data.ValuesConditional(now, predicate)
	return matched
}
// MinRt returns the smallest MinRt reported by the buckets that the underlying
// LeapArray returns from Values().
//
// The result starts at base.DefaultStatisticMaxRt, so that value is returned
// when no bucket reports anything smaller. A failure to obtain the current
// bucket is logged but does not abort the scan. Buckets whose stored value is
// nil or is not a *MetricBucket are logged and skipped.
func (bla *BucketLeapArray) MinRt() int64 {
	if _, err := bla.data.CurrentBucket(bla); err != nil {
		logging.Error(err, "Failed to get current bucket in BucketLeapArray.MinRt()")
	}
	ret := base.DefaultStatisticMaxRt
	for _, v := range bla.data.Values() {
		mb := v.Value.Load()
		if mb == nil {
			logging.Error(errors.New("current bucket is nil"), "Failed to load current bucket in BucketLeapArray.MinRt()")
			continue
		}
		b, ok := mb.(*MetricBucket)
		if !ok {
			// reflect.Type.Name() is empty for pointer and other non-defined
			// types; String() keeps the "actualType" field informative.
			logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.MinRt()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).String())
			continue
		}
		if mr := b.MinRt(); ret > mr {
			ret = mr
		}
	}
	return ret
}
// MaxConcurrency returns the largest MaxConcurrency reported by the buckets
// that the underlying LeapArray returns from Values().
//
// The result starts at 0, so 0 is returned when no bucket reports a larger
// value. A failure to obtain the current bucket is logged but does not abort
// the scan. Buckets whose stored value is nil or is not a *MetricBucket are
// logged and skipped.
func (bla *BucketLeapArray) MaxConcurrency() int32 {
	if _, err := bla.data.CurrentBucket(bla); err != nil {
		logging.Error(err, "Failed to get current bucket in BucketLeapArray.MaxConcurrency()")
	}
	ret := int32(0)
	for _, v := range bla.data.Values() {
		mb := v.Value.Load()
		if mb == nil {
			logging.Error(errors.New("current bucket is nil"), "Failed to load current bucket in BucketLeapArray.MaxConcurrency()")
			continue
		}
		b, ok := mb.(*MetricBucket)
		if !ok {
			// reflect.Type.Name() is empty for pointer and other non-defined
			// types; String() keeps the "actualType" field informative.
			logging.Error(errors.New("fail to type assert"), "Bucket data type error in BucketLeapArray.MaxConcurrency()", "expectType", "*MetricBucket", "actualType", reflect.TypeOf(mb).String())
			continue
		}
		if mc := b.MaxConcurrency(); ret < mc {
			ret = mc
		}
	}
	return ret
}