in elastic/shared/schedulers/indexing.py [0:0]
def speedup(self, weight):
    """Adjust the throttling state so that more load is generated.

    The divisor is first recomputed as a weighted blend (``ALPHA`` / ``BETA``)
    of its previous value and the absolute difference between
    ``self.last_state`` and ``self.weight``. Then exactly one knob is turned:

    * while ``weight`` is below ``self.max_bulk_size``, the bulk size is
      raised (capped at ``self.max_bulk_size``) and pushed to
      ``self.parameter_source.set_bulk_size``;
    * otherwise ``self.rate`` is lowered, never going below ``0``.

    Finally ``self.last_state`` is set to ``self.weight``.

    :param weight: value compared against ``self.max_bulk_size`` and used as
        the base for the new bulk size (presumably the current bulk size --
        confirm against the caller).
    """
    log = self.logger
    log.debug("Speeding up...")
    log.debug(
        "Current Divisor: [%s] Last State: [%s] New State: [%s]",
        self.divisor,
        self.last_state,
        self.weight,
    )

    # NOTE: the divisor is driven by ``self.weight``, not by the ``weight``
    # argument; the two are only the same if the caller keeps them in sync.
    state_delta = abs(self.last_state - self.weight)
    self.divisor = self.ALPHA * self.divisor + self.BETA * state_delta
    log.debug("Divisor is [%s]", self.divisor)

    if weight < self.max_bulk_size:
        # Grow the bulk size first; a larger divisor means a smaller step.
        ceiling = self.max_bulk_size
        step = ceiling / (1 + self.divisor)
        target_bulk_size = weight + step
        if not target_bulk_size < ceiling:
            target_bulk_size = ceiling
        target_bulk_size = int(target_bulk_size)
        log.debug("Adjusting bulk size from [%s] to [%s]", weight, target_bulk_size)
        self.parameter_source.set_bulk_size(target_bulk_size)
    else:
        # The bulk size is already at its maximum, so lower the rate instead.
        step = self.max_delay / (1 + self.divisor)
        lowered_rate = round(self.rate - step, 6)
        if not lowered_rate > 0:
            lowered_rate = 0
        log.debug("Adjusting rate from [%s]s to [%s]s", self.rate, lowered_rate)
        self.rate = lowered_rate

    self.last_state = self.weight