def execute_transaction()

in src/SimpleReplay/replay.py [0:0]


    def execute_transaction(self, transaction, connection):
        """Replay the queries of one transaction and update the thread statistics.

        Each query is delayed until its offset relative to ``self.first_event_time``
        has elapsed since ``self.replay_start``, then executed on a cursor obtained
        from ``connection``. A failing query is recorded and the replay continues
        with the next query. After the last query the cursor is closed and the
        connection is committed.

        Args:
            transaction: Object exposing ``queries``, ``database_name``,
                ``username``, ``pid``, ``xid`` and ``get_base_filename()``.
            connection: Database connection providing ``cursor()`` and
                ``commit()``.

        Side effects:
            Updates ``self.thread_stats``: ``query_success`` / ``query_error``
            per query, and ``transaction_success`` / ``transaction_error`` (plus
            an entry in ``transaction_error_log``) once per transaction.
        """
        errors = []
        cursor = connection.cursor()
        query_count = len(transaction.queries)
        # Shared prefix of the per-query debug messages.
        transaction_label = (
            f"DB={transaction.database_name}, USER={transaction.username}, "
            f"PID={transaction.pid}, XID:{transaction.xid}"
        )

        try:
            for idx, query in enumerate(transaction.queries):
                time_until_start_ms = query.offset_ms(self.first_event_time) - current_offset_ms(self.replay_start)
                truncated_query = (query.text[:60] + '...' if len(query.text) > 60 else query.text).replace("\n", " ")
                logger.debug(f"Executing [{truncated_query}] in {time_until_start_ms/1000.0:.1f} sec")
                # Only sleep when the query is more than 10 ms ahead of schedule.
                if time_until_start_ms > 10:
                    time.sleep(time_until_start_ms / 1000.0)

                try:
                    lowered_text = query.text.lower()
                    is_copy = "from 's3:" in lowered_text
                    is_unload = "to 's3:" in lowered_text

                    if g_config["execute_copy_statements"] == "true" and is_copy:
                        cursor.execute(query.text)
                    elif (
                        g_config["execute_unload_statements"] == "true"
                        and is_unload
                        and g_config["replay_output"] is not None
                    ):
                        cursor.execute(query.text)
                    elif not is_copy and not is_unload and "$1" not in query.text:
                        cursor.execute(query.text)
                    # Queries matching none of the branches above are skipped,
                    # but are still logged and counted as successful below.

                    logger.debug(f"Replayed {transaction_label}, Query: {idx+1}/{query_count}")
                    self.thread_stats['query_success'] += 1
                except Exception as err:
                    # Best effort: record the failure and keep replaying the
                    # remaining queries of this transaction.
                    self.thread_stats['query_error'] += 1
                    errors.append([query.text, str(err)])
                    logger.debug(f"Failed {transaction_label}, Query: {idx+1}/{query_count}: {err}")

                if query.time_interval > 0.0:
                    logger.debug(f"Waiting {query.time_interval} sec between queries")
                    time.sleep(query.time_interval)
        finally:
            # Release the cursor even if something outside the per-query
            # handler (e.g. the offset computation) raises.
            cursor.close()

        connection.commit()

        # Judge this transaction by its own errors. The 'query_error' counter is
        # cumulative for the thread, so using it here would mark every
        # transaction after the first failure as failed.
        if not errors:
            self.thread_stats['transaction_success'] += 1
        else:
            self.thread_stats['transaction_error'] += 1
            self.thread_stats['transaction_error_log'][transaction.get_base_filename()] = errors