in appender.go [616:673]
// maybeScaleDown attempts to reduce the number of active indexers by one and
// reports whether this call performed the downscale.
//
// now is recorded in the new scaling info and is used for the cooldown check.
// info is the caller's snapshot of the scaling info; it is refreshed from the
// appender when a compare-and-swap against it fails in the limit loop.
// timedFlush is a counter that is compared against the configured scale-down
// threshold and reset to zero once it has reached that threshold.
//
// The checks are evaluated in order:
//  1. Never downscale when there is at most 1 active indexer.
//  2. If the active indexer count exceeds the active limit, downscale
//     regardless of the cooldown.
//  3. Otherwise honour the scale-down cooldown.
//  4. Downscale when the index failure rate is at least 1%.
//  5. Downscale when timedFlush has reached the configured threshold.
func (a *Appender) maybeScaleDown(now time.Time, info scalingInfo, timedFlush *uint) bool {
	// Only downscale when there is more than 1 active indexer.
	if info.activeIndexers <= 1 {
		return false
	}
	// If the CPU quota changes and there is more than 1 indexer, downscale an
	// active indexer. This downscaling action isn't subject to the downscaling
	// cooldown, since doing so would result in using much more CPU for longer.
	// Loop until the CompareAndSwap operation succeeds (more than a single
	// active indexer may be trying to scale down at the same time), or until
	// the active indexer count no longer exceeds the limit.
	limit := a.activeLimit()
	for info.activeIndexers > limit {
		// Avoid having more than 1 concurrent downscale, by using a compare
		// and swap operation.
		if newInfo := info.ScaleDown(now); a.scalingInfo.CompareAndSwap(info, newInfo) {
			a.config.Logger.Info(
				"active indexers exceeds limit, scaling down",
				zap.Int64("old_active_indexer_count", info.activeIndexers),
				zap.Int64("new_active_indexer_count", newInfo.activeIndexers),
				zap.Int64("active_indexer_limit", limit),
			)
			return true
		}
		info = a.scalingInformation() // refresh scaling info if CAS failed.
		// The refreshed info may already reflect a concurrent downscale; apply
		// the single-indexer guard again so that none of the checks below can
		// scale down the last remaining indexer.
		if info.activeIndexers <= 1 {
			return false
		}
	}
	if info.withinCoolDown(a.config.Scaling.ScaleDown.CoolDown, now) {
		return false
	}
	// If at least 1% of the requests result in 429, scale down the current
	// active indexer.
	if a.indexFailureRate() >= 0.01 {
		if newInfo := info.ScaleDown(now); a.scalingInfo.CompareAndSwap(info, newInfo) {
			a.config.Logger.Info(
				"elasticsearch 429 response rate exceeded 1%, scaling down",
				zap.Int64("old_active_indexer_count", info.activeIndexers),
				zap.Int64("new_active_indexer_count", newInfo.activeIndexers),
			)
			return true
		}
		// Another goroutine changed the scaling info first; do not retry.
		return false
	}
	if *timedFlush < a.config.Scaling.ScaleDown.Threshold {
		return false
	}
	// Reset timedFlush once it has reached the threshold, so the next
	// downscale needs a fresh run of timed flushes. The reset happens even if
	// the compare-and-swap below fails.
	*timedFlush = 0
	if newInfo := info.ScaleDown(now); a.scalingInfo.CompareAndSwap(info, newInfo) {
		a.config.Logger.Info(
			"timed flush threshold exceeded, scaling down",
			zap.Int64("old_active_indexer_count", info.activeIndexers),
			zap.Int64("new_active_indexer_count", newInfo.activeIndexers),
		)
		return true
	}
	return false
}