in core/hotspot/traffic_shaping.go [322:394]
func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, batchCount int64) *base.TokenResult {
metric := c.metric
if metric == nil {
return nil
}
if c.metricType == Concurrency {
return c.performCheckingForConcurrencyMetric(arg)
} else if c.metricType > QPS {
return nil
}
timeCounter := metric.RuleTimeCounter
tokenCounter := metric.RuleTokenCounter
if timeCounter == nil || tokenCounter == nil {
return nil
}
// calculate available token
tokenCount := c.threshold
val, existed := c.specificItems[arg]
if existed {
tokenCount = val
}
if tokenCount <= 0 {
msg := fmt.Sprintf("hotspot throttling check blocked, threshold is <= 0, arg: %v", arg)
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
}
intervalCostTime := int64(math.Round(float64(batchCount * c.durationInSec * 1000 / tokenCount)))
for {
var (
expectedTime int64
currentTimeInMs int64
lastPassTime int64
lastPassTimePtr *int64
)
currentTimeInMs = int64(util.CurrentTimeMillis())
lastPassTimePtr = timeCounter.AddIfAbsent(arg, ¤tTimeInMs)
if lastPassTimePtr == nil {
// initialize pointer for first access
lastPassTimePtr = ¤tTimeInMs
}
// load the last pass time
lastPassTime = atomic.LoadInt64(lastPassTimePtr)
// calculate expected pass time based on two scenarios:
// 1. first access or expired statistics window
// 2. normal within-window access
if lastPassTimePtr == ¤tTimeInMs || lastPassTime < currentTimeInMs-(c.durationInSec*1000) {
// adjust the time of the previous window to one second ago, and at most TokenCount tokens can pass through
expectedTime = currentTimeInMs - (c.durationInSec * 1000) + intervalCostTime
} else {
// normal cumulative calculation
expectedTime = lastPassTime + intervalCostTime
}
if expectedTime <= currentTimeInMs || expectedTime-currentTimeInMs < c.maxQueueingTimeMs {
if atomic.CompareAndSwapInt64(lastPassTimePtr, lastPassTime, currentTimeInMs) {
awaitTime := expectedTime - currentTimeInMs
if awaitTime > 0 {
atomic.StoreInt64(lastPassTimePtr, expectedTime)
return base.NewTokenResultShouldWait(time.Duration(awaitTime) * time.Millisecond)
}
return nil
} else {
runtime.Gosched()
}
} else {
msg := fmt.Sprintf("hotspot throttling check blocked, wait time exceedes max queueing time, arg: %v", arg)
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
}
}
}