func()

in core/hotspot/traffic_shaping.go [227:320]


func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, batchCount int64) *base.TokenResult {
	metric := c.metric
	if metric == nil {
		return nil
	}

	if c.metricType == Concurrency {
		return c.performCheckingForConcurrencyMetric(arg)
	} else if c.metricType > QPS {
		return nil
	}

	timeCounter := metric.RuleTimeCounter
	tokenCounter := metric.RuleTokenCounter
	if timeCounter == nil || tokenCounter == nil {
		return nil
	}

	// calculate available token
	tokenCount := c.threshold
	val, existed := c.specificItems[arg]
	if existed {
		tokenCount = val
	}
	if tokenCount <= 0 {
		msg := fmt.Sprintf("hotspot reject check blocked, threshold is <= 0, arg: %v", arg)
		return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
	}
	maxCount := tokenCount + c.burstCount
	if batchCount > maxCount {
		// return blocked because the batch number is more than max count of rejectTrafficShapingController
		msg := fmt.Sprintf("hotspot reject check blocked, request batch count is more than max token count, arg: %v", arg)
		return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
	}

	for {
		currentTimeInMs := int64(util.CurrentTimeMillis())
		lastAddTokenTimePtr := timeCounter.AddIfAbsent(arg, &currentTimeInMs)
		if lastAddTokenTimePtr == nil {
			// First to fill token, and consume token immediately
			leftCount := maxCount - batchCount
			tokenCounter.AddIfAbsent(arg, &leftCount)
			return nil
		}

		// Calculate the time duration since last token was added.
		passTime := currentTimeInMs - atomic.LoadInt64(lastAddTokenTimePtr)
		if passTime > c.durationInSec*1000 {
			// Refill the tokens because statistic window has passed.
			leftCount := maxCount - batchCount
			oldQpsPtr := tokenCounter.AddIfAbsent(arg, &leftCount)
			if oldQpsPtr == nil {
				// Might not be accurate here.
				atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs)
				return nil
			} else {
				// refill token
				restQps := atomic.LoadInt64(oldQpsPtr)
				toAddTokenNum := passTime * tokenCount / (c.durationInSec * 1000)
				newQps := int64(0)
				if toAddTokenNum+restQps > maxCount {
					newQps = maxCount - batchCount
				} else {
					newQps = toAddTokenNum + restQps - batchCount
				}
				if newQps < 0 {
					msg := fmt.Sprintf("hotspot reject check blocked, request batch count is more than available token count, arg: %v", arg)
					return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
				}
				if atomic.CompareAndSwapInt64(oldQpsPtr, restQps, newQps) {
					atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs)
					return nil
				}
				runtime.Gosched()
			}
		} else {
			//check whether the rest of token is enough to batch
			oldQpsPtr, found := tokenCounter.Get(arg)
			if found {
				oldRestToken := atomic.LoadInt64(oldQpsPtr)
				if oldRestToken-batchCount >= 0 {
					//update
					if atomic.CompareAndSwapInt64(oldQpsPtr, oldRestToken, oldRestToken-batchCount) {
						return nil
					}
				} else {
					msg := fmt.Sprintf("hotspot reject check blocked, request batch count is more than available token count, arg: %v", arg)
					return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
				}
			}
			runtime.Gosched()
		}
	}
}