func()

in filter/adaptivesvc/limiter/hill_climbing.go [156:244]


func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, error) {
	u.limiter.mutex.Lock()
	defer u.limiter.mutex.Unlock()

	now := time.Now()
	option := HillClimbingOptionDoNothing

	lastUpdatedTime := u.limiter.lastUpdatedTime.Load()
	updateInterval := u.limiter.updateInterval.Load()
	rttAvg := u.limiter.rttAvg.Load()
	transactionNum := u.limiter.transactionNum.Load()
	limitation := u.limiter.limitation.Load()

	// the current option is expired
	if now.Before(lastUpdatedTime) {
		return option, nil
	}

	if now.Sub(lastUpdatedTime) > updateInterval || rttAvg == 0 {
		// the current req is on the next round or no rttAvg.

		// FIXME(justxuewei): If all requests in one round not receive responses, rttAvg will be 0, and maxCapacity will
		//  be 0 as well, the actual maxCapacity, however, is not 0.
		maxCapacity := float64(transactionNum) * float64(updateInterval.Milliseconds()) / rttAvg
		VerboseDebugf("[HillClimbingUpdater] maxCapacity: %f, transactionNum: %d, rttAvg: %f, bestRTTAvg: %f, "+
			"updateInterval: %d",
			maxCapacity, transactionNum, rttAvg, u.limiter.bestRTTAvg.Load(), updateInterval.Milliseconds())

		// Consider extending limitation if concurrent is about to reach the limitation.
		if u.limiter.bestRTTAvg.Load() == math.MaxFloat64 || uint64(maxCapacity*1.5) > limitation {
			if updateInterval == radicalPeriod {
				option = HillClimbingOptionExtendPlus
			} else {
				option = HillClimbingOptionExtend
			}
		}

		tps := uint64(1000.0 * float64(transactionNum) / float64(updateInterval.Milliseconds()))
		VerboseDebugf("[HillClimbingUpdater] The TPS is %d, transactionNum: %d, updateInterval: %d.",
			tps, transactionNum, updateInterval)

		if tps > u.limiter.bestTPS.Load() {
			VerboseDebugf("[HillClimbingUpdater] The best TPS is updated from %d to %d.",
				u.limiter.bestTPS.Load(), tps)
			// tps is the best in the history, update
			// all best metrics.
			u.limiter.bestTPS.Store(tps)
			u.limiter.bestRTTAvg.Store(rttAvg)
			u.limiter.bestMaxCapacity.Store(maxCapacity)
			u.limiter.bestLimitation.Store(u.limiter.limitation.Load())
			VerboseDebugf("[HillClimbingUpdater] Best-metrics are up-to-date, "+
				"seq: %d, bestTPS: %d, bestRTTAvg: %.4f, bestMaxCapacity: %d,"+
				" bestLimitation: %d.", u.seq, u.limiter.bestTPS.Load(),
				u.limiter.bestRTTAvg.Load(), u.limiter.bestMaxCapacity.Load(),
				u.limiter.bestLimitation.Load())
		} else {
			VerboseDebugf("[HillClimbingUpdater] The best TPS is not updated, best TPS is %d, "+
				"the current TPS is %d",
				u.limiter.bestTPS.Load(), tps)
			if u.shouldShrink(transactionNum, maxCapacity, tps, rttAvg) {
				if updateInterval == radicalPeriod {
					option = HillClimbingOptionShrinkPlus
				} else {
					option = HillClimbingOptionShrink
				}
				// shrinking limitation means the process of adjusting
				// limitation goes to stable, so extends the update
				// interval to avoid adjusting frequently.
				u.limiter.updateInterval.Store(minDuration(updateInterval*2, stablePeriod))
			}
		}

		// reset metrics for the new round
		u.limiter.transactionNum.Store(0)
		u.limiter.rttAvg.Store(float64(rtt))
		u.limiter.lastUpdatedTime.Store(time.Now())
		VerboseDebugf("[HillClimbingUpdater] A new round is applied, all metrics are reset.")
	} else {
		// still on the current round

		u.limiter.transactionNum.Add(1)
		// ra = (ra * c  + r) / (c + 1), where ra denotes rttAvg,
		// c denotes transactionNum, r denotes rtt.
		u.limiter.rttAvg.Store((rttAvg*float64(transactionNum) + float64(rtt)) / float64(transactionNum+1))
		option = HillClimbingOptionDoNothing
	}

	return option, nil
}