def replay_till_good2go()

in core/lib/payload/copy.py [0:0]


    def replay_till_good2go(self, checksum):
        """
        Keep replaying changes until the time spent in replay is below
        self.replay_timeout
        For table which has huge numbers of writes during OSC, we'll probably
        hit replay timeout if we call swap_tables directly after checksum.
        We will do several round iteration here in order to bring the number
        of un-played changes down to a proper level a proper level

        @param checksum:  Run checksum for replayed changes or not
        @type  checksum:  bool

        """
        log.info(
            "Replay at most {} more round(s) until we can finish in {} "
            "seconds".format(self.replay_max_attempt, self.replay_timeout)
        )
        # Temporarily enable slow query log for slow replay statements
        self.execute_sql(sql.set_session_variable("long_query_time"), (1,))
        for i in range(self.replay_max_attempt):
            log.info("Catchup Attempt: {}".format(i + 1))
            start_time = time.time()
            # If checksum is required, then we need to make sure total time
            # spent in replay+checksum is below replay_timeout.
            if checksum and self.need_checksum():
                self.start_transaction()
                log.info(
                    "Catch up in order to compare checksum for the "
                    "rows that have been changed"
                )
                self.replay_changes(single_trx=True)
                self.checksum_for_changes(single_trx=False)
            else:
                # Break replay into smaller chunks if it's too big
                max_id_now = self.get_max_delta_id()
                while max_id_now - self.last_replayed_id > self.max_replay_batch_size:
                    delta_id_limit = self.last_replayed_id + self.max_replay_batch_size
                    log.info("Replay up to {}".format(delta_id_limit))
                    self.replay_changes(single_trx=False, delta_id_limit=delta_id_limit)
                self.replay_changes(single_trx=False, delta_id_limit=max_id_now)

            time_in_replay = time.time() - start_time
            if time_in_replay < self.replay_timeout:
                log.info(
                    "Time spent in last round of replay is {:.2f}, which "
                    "is less than replay_timeout: {} for final replay. "
                    "We are good to proceed".format(time_in_replay, self.replay_timeout)
                )
                break
        else:
            # We are not able to bring the replay time down to replay_timeout
            if not self.bypass_replay_timeout:
                raise OSCError("MAX_ATTEMPT_EXCEEDED", {"timeout": self.replay_timeout})
            else:
                log.warning(
                    "Proceed after max replay attempts exceeded. "
                    "Because --bypass-replay-timeout is specified"
                )