in src/SimpleReplay/replay.py [0:0]
def start_replay(connection_logs, default_interface, odbc_driver, first_event_time, last_event_time,
                 num_workers, manager, per_process_stats, total_transactions, total_queries):
    """Launch the replay worker processes, feed them the connections from
    connection_logs, and block until they have all finished."""

    # create a queue for passing jobs to the workers. The maxsize bound will cause
    # put() to block if the queue is full
    queue = manager.Queue(maxsize=1000000)

    if not num_workers:
        # use the available CPU count, leaving 1 for the main thread and the manager
        num_workers = os.cpu_count()
        if num_workers is not None and num_workers > 0:
            num_workers = max(num_workers - 1, 4)
        else:
            num_workers = 4
            logger.warning(f"Couldn't determine the number of CPUs, defaulting to {num_workers} processes. "
                           "Use the configuration parameter num_workers to change this.")

    replay_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
    logger.info(f"Replay start time: {replay_start_time}")

    logger.debug(f"Running with {num_workers} workers")

    # find out how many child processes we started with. This is probably 1, due to the Manager
    initial_processes = len(multiprocessing.active_children())
    logger.debug(f"Initial child processes: {initial_processes}")

    global g_workers
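
    # ignore SIGINT in the parent while the workers are started so a Ctrl-C during
    # startup doesn't interrupt it; under the default fork start method the workers
    # inherit this ignored disposition, and the parent's own handler is reinstalled
    # once all workers are running (see below)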
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    connection_semaphore = None
    num_connections = manager.Value(int, 0)
    peak_connections = manager.Value(int, 0)
    if g_config.get('limit_concurrent_connections'):
        # create an IPC semaphore to limit the total concurrency
        connection_semaphore = manager.Semaphore(g_config.get('limit_concurrent_connections'))

    for idx in range(num_workers):
        per_process_stats[idx] = manager.dict()
        init_stats(per_process_stats[idx])
        g_workers.append(multiprocessing.Process(target=replay_worker,
                                                 args=(idx, replay_start_time, first_event_time, queue,
                                                       per_process_stats[idx], default_interface, odbc_driver,
                                                       connection_semaphore, num_connections, peak_connections)))
        g_workers[-1].start()
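
    # all workers are running; hand SIGINT back to the parent's own handler so
    # Ctrl-C is processed by sigint_handler from here on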
    signal.signal(signal.SIGINT, sigint_handler)

    logger.debug(f"Total connections in the connection log: {len(connection_logs)}")

    # add all the jobs to the work queue
    for idx, connection in enumerate(connection_logs):
        if not put_and_retry({"job_id": idx, "connection": connection}, queue, non_workers=initial_processes):
            break

    # add one termination "job"/signal per worker to tell each of them to exit
    # when there is no more work
    for idx in range(num_workers):
        if not put_and_retry(False, queue, non_workers=initial_processes):
            break

    active_processes = len(multiprocessing.active_children()) - initial_processes
    logger.debug(f"Active processes: {active_processes}")

    # and wait for the work to get done
    logger.debug(f"{active_processes} processes running")
    cnt = 0
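    # poll once per second: aggregated stats are re-displayed roughly every 5
    # iterations, and the remaining queue depth is logged roughly every 60 iterations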
    while active_processes:
        cnt += 1
        active_processes = len(multiprocessing.active_children()) - initial_processes
        if cnt % 60 == 0:
            logger.debug(f"Waiting for {active_processes} processes to finish")
            try:
                queue_length = queue.qsize()
                logger.debug(f"Remaining connections: {queue_length - num_workers}")
            except NotImplementedError:
                # support for qsize() is platform-dependent
                logger.debug("Queue length not supported.")

        # aggregate stats across all worker processes so far
        try:
            aggregated_stats = init_stats({})
            for idx, stat in per_process_stats.items():
                collect_stats(aggregated_stats, stat)
            if cnt % 5 == 0:
                display_stats(aggregated_stats, len(connection_logs), total_transactions,
                              total_queries, peak_connections)
                peak_connections.value = num_connections.value
        except KeyError:
            logger.debug("No stats to display yet.")

        time.sleep(1)

    # cleanup in case of error
    remaining_events = 0
    try:
        # clear out the queue in case of error to prevent broken pipe
        # exceptions from the Queue's internal thread
        while not queue.empty():
            remaining_events += 1
            queue.get_nowait()
    except Empty:
        pass

    if remaining_events > 0:
        logger.error("Not all jobs processed, replay unsuccessful")

    return True
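

# A minimal sketch (not part of the original module) of how start_replay() might be
# driven by a caller. The inputs shown here (connection_logs, default_interface,
# odbc_driver, the event times and the totals) are assumed to come from the rest of
# replay.py; this is illustrative, not the module's actual entry point.
#
#   manager = multiprocessing.Manager()
#   per_process_stats = {}
#   start_replay(connection_logs, default_interface, odbc_driver,
#                first_event_time, last_event_time,
#                g_config.get("num_workers"), manager, per_process_stats,
#                total_transactions, total_queries)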