in elastic/shared/schedulers/indexing.py [0:0]
def throttle(self, weight):
    """Slow indexing down by one step.

    The smoothing divisor is refreshed first. Then, while ``self.rate`` is
    still below ``self.max_delay``, the rate is raised (capped at
    ``self.max_delay``). Once the rate has reached the cap, the bulk size is
    reduced instead and pushed to ``self.parameter_source``.

    Args:
        weight: Value the bulk-size reduction starts from; it is logged as
            the current bulk size.

    Returns:
        None. Updates ``self.divisor`` and ``self.last_state``, plus either
        ``self.rate`` or the parameter source's bulk size.
    """
    log = self.logger
    log.debug("Throttling...")
    log.debug(
        "Current Divisor: [%s] Last State: [%s] New State: [%s]",
        self.divisor,
        self.last_state,
        -self.weight,
    )

    # NOTE(review): the divisor and state bookkeeping read ``self.weight``
    # while the bulk-size branch uses the ``weight`` argument -- confirm
    # that this distinction is intended.
    state_delta = abs(self.last_state + self.weight)
    self.divisor = (self.ALPHA * self.divisor) + (self.BETA * state_delta)
    log.debug("Divisor is [%s]", self.divisor)

    if self.rate < self.max_delay:
        # Lengthen the delay first; a larger divisor means a smaller step.
        step = self.max_delay / (1 + self.divisor)
        candidate = round(self.rate + step, 6)
        # Never exceed max_delay (on a tie max_delay itself is kept).
        capped_rate = min(self.max_delay, candidate)
        log.debug("Adjusting rate from [%s]s to [%s]s", self.rate, capped_rate)
        self.rate = capped_rate
    else:
        # The rate is already at its ceiling, so shrink the batch instead.
        step = self.max_bulk_size / (1 + self.divisor)
        # Bulk size is an integer and never drops below 1.
        shrunk_size = int(max(1, weight - step))
        log.debug("Adjusting bulk size from [%s] to [%s]", weight, shrunk_size)
        self.parameter_source.set_bulk_size(shrunk_size)

    self.last_state = self.weight * -1