func()

in core/hotspot/traffic_shaping.go [322:394]


// PerformChecking applies throttling (uniform-rate queueing) hotspot parameter
// flow control to arg, where batchCount is the number of tokens requested.
//
// It returns:
//   - nil when the request may proceed immediately, and also when the check is
//     not applicable (no metric, a metric type above QPS, or a missing rule
//     time/token counter);
//   - a "should wait" result carrying the delay when the request may proceed
//     after queueing for less than c.maxQueueingTimeMs;
//   - a blocked result when the effective threshold is <= 0, or when the
//     required delay is not below c.maxQueueingTimeMs.
//
// Rules whose metric type is Concurrency are delegated to
// performCheckingForConcurrencyMetric.
func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, batchCount int64) *base.TokenResult {
	metric := c.metric
	if metric == nil {
		return nil
	}

	if c.metricType == Concurrency {
		return c.performCheckingForConcurrencyMetric(arg)
	}
	if c.metricType > QPS {
		return nil
	}

	timeCounter := metric.RuleTimeCounter
	tokenCounter := metric.RuleTokenCounter
	if timeCounter == nil || tokenCounter == nil {
		return nil
	}

	// Effective threshold: a per-argument entry in specificItems overrides the
	// rule-wide threshold.
	tokenCount := c.threshold
	if val, existed := c.specificItems[arg]; existed {
		tokenCount = val
	}
	if tokenCount <= 0 {
		msg := fmt.Sprintf("hotspot throttling check blocked, threshold is <= 0, arg: %v", arg)
		return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
	}

	// Length of the statistics window in milliseconds.
	windowMs := c.durationInSec * 1000
	// Minimum spacing (ms) between passes so that tokenCount tokens are spread
	// evenly over one window. The division is done in floating point so that
	// math.Round really rounds; dividing the integers first would truncate and
	// make the rounding a no-op.
	intervalCostTime := int64(math.Round(float64(batchCount*windowMs) / float64(tokenCount)))

	for {
		// Private snapshot of "now"; it is never shared, so plain reads are safe.
		currentTimeInMs := int64(util.CurrentTimeMillis())

		// seed is a separate copy offered to the counter as the initial last-pass
		// time. Keeping it distinct from currentTimeInMs means concurrent atomic
		// updates through the shared pointer cannot change our notion of "now".
		// NOTE(review): assumes AddIfAbsent returns nil when arg had no entry and
		// may retain the pointer it is given - confirm against the counter cache.
		seed := currentTimeInMs
		lastPassTimePtr := timeCounter.AddIfAbsent(arg, &seed)
		firstAccess := lastPassTimePtr == nil
		if firstAccess {
			lastPassTimePtr = &seed
		}
		lastPassTime := atomic.LoadInt64(lastPassTimePtr)

		var expectedTime int64
		if firstAccess || lastPassTime < currentTimeInMs-windowMs {
			// First access, or the last pass is older than one window: behave as
			// if the previous pass happened exactly one window ago, so idle time
			// cannot be banked beyond a single window's worth of tokens.
			expectedTime = currentTimeInMs - windowMs + intervalCostTime
		} else {
			// Normal case: the next slot follows the last pass by one interval.
			expectedTime = lastPassTime + intervalCostTime
		}

		// The slot lies in the future and the wait is not below the queueing limit.
		if expectedTime > currentTimeInMs && expectedTime-currentTimeInMs >= c.maxQueueingTimeMs {
			msg := fmt.Sprintf("hotspot throttling check blocked, wait time exceedes max queueing time, arg: %v", arg)
			return base.NewTokenResultBlockedWithCause(base.BlockTypeHotSpotParamFlow, msg, c.BoundRule(), nil)
		}

		// Claim the slot; if another goroutine moved the last-pass time since we
		// loaded it, yield and recompute from fresh values.
		if !atomic.CompareAndSwapInt64(lastPassTimePtr, lastPassTime, currentTimeInMs) {
			runtime.Gosched()
			continue
		}

		if awaitTime := expectedTime - currentTimeInMs; awaitTime > 0 {
			// The caller must queue: record the future slot as the last-pass time
			// so that subsequent requests line up behind it.
			atomic.StoreInt64(lastPassTimePtr, expectedTime)
			return base.NewTokenResultShouldWait(time.Duration(awaitTime) * time.Millisecond)
		}
		return nil
	}
}