in src/SimpleReplay/replay.py
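# Likely module-level context for this excerpt (a sketch, not verbatim from the
# file): the stdlib imports the function below relies on. Module globals such as
# logger, g_config, ConnectionThread, prepend_ids_to_logs, init_stats,
# current_offset_ms, and join_finished_threads are defined elsewhere in replay.py.
import random
import sys
import threading
import time
import traceback
from queue import Empty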
def replay_worker(process_idx, replay_start_time, first_event_time, queue,
                  worker_stats, default_interface, odbc_driver,
                  connection_semaphore, num_connections, peak_connections):
""" Worker process to distribute the work among several processes. Each
worker pulls a connection off the queue, waits until its time to start
it, spawns a thread to execute the actual connection and associated
transactions, and then repeats. """
# map thread to stats dict
connection_threads = {}
connections_processed = 0
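    # the main thread is named '0', presumably so the log prefix added below can
    # distinguish it from the numbered connection threads spawned later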
threading.current_thread().name = '0'
try:
# prepend the process index to all log messages in this worker
prepend_ids_to_logs(process_idx)
        # stagger worker startup to avoid hammering the get_cluster_credentials API
time.sleep(random.randrange(1, 3))
logger.debug(f"Worker {process_idx} ready for jobs")
# time to block waiting for jobs on the queue
timeout_sec = 10
last_empty_queue_time = None
            # get the next job off the queue and wait until it's due;
            # the loop terminates when a False sentinel is received on the queue
while True:
try:
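                # optional connection throttling: acquire a slot before taking a
                # job so at most limit_concurrent_connections run at once; the
                # semaphore is handed to the ConnectionThread below, which is
                # presumably responsible for releasing it when the connection
                # finishes (or it is released here if no job arrives)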
if connection_semaphore is not None:
logger.debug(f"Checking for connection throttling ({num_connections.value} / {g_config['limit_concurrent_connections']} active connections)")
sem_start = time.time()
connection_semaphore.acquire()
sem_elapsed = time.time() - sem_start
logger.debug(f"Waited {sem_elapsed} sec for semaphore")
job = queue.get(timeout=timeout_sec)
except Empty:
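                # give back the connection slot we reserved, since no job arrived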
if connection_semaphore is not None:
connection_semaphore.release()
elapsed = int(time.time() - last_empty_queue_time) if last_empty_queue_time else 0
                # add the time spent blocking in queue.get() itself, which isn't
                # covered by last_empty_queue_time
                elapsed += timeout_sec
empty_queue_timeout_sec = g_config.get("empty_queue_timeout_sec", 120)
logger.debug(f"No jobs for {elapsed} seconds (timeout {empty_queue_timeout_sec})")
                # normally a worker exits when it receives a False sentinel on the
                # queue, but as a safety net we also exit if the queue stays empty
                # for too long (e.g. after an upstream error)
if elapsed > empty_queue_timeout_sec:
logger.warning(f"Queue empty for {elapsed} sec, exiting")
break
if last_empty_queue_time is None:
last_empty_queue_time = time.time()
continue
last_empty_queue_time = None
if job is False:
logger.debug("Got termination signal, finishing up.")
break
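            # fresh stats dict for this connection's thread; folded into
            # worker_stats when finished threads are joined below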
thread_stats = init_stats({})
# how much time has elapsed since the replay started
time_elapsed_ms = current_offset_ms(replay_start_time)
# what is the time offset of this connection job relative to the first event
connection_offset_ms = job['connection'].offset_ms(first_event_time)
delay_sec = (connection_offset_ms - time_elapsed_ms) / 1000.0
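            # e.g. a connection 30.0 s into the extracted workload, reached 29.2 s
            # into the replay, yields delay_sec = 0.8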
logger.debug(f"Got job {job['job_id']+1}, delay {delay_sec:+.3f} sec (extracted connection time: {job['connection'].session_initiation_time})")
            # if the connection is due more than a few ms in the future, sleep until
            # it's due. "a few ms" is approximate, allowing for the imprecision of
            # sleep() as well as the time the remaining code takes to spawn a thread
            # and actually make the db connection.
if connection_offset_ms - time_elapsed_ms > 10:
time.sleep(delay_sec)
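            # a job that is due within 10 ms, or already overdue, starts immediately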
logger.debug(f"Starting job {job['job_id']+1} (extracted connection time: {job['connection'].session_initiation_time}). {len(threading.enumerate())}, {threading.active_count()} connections active.")
connection_thread = ConnectionThread(
process_idx,
job['job_id'],
job['connection'],
default_interface,
odbc_driver,
replay_start_time,
first_event_time,
thread_stats,
num_connections,
peak_connections,
connection_semaphore
)
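            # name the thread after its job id so log messages can be attributed
            # to the job, mirroring the '0' given to this worker's main thread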
connection_thread.name = f"{job['job_id']}"
connection_thread.start()
connection_threads[connection_thread] = thread_stats
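            # opportunistically reap any threads that have already finished so
            # their stats are merged into worker_stats as we go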
join_finished_threads(connection_threads, worker_stats, wait=False)
connections_processed += 1
logger.debug(f"Waiting for {len(connection_threads)} connections to finish...")
join_finished_threads(connection_threads, worker_stats, wait=True)
except Exception as e:
logger.error(f"Process {process_idx} threw exception: {e}")
logger.debug("".join(traceback.format_exception(*sys.exc_info())))
if connections_processed:
logger.debug(f"Max connection offset for this process: {worker_stats['connection_diff_sec']:.3f} sec")
logger.debug(f"Process {process_idx} finished")