in src/SimpleReplay/replay.py [0:0]
def execute_transaction(self, transaction, connection):
    """Replay the queries of ``transaction`` on ``connection`` and record the outcome.

    For each query in ``transaction.queries``, in order:

    1. Compute how far in the future the query is scheduled
       (``query.offset_ms(self.first_event_time)`` minus
       ``current_offset_ms(self.replay_start)``) and sleep for that long when
       it is more than 10 ms away.
    2. Execute the query text if the configuration allows it (see the
       branch comments below); failures are caught, counted and collected
       rather than propagated.
    3. Sleep for ``query.time_interval`` seconds when it is positive.

    Afterwards the cursor is closed, the connection is committed and the
    per-transaction counters in ``self.thread_stats`` are updated.

    Args:
        transaction: Object exposing ``queries``, ``database_name``,
            ``username``, ``pid``, ``xid`` and ``get_base_filename()``.
        connection: Database connection exposing ``cursor()`` and
            ``commit()``.

    Side effects:
        Mutates ``self.thread_stats`` keys ``query_success``,
        ``query_error``, ``transaction_success``, ``transaction_error`` and
        ``transaction_error_log``.

    Raises:
        Any exception raised outside the per-query ``try`` (scheduling
        helpers, ``cursor.close()``, ``connection.commit()``) propagates to
        the caller.
    """
    # Failures of THIS transaction only, as [query_text, error_message] pairs.
    errors = []
    total_queries = len(transaction.queries)
    cursor = connection.cursor()

    try:
        for idx, query in enumerate(transaction.queries):
            time_until_start_ms = query.offset_ms(self.first_event_time) - current_offset_ms(self.replay_start)
            # Shorten the statement to a single line for the debug log.
            truncated_query = (query.text[:60] + '...' if len(query.text) > 60 else query.text).replace("\n", " ")
            logger.debug(f"Executing [{truncated_query}] in {time_until_start_ms/1000.0:.1f} sec")

            # Only wait when the query is more than 10 ms ahead of schedule.
            if time_until_start_ms > 10:
                time.sleep(time_until_start_ms / 1000.0)

            try:
                # Lower-case once; the S3 markers are matched case-insensitively.
                lowered_text = query.text.lower()
                reads_from_s3 = "from 's3:" in lowered_text
                writes_to_s3 = "to 's3:" in lowered_text

                if g_config["execute_copy_statements"] == "true" and reads_from_s3:
                    # Statement reading from S3, enabled by configuration.
                    cursor.execute(query.text)
                elif (
                    g_config["execute_unload_statements"] == "true"
                    and writes_to_s3
                    and g_config["replay_output"] is not None
                ):
                    # Statement writing to S3, enabled and with a replay output configured.
                    cursor.execute(query.text)
                elif not reads_from_s3 and not writes_to_s3 and "$1" not in query.text:
                    # Ordinary statement. Text containing "$1" is skipped
                    # (presumably a bind-parameter placeholder with no bound
                    # value available -- TODO confirm).
                    cursor.execute(query.text)

                # NOTE: a statement that matched none of the branches above was
                # not executed, but is still logged and counted as a success.
                logger.debug(
                    f"Replayed DB={transaction.database_name}, USER={transaction.username}, PID={transaction.pid}, XID:{transaction.xid}, Query: {idx+1}/{total_queries}"
                )
                self.thread_stats['query_success'] += 1
            except Exception as err:
                # Best effort: record the failure and keep replaying the
                # remaining queries of the transaction.
                self.thread_stats['query_error'] += 1
                errors.append([query.text, str(err)])
                logger.debug(
                    f"Failed DB={transaction.database_name}, USER={transaction.username}, PID={transaction.pid}, XID:{transaction.xid}, Query: {idx+1}/{total_queries}: {err}"
                )

            if query.time_interval > 0.0:
                logger.debug(f"Waiting {query.time_interval} sec between queries")
                time.sleep(query.time_interval)
    finally:
        # Release the cursor even if something outside the per-query try raised.
        cursor.close()

    connection.commit()

    # Judge the transaction by its own failures. thread_stats['query_error']
    # is only ever incremented here, so testing it would mark every
    # transaction after the first failed query as an error.
    if not errors:
        self.thread_stats['transaction_success'] += 1
    else:
        self.thread_stats['transaction_error'] += 1
        self.thread_stats['transaction_error_log'][transaction.get_base_filename()] = errors