func (t *autoConcurrencyTuner) worker()

in ste/concurrencyTuner.go [139:264]


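// worker runs the auto-tuning loop: it repeatedly multiplies the concurrency,
// measures the resulting throughput, and either keeps probing higher or backs
// off with a smaller multiplier, until no worthwhile gain remains or the
// configured maximum is hit.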
func (t *autoConcurrencyTuner) worker() {
	const standardMultiplier = 2
	const boostedMultiplier = standardMultiplier * 2
	const topOfBoostZone = 256 // boosted multiplier applies up to this many connections
	const slowdownFactor = 5
	const minMultiplier = 1.19 // really this is 1.2, but we use a little less to make the floating-point comparisons robust
	const fudgeFactor = 0.2
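	// Net effect of these constants: concurrency quadruples per probe up to
	// topOfBoostZone, then doubles; each failed probe shrinks the multiplier
	// towards 1 until it falls below minMultiplier and tuning stops.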

	multiplier := float32(boostedMultiplier)
	concurrency := float32(t.initialConcurrency)
	atMax := false
	highCpu := false
	everSawHighCpu := false
	sawHighMultiGbps := false
	probeHigherRegardless := false
	dontBackoffRegardless := false
	multiplierReductionCount := 0
	lastReason := ConcurrencyReasonNone

	// get initial baseline throughput
	lastSpeed, _ := t.getCurrentSpeed()

	for { // TODO: add the conditions here
		rateChangeReason := concurrencyReasonSeeking

		if concurrency >= topOfBoostZone && multiplier > standardMultiplier {
			multiplier = standardMultiplier // don't use the boosted multiplier forever
		}

		// enforce a ceiling
		atMax = concurrency*multiplier > float32(t.maxConcurrency)
		if atMax {
			multiplier = float32(t.maxConcurrency) / concurrency
			rateChangeReason = concurrencyReasonHitMax
		}

		// compute increase
		concurrency = concurrency * multiplier
		desiredSpeedIncrease := lastSpeed * (multiplier - 1) * fudgeFactor // we'd like speed to scale linearly with concurrency, but we'll accept a _lot_ less, per the fudge factor, in the interests of finding the best possible speed
		desiredNewSpeed := lastSpeed + desiredSpeedIncrease
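		// e.g. with multiplier 2 and fudgeFactor 0.2, doubling the concurrency
		// only needs to yield a 20% speed gain to count as a success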

		// action the increase and measure its effect
		lastReason = t.setConcurrency(concurrency, rateChangeReason)
		lastSpeed, highCpu = t.getCurrentSpeed()
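		// NOTE: assuming getCurrentSpeed reports megabits per second, the 11000
		// threshold below corresponds to roughly 11 Gbps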
		if lastSpeed > 11000 {
			sawHighMultiGbps = true
		}
		if highCpu {
			everSawHighCpu = true // this doesn't stop us probing higher concurrency, since sometimes that works even when CPU looks high, but it does change the way we report the result
		}

		if t.isBenchmarking {
			// Be a little more aggressive if we are tuning for benchmarking purposes (as opposed to day-to-day use)

			// If we are seeing retries (within "normal" concurrency range) then for benchmarking purposes we don't want to back off.
			// (Since if we back off the retries might stop and then they won't be reported on as a limiting factor.)
			sawRetry := atomic.SwapInt64(&t.atomicRetryCount, 0) > 0
			dontBackoffRegardless = sawRetry && concurrency <= 256

			// Workaround for variable throughput when targeting 20 Gbps account limit (concurrency around 64 didn't seem to give stable throughput in some tests)
			// TODO: review this, and look for root cause/better solution
			probeHigherRegardless = sawHighMultiGbps && concurrency >= 32 && concurrency < 128 && multiplier >= standardMultiplier
		}

		// decide what to do based on the measurement
		if lastSpeed > desiredNewSpeed || probeHigherRegardless {
			// Our concurrency change gave the hoped-for speed increase, so loop around and see if another increase will also work,
			// unless already at max
			if atMax {
				break
			}
		} else if dontBackoffRegardless {
			// nothing more we can do
			break
		} else {
			// the new speed didn't work, so we conclude it was too aggressive and back off to where we were before
			concurrency = concurrency / multiplier

			// reduce multiplier to probe more slowly on the next iteration
			if multiplier > standardMultiplier {
				multiplier = standardMultiplier // just back off from our "boosted" multiplier
			} else {
				multiplier = 1 + (multiplier-1)/slowdownFactor // back off to a much smaller multiplier
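				// e.g. the standard multiplier 2 decays to 1 + 1/5 = 1.2, just above
				// minMultiplier; a further failed probe decays it to 1.04, ending tuning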
			}

			// bump the multiplier up until it's at least enough to change the connection count by 1
			// (but, to make sure our algorithm terminates, limit how much we do this)
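			// e.g. at concurrency 100, a multiplier of 1.004 changes nothing, since
			// int(1.004*100) is still 100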
			multiplierReductionCount++
			if multiplierReductionCount <= 2 {
				for int(multiplier*concurrency) == int(concurrency) {
					multiplier += 0.05
				}
			}

			if multiplier < minMultiplier {
				break // no point in tuning anymore
			} else {
				lastReason = t.setConcurrency(concurrency, concurrencyReasonBackoff) //nolint:staticcheck
				lastSpeed, _ = t.getCurrentSpeed()                                   // must re-measure immediately after backing off
			}
		}
	}

	if atMax {
		// provide no special "we found the best value" result, because we possibly didn't find it, we just hit the max,
		// and we've already notified the caller of that reason when we tried using the max
	} else {
		// provide the final value once, with the reason why it's our final value
		if everSawHighCpu {
			lastReason = t.setConcurrency(concurrency, concurrencyReasonHighCpu)
		} else {
			lastReason = t.setConcurrency(concurrency, concurrencyReasonAtOptimum)
		}
		_, _ = t.getCurrentSpeed() // read from the channel
	}

	t.storeFinalState(lastReason, concurrency)
	t.signalStability()

	// now just provide an "inactive" value forever
	for {
		_ = t.setConcurrency(concurrency, concurrencyReasonFinished)
		_, _ = t.getCurrentSpeed() // read from the channel
		t.signalStability()        // in case anyone new has "subscribed"
	}
}
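
For context, here is a minimal, self-contained sketch of the rendezvous pattern the worker relies on: each setConcurrency call publishes a recommendation, and each getCurrentSpeed call blocks until the measuring side has observed its effect. The struct, channel fields, and fake measurement below are hypothetical illustrations, not the real autoConcurrencyTuner definition.

package main

import "fmt"

type recommendation struct {
	concurrency int
	reason      string
}

type observation struct {
	mbps    float32
	highCpu bool
}

type tuner struct {
	recommendations chan recommendation
	observations    chan observation
}

// setConcurrency publishes a new target and echoes back the reason used
func (t *tuner) setConcurrency(c float32, reason string) string {
	t.recommendations <- recommendation{concurrency: int(c), reason: reason}
	return reason
}

// getCurrentSpeed blocks until the measuring side reports what it observed
func (t *tuner) getCurrentSpeed() (mbps float32, highCpu bool) {
	o := <-t.observations
	return o.mbps, o.highCpu
}

func main() {
	t := &tuner{
		recommendations: make(chan recommendation),
		observations:    make(chan observation),
	}

	// measuring side: apply each recommendation, then report the speed seen
	go func() {
		for r := range t.recommendations {
			fmt.Printf("applying concurrency %d (%s)\n", r.concurrency, r.reason)
			t.observations <- observation{mbps: float32(r.concurrency) * 10} // fake measurement
		}
	}()

	// tuning side: one probe step, mirroring one pass of worker's main loop
	t.setConcurrency(16, "seeking")
	speed, _ := t.getCurrentSpeed()
	fmt.Printf("measured %.0f Mbps\n", speed)
}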