in core/hotspot/traffic_shaping.go [227:320]
func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, batchCount int64) *base.TokenResult {
metric := c.metric
if metric == nil {
return nil
}
if c.metricType == Concurrency {
return c.performCheckingForConcurrencyMetric(arg)
} else if c.metricType > QPS {
return nil
}
timeCounter := metric.RuleTimeCounter
tokenCounter := metric.RuleTokenCounter
if timeCounter == nil || tokenCounter == nil {
return nil
}
// calculate available token
tokenCount := c.threshold
val, existed := c.specificItems[arg]
if existed {
tokenCount = val
}
if tokenCount <= 0 {
msg := fmt.Sprintf("hotspot reject check blocked, threshold is <= 0, arg: %v", arg)
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
}
maxCount := tokenCount + c.burstCount
if batchCount > maxCount {
// return blocked because the batch number is more than max count of rejectTrafficShapingController
msg := fmt.Sprintf("hotspot reject check blocked, request batch count is more than max token count, arg: %v", arg)
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
}
for {
currentTimeInMs := int64(util.CurrentTimeMillis())
lastAddTokenTimePtr := timeCounter.AddIfAbsent(arg, ¤tTimeInMs)
if lastAddTokenTimePtr == nil {
// First to fill token, and consume token immediately
leftCount := maxCount - batchCount
tokenCounter.AddIfAbsent(arg, &leftCount)
return nil
}
// Calculate the time duration since last token was added.
passTime := currentTimeInMs - atomic.LoadInt64(lastAddTokenTimePtr)
if passTime > c.durationInSec*1000 {
// Refill the tokens because statistic window has passed.
leftCount := maxCount - batchCount
oldQpsPtr := tokenCounter.AddIfAbsent(arg, &leftCount)
if oldQpsPtr == nil {
// Might not be accurate here.
atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs)
return nil
} else {
// refill token
restQps := atomic.LoadInt64(oldQpsPtr)
toAddTokenNum := passTime * tokenCount / (c.durationInSec * 1000)
newQps := int64(0)
if toAddTokenNum+restQps > maxCount {
newQps = maxCount - batchCount
} else {
newQps = toAddTokenNum + restQps - batchCount
}
if newQps < 0 {
msg := fmt.Sprintf("hotspot reject check blocked, request batch count is more than available token count, arg: %v", arg)
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
}
if atomic.CompareAndSwapInt64(oldQpsPtr, restQps, newQps) {
atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs)
return nil
}
runtime.Gosched()
}
} else {
//check whether the rest of token is enough to batch
oldQpsPtr, found := tokenCounter.Get(arg)
if found {
oldRestToken := atomic.LoadInt64(oldQpsPtr)
if oldRestToken-batchCount >= 0 {
//update
if atomic.CompareAndSwapInt64(oldQpsPtr, oldRestToken, oldRestToken-batchCount) {
return nil
}
} else {
msg := fmt.Sprintf("hotspot reject check blocked, request batch count is more than available token count, arg: %v", arg)
return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
}
}
runtime.Gosched()
}
}
}