in ste/concurrencyTuner.go [139:264]
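// worker runs the auto-tuner's hill-climbing loop: it repeatedly scales concurrency up by a
// multiplier, measures the resulting throughput, and backs off with progressively smaller
// multipliers when the gain isn't worth the extra connections, before settling on a final
// value, reporting the reason for it, and then idling.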
func (t *autoConcurrencyTuner) worker() {
	const standardMultiplier = 2
	const boostedMultiplier = standardMultiplier * 2
	const topOfBoostZone = 256 // boosted multiplier applies up to this many connections
	const slowdownFactor = 5
	const minMultiplier = 1.19 // really this is 1.2, but use a little less to make the floating point comparisons robust
	const fudgeFactor = 0.2

	multiplier := float32(boostedMultiplier)
	concurrency := float32(t.initialConcurrency)
	atMax := false
	highCpu := false
	everSawHighCpu := false
	sawHighMultiGbps := false
	probeHigherRegardless := false
	dontBackoffRegardless := false
	multiplierReductionCount := 0
	lastReason := ConcurrencyReasonNone

	// get initial baseline throughput
	lastSpeed, _ := t.getCurrentSpeed()
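
	// The tuning loop below ends when we hit the configured max concurrency, when
	// dontBackoffRegardless stops us from backing off any further, or when the probing
	// multiplier has shrunk below minMultiplier.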
	for { // todo, add the conditions here
		rateChangeReason := concurrencyReasonSeeking

		if concurrency >= topOfBoostZone && multiplier > standardMultiplier {
			multiplier = standardMultiplier // don't use the boosted multiplier forever
		}

		// enforce a ceiling
		atMax = concurrency*multiplier > float32(t.maxConcurrency)
		if atMax {
			multiplier = float32(t.maxConcurrency) / concurrency
			rateChangeReason = concurrencyReasonHitMax
		}

		// compute increase
		concurrency = concurrency * multiplier
		desiredSpeedIncrease := lastSpeed * (multiplier - 1) * fudgeFactor // we'd like it to speed up linearly, but we'll accept a _lot_ less, according to the fudge factor, in the interests of finding the best possible speed
		desiredNewSpeed := lastSpeed + desiredSpeedIncrease
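		// For example, with the boosted multiplier of 4 and a fudgeFactor of 0.2, quadrupling
		// the connection count only has to deliver a (4-1)*0.2 = 60% throughput gain to be
		// accepted; with the standard multiplier of 2 the bar is a (2-1)*0.2 = 20% gain.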

		// action the increase and measure its effect
		lastReason = t.setConcurrency(concurrency, rateChangeReason)
		lastSpeed, highCpu = t.getCurrentSpeed()
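
		// Remember whether throughput has ever reached the multi-gigabit range; the
		// benchmarking-specific logic below uses this when deciding whether to keep
		// probing higher concurrency.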
		if lastSpeed > 11000 {
			sawHighMultiGbps = true
		}
		if highCpu {
			everSawHighCpu = true // this doesn't stop us probing higher concurrency, since sometimes that works even when CPU looks high, but it does change the way we report the result
		}

		if t.isBenchmarking {
			// Be a little more aggressive if we are tuning for benchmarking purposes (as opposed to day to day use)
			// If we are seeing retries (within "normal" concurrency range) then for benchmarking purposes we don't want to back off.
			// (Since if we back off the retries might stop and then they won't be reported on as a limiting factor.)
			sawRetry := atomic.SwapInt64(&t.atomicRetryCount, 0) > 0
			dontBackoffRegardless = sawRetry && concurrency <= 256

			// Workaround for variable throughput when targeting 20 Gbps account limit (concurrency around 64 didn't seem to give stable throughput in some tests)
			// TODO: review this, and look for root cause/better solution
			probeHigherRegardless = sawHighMultiGbps && concurrency >= 32 && concurrency < 128 && multiplier >= standardMultiplier
		}

		// decide what to do based on the measurement
		if lastSpeed > desiredNewSpeed || probeHigherRegardless {
			// Our concurrency change gave the hoped-for speed increase, so loop around and see if another increase will also work,
			// unless already at max
			if atMax {
				break
			}
		} else if dontBackoffRegardless {
			// nothing more we can do
			break
		} else {
			// the new speed didn't work, so we conclude it was too aggressive and back off to where we were before
			concurrency = concurrency / multiplier

			// reduce multiplier to probe more slowly on the next iteration
			if multiplier > standardMultiplier {
				multiplier = standardMultiplier // just back off from our "boosted" multiplier
			} else {
				multiplier = 1 + (multiplier-1)/slowdownFactor // back off to a much smaller multiplier
			}
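
			// With the constants above, successive backoffs take the multiplier down
			// 4 -> 2 -> 1.2 -> 1.04 (ignoring the bump-up below), so after a few
			// unsuccessful probes it falls below minMultiplier and tuning stops.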

			// bump the multiplier up until it's at least enough to change the connection count by 1
			// (but, to make sure our algorithm terminates, limit how much we do this)
			multiplierReductionCount++
			if multiplierReductionCount <= 2 {
				for int(multiplier*concurrency) == int(concurrency) {
					multiplier += 0.05
				}
			}

			if multiplier < minMultiplier {
				break // no point in tuning anymore
			} else {
				lastReason = t.setConcurrency(concurrency, concurrencyReasonBackoff) //nolint:staticcheck
				lastSpeed, _ = t.getCurrentSpeed() // must re-measure immediately after backing off
			}
		}
	}

	if atMax {
		// provide no special "we found the best value" result, because actually we possibly didn't find it, we just hit the max,
		// and we've already notified the caller of that reason, when we tried using the max
	} else {
		// provide the final value once, with a reason why it's our final value
		if everSawHighCpu {
			lastReason = t.setConcurrency(concurrency, concurrencyReasonHighCpu)
		} else {
			lastReason = t.setConcurrency(concurrency, concurrencyReasonAtOptimum)
		}
		_, _ = t.getCurrentSpeed() // read from the channel
	}

	t.storeFinalState(lastReason, concurrency)
	t.signalStability()

	// now just provide an "inactive" value forever
	for {
		_ = t.setConcurrency(concurrency, concurrencyReasonFinished)
		_, _ = t.getCurrentSpeed() // read from the channel
		t.signalStability()        // in case anyone new has "subscribed"
	}
}
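
// Illustrative only, not part of concurrencyTuner.go: a self-contained toy sketch of the same
// multiplicative-increase / damped-backoff search, run against a made-up throughput curve that
// saturates at 64 connections. All names here (simulatedSpeed, tune) are invented for the
// example; the constants mirror the ones used by worker above.
package main

import "fmt"

// simulatedSpeed pretends throughput grows linearly up to 64 connections and then flattens out.
func simulatedSpeed(concurrency float32) float32 {
	if concurrency > 64 {
		concurrency = 64
	}
	return concurrency * 150 // arbitrary units
}

// tune performs the same kind of search as worker: scale up by a multiplier, keep the change if
// throughput improves by at least a fudged fraction of the linear ideal, otherwise back off and
// shrink the multiplier until it is too small to be worth probing with.
func tune(initial, ceiling float32) float32 {
	const fudgeFactor = 0.2
	const minMultiplier = 1.19
	multiplier := float32(4) // start with the boosted multiplier
	concurrency := initial
	lastSpeed := simulatedSpeed(concurrency)
	for {
		if concurrency*multiplier > ceiling {
			multiplier = ceiling / concurrency // enforce the ceiling
		}
		concurrency *= multiplier
		desired := lastSpeed * (1 + (multiplier-1)*fudgeFactor)
		speed := simulatedSpeed(concurrency)
		if speed > desired {
			if concurrency >= ceiling {
				return concurrency
			}
			lastSpeed = speed
			continue
		}
		// the probe failed: back off and probe more gently next time
		concurrency /= multiplier
		if multiplier > 2 {
			multiplier = 2
		} else {
			multiplier = 1 + (multiplier-1)/5
		}
		if multiplier < minMultiplier {
			return concurrency
		}
	}
}

func main() {
	fmt.Println(tune(16, 1000)) // settles near the simulated saturation point of 64
}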