in filter/adaptivesvc/limiter/hill_climbing.go [156:244]
func (u *HillClimbingUpdater) getOption(rtt, _ uint64) (HillClimbingOption, error) {
u.limiter.mutex.Lock()
defer u.limiter.mutex.Unlock()
now := time.Now()
option := HillClimbingOptionDoNothing
lastUpdatedTime := u.limiter.lastUpdatedTime.Load()
updateInterval := u.limiter.updateInterval.Load()
rttAvg := u.limiter.rttAvg.Load()
transactionNum := u.limiter.transactionNum.Load()
limitation := u.limiter.limitation.Load()
// the current option is expired
if now.Before(lastUpdatedTime) {
return option, nil
}
if now.Sub(lastUpdatedTime) > updateInterval || rttAvg == 0 {
// the current req is on the next round or no rttAvg.
// FIXME(justxuewei): If all requests in one round not receive responses, rttAvg will be 0, and maxCapacity will
// be 0 as well, the actual maxCapacity, however, is not 0.
maxCapacity := float64(transactionNum) * float64(updateInterval.Milliseconds()) / rttAvg
VerboseDebugf("[HillClimbingUpdater] maxCapacity: %f, transactionNum: %d, rttAvg: %f, bestRTTAvg: %f, "+
"updateInterval: %d",
maxCapacity, transactionNum, rttAvg, u.limiter.bestRTTAvg.Load(), updateInterval.Milliseconds())
// Consider extending limitation if concurrent is about to reach the limitation.
if u.limiter.bestRTTAvg.Load() == math.MaxFloat64 || uint64(maxCapacity*1.5) > limitation {
if updateInterval == radicalPeriod {
option = HillClimbingOptionExtendPlus
} else {
option = HillClimbingOptionExtend
}
}
tps := uint64(1000.0 * float64(transactionNum) / float64(updateInterval.Milliseconds()))
VerboseDebugf("[HillClimbingUpdater] The TPS is %d, transactionNum: %d, updateInterval: %d.",
tps, transactionNum, updateInterval)
if tps > u.limiter.bestTPS.Load() {
VerboseDebugf("[HillClimbingUpdater] The best TPS is updated from %d to %d.",
u.limiter.bestTPS.Load(), tps)
// tps is the best in the history, update
// all best metrics.
u.limiter.bestTPS.Store(tps)
u.limiter.bestRTTAvg.Store(rttAvg)
u.limiter.bestMaxCapacity.Store(maxCapacity)
u.limiter.bestLimitation.Store(u.limiter.limitation.Load())
VerboseDebugf("[HillClimbingUpdater] Best-metrics are up-to-date, "+
"seq: %d, bestTPS: %d, bestRTTAvg: %.4f, bestMaxCapacity: %d,"+
" bestLimitation: %d.", u.seq, u.limiter.bestTPS.Load(),
u.limiter.bestRTTAvg.Load(), u.limiter.bestMaxCapacity.Load(),
u.limiter.bestLimitation.Load())
} else {
VerboseDebugf("[HillClimbingUpdater] The best TPS is not updated, best TPS is %d, "+
"the current TPS is %d",
u.limiter.bestTPS.Load(), tps)
if u.shouldShrink(transactionNum, maxCapacity, tps, rttAvg) {
if updateInterval == radicalPeriod {
option = HillClimbingOptionShrinkPlus
} else {
option = HillClimbingOptionShrink
}
// shrinking limitation means the process of adjusting
// limitation goes to stable, so extends the update
// interval to avoid adjusting frequently.
u.limiter.updateInterval.Store(minDuration(updateInterval*2, stablePeriod))
}
}
// reset metrics for the new round
u.limiter.transactionNum.Store(0)
u.limiter.rttAvg.Store(float64(rtt))
u.limiter.lastUpdatedTime.Store(time.Now())
VerboseDebugf("[HillClimbingUpdater] A new round is applied, all metrics are reset.")
} else {
// still on the current round
u.limiter.transactionNum.Add(1)
// ra = (ra * c + r) / (c + 1), where ra denotes rttAvg,
// c denotes transactionNum, r denotes rtt.
u.limiter.rttAvg.Store((rttAvg*float64(transactionNum) + float64(rtt)) / float64(transactionNum+1))
option = HillClimbingOptionDoNothing
}
return option, nil
}