in core/lib/payload/copy.py [0:0]
def replay_till_good2go(self, checksum):
"""
Keep replaying changes until the time spent in replay is below
self.replay_timeout
For table which has huge numbers of writes during OSC, we'll probably
hit replay timeout if we call swap_tables directly after checksum.
We will do several round iteration here in order to bring the number
of un-played changes down to a proper level a proper level
@param checksum: Run checksum for replayed changes or not
@type checksum: bool
"""
log.info(
"Replay at most {} more round(s) until we can finish in {} "
"seconds".format(self.replay_max_attempt, self.replay_timeout)
)
# Temporarily enable slow query log for slow replay statements
self.execute_sql(sql.set_session_variable("long_query_time"), (1,))
for i in range(self.replay_max_attempt):
log.info("Catchup Attempt: {}".format(i + 1))
start_time = time.time()
# If checksum is required, then we need to make sure total time
# spent in replay+checksum is below replay_timeout.
if checksum and self.need_checksum():
self.start_transaction()
log.info(
"Catch up in order to compare checksum for the "
"rows that have been changed"
)
self.replay_changes(single_trx=True)
self.checksum_for_changes(single_trx=False)
else:
# Break replay into smaller chunks if it's too big
max_id_now = self.get_max_delta_id()
while max_id_now - self.last_replayed_id > self.max_replay_batch_size:
delta_id_limit = self.last_replayed_id + self.max_replay_batch_size
log.info("Replay up to {}".format(delta_id_limit))
self.replay_changes(single_trx=False, delta_id_limit=delta_id_limit)
self.replay_changes(single_trx=False, delta_id_limit=max_id_now)
time_in_replay = time.time() - start_time
if time_in_replay < self.replay_timeout:
log.info(
"Time spent in last round of replay is {:.2f}, which "
"is less than replay_timeout: {} for final replay. "
"We are good to proceed".format(time_in_replay, self.replay_timeout)
)
break
else:
# We are not able to bring the replay time down to replay_timeout
if not self.bypass_replay_timeout:
raise OSCError("MAX_ATTEMPT_EXCEEDED", {"timeout": self.replay_timeout})
else:
log.warning(
"Proceed after max replay attempts exceeded. "
"Because --bypass-replay-timeout is specified"
)