core/stat/base/leap_array.go (188 lines of code) (raw):
// Copyright 1999-2020 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package base
import (
"fmt"
"runtime"
"sync/atomic"
"unsafe"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)
// BucketWrap represents a slot to record metrics.
//
// In order to reduce memory footprint, BucketWrap does not hold the length of the bucket.
// The length of BucketWrap could be seen in LeapArray.
// The scope of time is [startTime, startTime+bucketLength).
// The size of BucketWrap is 24(8+16) bytes.
type BucketWrap struct {
// BucketStart represents start timestamp of this statistic bucket wrapper.
BucketStart uint64
// Value represents the actual data structure of the metrics (e.g. MetricBucket).
Value atomic.Value
}
func (ww *BucketWrap) resetTo(startTime uint64) {
ww.BucketStart = startTime
}
func (ww *BucketWrap) isTimeInBucket(now uint64, bucketLengthInMs uint32) bool {
return ww.BucketStart <= now && now < ww.BucketStart+uint64(bucketLengthInMs)
}
func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {
return now - (now % uint64(bucketLengthInMs))
}
// AtomicBucketWrapArray represents a thread-safe circular array.
//
// The length of the array should be provided on-create and cannot be modified.
type AtomicBucketWrapArray struct {
// The base address for real data array
base unsafe.Pointer
// The length of slice(array), it can not be modified.
length int
data []*BucketWrap
}
func NewAtomicBucketWrapArrayWithTime(len int, bucketLengthInMs uint32, now uint64, generator BucketGenerator) *AtomicBucketWrapArray {
ret := &AtomicBucketWrapArray{
length: len,
data: make([]*BucketWrap, len),
}
idx := int((now / uint64(bucketLengthInMs)) % uint64(len))
startTime := calculateStartTime(now, bucketLengthInMs)
for i := idx; i <= len-1; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},
}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}
for i := 0; i < idx; i++ {
ww := &BucketWrap{
BucketStart: startTime,
Value: atomic.Value{},
}
ww.Value.Store(generator.NewEmptyBucket())
ret.data[i] = ww
startTime += uint64(bucketLengthInMs)
}
// calculate base address for real data array
sliHeader := (*util.SliceHeader)(unsafe.Pointer(&ret.data))
ret.base = unsafe.Pointer((**BucketWrap)(unsafe.Pointer(sliHeader.Data)))
return ret
}
// NewAtomicBucketWrapArray creates an AtomicBucketWrapArray and initializes data of each BucketWrap.
//
// The len represents the length of the circular array.
// The bucketLengthInMs represents bucket length of each bucket (in milliseconds).
// The generator accepts a BucketGenerator to generate and refresh buckets.
func NewAtomicBucketWrapArray(len int, bucketLengthInMs uint32, generator BucketGenerator) *AtomicBucketWrapArray {
return NewAtomicBucketWrapArrayWithTime(len, bucketLengthInMs, util.CurrentTimeMillis(), generator)
}
func (aa *AtomicBucketWrapArray) elementOffset(idx int) (unsafe.Pointer, bool) {
if idx >= aa.length || idx < 0 {
logging.Error(errors.New("array index out of bounds"),
"array index out of bounds in AtomicBucketWrapArray.elementOffset()",
"idx", idx, "arrayLength", aa.length)
return nil, false
}
basePtr := aa.base
return unsafe.Pointer(uintptr(basePtr) + uintptr(idx)*unsafe.Sizeof(basePtr)), true
}
func (aa *AtomicBucketWrapArray) get(idx int) *BucketWrap {
// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
// then convert to (*unsafe.Pointer)
if offset, ok := aa.elementOffset(idx); ok {
return (*BucketWrap)(atomic.LoadPointer((*unsafe.Pointer)(offset)))
}
return nil
}
func (aa *AtomicBucketWrapArray) compareAndSet(idx int, except, update *BucketWrap) bool {
// aa.elementOffset(idx) return the secondary pointer of BucketWrap, which is the pointer to the aa.data[idx]
// then convert to (*unsafe.Pointer)
// update secondary pointer
if offset, ok := aa.elementOffset(idx); ok {
return atomic.CompareAndSwapPointer((*unsafe.Pointer)(offset), unsafe.Pointer(except), unsafe.Pointer(update))
}
return false
}
// LeapArray represents the fundamental implementation of a sliding window data-structure.
//
// Some important attributes: the sampleCount represents the number of buckets,
// while intervalInMs represents the total time span of the sliding window.
//
// For example, assuming sampleCount=5, intervalInMs is 1000ms, so the bucketLength is 200ms.
// Let's give a diagram to illustrate.
// Suppose current timestamp is 1188, bucketLength is 200ms, intervalInMs is 1000ms, then
// time span of current bucket is [1000, 1200). The representation of the underlying structure:
//
// B0 B1 B2 B3 B4
// |_______|_______|_______|_______|_______|
// 1000 1200 400 600 800 (1000) ms
// ^
// time=1188
type LeapArray struct {
bucketLengthInMs uint32
// sampleCount represents the number of BucketWrap.
sampleCount uint32
// intervalInMs represents the total time span of the sliding window (in milliseconds).
intervalInMs uint32
// array represents the internal circular array.
array *AtomicBucketWrapArray
// updateLock is the internal lock for update operations.
updateLock mutex
}
func NewLeapArray(sampleCount uint32, intervalInMs uint32, generator BucketGenerator) (*LeapArray, error) {
if sampleCount == 0 || intervalInMs%sampleCount != 0 {
return nil, errors.Errorf("Invalid parameters, intervalInMs is %d, sampleCount is %d", intervalInMs, sampleCount)
}
if generator == nil {
return nil, errors.Errorf("Invalid parameters, BucketGenerator is nil")
}
bucketLengthInMs := intervalInMs / sampleCount
return &LeapArray{
bucketLengthInMs: bucketLengthInMs,
sampleCount: sampleCount,
intervalInMs: intervalInMs,
array: NewAtomicBucketWrapArray(int(sampleCount), bucketLengthInMs, generator),
}, nil
}
func (la *LeapArray) CurrentBucket(bg BucketGenerator) (*BucketWrap, error) {
return la.currentBucketOfTime(util.CurrentTimeMillis(), bg)
}
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
if now <= 0 {
return nil, errors.New("Current time is less than 0.")
}
idx := la.calculateTimeIdx(now)
bucketStart := calculateStartTime(now, la.bucketLengthInMs)
for { //spin to get the current BucketWrap
old := la.array.get(idx)
if old == nil {
// because la.array.data had initiated when new la.array
// theoretically, here is not reachable
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},
}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {
return newWrap, nil
} else {
runtime.Gosched()
}
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
return old, nil
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
// current time has been next cycle of LeapArray and LeapArray dont't count in last cycle.
// reset BucketWrap
if la.updateLock.TryLock() {
old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {
runtime.Gosched()
}
} else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
if la.sampleCount == 1 {
// if sampleCount==1 in leap array, in concurrency scenario, this case is possible
return old, nil
}
// TODO: reserve for some special case (e.g. when occupying "future" buckets).
return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
}
}
}
func (la *LeapArray) calculateTimeIdx(now uint64) int {
timeId := now / uint64(la.bucketLengthInMs)
return int(timeId) % la.array.length
}
// Values returns all valid (non-expired) buckets between [curBucketEnd-windowInterval, curBucketEnd],
// where curBucketEnd=curBucketStart+bucketLength.
func (la *LeapArray) Values() []*BucketWrap {
return la.valuesWithTime(util.CurrentTimeMillis())
}
func (la *LeapArray) valuesWithTime(now uint64) []*BucketWrap {
if now <= 0 {
return make([]*BucketWrap, 0)
}
ret := make([]*BucketWrap, 0, la.array.length)
for i := 0; i < la.array.length; i++ {
ww := la.array.get(i)
if ww == nil || la.isBucketDeprecated(now, ww) {
continue
}
ret = append(ret, ww)
}
return ret
}
// ValuesConditional returns all buckets of which the startTimestamp satisfies the given timestamp condition (predicate).
// The function uses the parameter "now" as the target timestamp.
func (la *LeapArray) ValuesConditional(now uint64, predicate base.TimePredicate) []*BucketWrap {
if now <= 0 {
return make([]*BucketWrap, 0)
}
ret := make([]*BucketWrap, 0, la.array.length)
for i := 0; i < la.array.length; i++ {
ww := la.array.get(i)
if ww == nil || la.isBucketDeprecated(now, ww) || !predicate(atomic.LoadUint64(&ww.BucketStart)) {
continue
}
ret = append(ret, ww)
}
return ret
}
// isBucketDeprecated checks whether the BucketWrap is expired, according to given timestamp.
func (la *LeapArray) isBucketDeprecated(now uint64, ww *BucketWrap) bool {
ws := atomic.LoadUint64(&ww.BucketStart)
return (now - ws) > uint64(la.intervalInMs)
}
// BucketGenerator represents the "generic" interface for generating and refreshing buckets.
type BucketGenerator interface {
// NewEmptyBucket creates new raw data inside the bucket.
NewEmptyBucket() interface{}
// ResetBucketTo refreshes the BucketWrap to provided startTime and resets all data inside the given bucket.
ResetBucketTo(bucket *BucketWrap, startTime uint64) *BucketWrap
}