func()

in appender.go [616:673]


// maybeScaleDown decides whether one active indexer should be scaled down and,
// if so, records the decision by swapping the shared scaling info with a
// compare-and-swap.
//
// now is the timestamp passed to ScaleDown and to the cooldown check, info is
// the caller's snapshot of the scaling info, and timedFlush points to the
// caller's timed-flush counter, which is reset to 0 once it has reached the
// configured scale-down threshold.
//
// It returns true only when this call won the compare-and-swap that stores
// the scaled-down info; in every other case it returns false.
func (a *Appender) maybeScaleDown(now time.Time, info scalingInfo, timedFlush *uint) bool {
	// Only downscale when there is more than 1 active indexer.
	if info.activeIndexers == 1 {
		return false
	}
	// If the number of active indexers exceeds the current limit (e.g. after
	// a CPU quota change), downscale an active indexer. This downscaling
	// action isn't subject to the downscaling cooldown, since honoring it
	// would result in using much more CPU for longer.
	// Loop until either the CompareAndSwap operation succeeds (more than one
	// active indexer may be trying to scale down at the same time) or the
	// number of active indexers is back within the limit.
	limit := a.activeLimit()
	for info.activeIndexers > limit {
		// Avoid having more than 1 concurrent downscale, by using a compare
		// and swap operation.
		if newInfo := info.ScaleDown(now); a.scalingInfo.CompareAndSwap(info, newInfo) {
			a.config.Logger.Info(
				"active indexers exceeds limit, scaling down",
				zap.Int64("old_active_indexer_count", info.activeIndexers),
				zap.Int64("new_active_indexer_count", newInfo.activeIndexers),
				zap.Int64("active_indexer_limit", limit),
			)
			return true
		}
		info = a.scalingInformation() // refresh scaling info if CAS failed.
		// The guard at the top of the function only covered the caller's
		// snapshot. After a refresh, another indexer may already have scaled
		// down to a single active indexer; re-apply the guard so that none
		// of the checks below can scale the last active indexer away.
		if info.activeIndexers <= 1 {
			return false
		}
	}
	// The remaining downscale triggers are subject to the cooldown.
	if info.withinCoolDown(a.config.Scaling.ScaleDown.CoolDown, now) {
		return false
	}
	// If at least 1% of the requests result in 429, scale down the current
	// active indexer.
	if a.indexFailureRate() >= 0.01 {
		if newInfo := info.ScaleDown(now); a.scalingInfo.CompareAndSwap(info, newInfo) {
			a.config.Logger.Info(
				"elasticsearch 429 response rate exceeded 1%, scaling down",
				zap.Int64("old_active_indexer_count", info.activeIndexers),
				zap.Int64("new_active_indexer_count", newInfo.activeIndexers),
			)
			return true
		}
		// The CAS failed, so the shared scaling info no longer matches our
		// snapshot; do not retry here.
		return false
	}
	if *timedFlush < a.config.Scaling.ScaleDown.Threshold {
		return false
	}
	// Reset timedFlush once it has reached the threshold, whether or not the
	// CAS below succeeds; this avoids scaling down again too eagerly.
	*timedFlush = 0
	if newInfo := info.ScaleDown(now); a.scalingInfo.CompareAndSwap(info, newInfo) {
		a.config.Logger.Info(
			"timed flush threshold exceeded, scaling down",
			zap.Int64("old_active_indexer_count", info.activeIndexers),
			zap.Int64("new_active_indexer_count", newInfo.activeIndexers),
		)
		return true
	}
	return false
}