def start_replay()

in src/SimpleReplay/replay.py
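Spawns the pool of replay worker processes, streams every entry from the connection log to them through a shared queue, polls the pool while periodically aggregating and displaying per-process statistics, and drains any unprocessed jobs from the queue before returning.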


def start_replay(connection_logs, default_interface, odbc_driver, first_event_time, last_event_time,
                 num_workers, manager, per_process_stats, total_transactions, total_queries):

    """ create a queue for passing jobs to the workers.  the limit will cause
    put() to block if the queue is full """
    queue = manager.Queue(maxsize=1000000)

    if not num_workers:
        # determine the number of available cpus, leaving 1 for the main
        # thread and the manager but always running at least 4 workers.
        # os.cpu_count() returns None if the count can't be determined,
        # so guard against that before comparing.
        num_workers = os.cpu_count()
        if num_workers is not None and num_workers > 0:
            num_workers = max(num_workers - 1, 4)
        else:
            num_workers = 4
            logger.warning(f"Couldn't determine the number of cpus, defaulting to {num_workers} processes. "
                           f"Use the configuration parameter num_workers to change this.")

    replay_start_time = datetime.datetime.now(tz=datetime.timezone.utc)
    logger.info(f"Replay start time: {replay_start_time}")
    logger.debug(f"Running with {num_workers} workers")

    # count the child processes already running; this is typically just 1, the Manager process
    initial_processes = len(multiprocessing.active_children())
    logger.debug(f"Initial child processes: {initial_processes}")

    global g_workers
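    # ignore SIGINT while spawning workers so they inherit the "ignore"
    # disposition and Ctrl-C is handled only by the parent, which installs
    # its own handler (sigint_handler) once all workers have started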
    signal.signal(signal.SIGINT, signal.SIG_IGN)

    connection_semaphore = None
    num_connections = manager.Value(int, 0)
    peak_connections = manager.Value(int, 0)
    if g_config.get('limit_concurrent_connections'):
        # create an IPC semaphore to limit the total concurrency
        connection_semaphore = manager.Semaphore(g_config.get('limit_concurrent_connections'))

    for idx in range(num_workers):
        per_process_stats[idx] = manager.dict()
        init_stats(per_process_stats[idx])
        g_workers.append(multiprocessing.Process(target=replay_worker,
                                                 args=(idx, replay_start_time, first_event_time, queue,
                                                       per_process_stats[idx], default_interface, odbc_driver,
                                                       connection_semaphore, num_connections, peak_connections)))
        g_workers[-1].start()

    signal.signal(signal.SIGINT, sigint_handler)

    logger.debug(f"Total connections in the connection log: {len(connection_logs)}")

    # add all the jobs to the work queue
    for idx, connection in enumerate(connection_logs):
        if not put_and_retry({"job_id": idx, "connection": connection}, queue, non_workers=initial_processes):
            break

    # add one termination "job"/signal per worker, to signal each worker to
    # exit when there is no more work
    for idx in range(num_workers):
        if not put_and_retry(False, queue, non_workers=initial_processes):
            break

    # wait for the workers to finish
    active_processes = len(multiprocessing.active_children()) - initial_processes
    logger.debug(f"{active_processes} processes running")
    cnt = 0

    while active_processes:
        cnt += 1
        active_processes = len(multiprocessing.active_children()) - initial_processes
        if cnt % 60 == 0:
            logger.debug(f"Waiting for {active_processes} processes to finish")
            try:
                queue_length = queue.qsize()
                logger.debug(f"Remaining connections: {queue_length-num_workers}")
            except NotImplementedError:
                # support for qsize is platform-dependent
                logger.debug("Queue length not supported.")

        # aggregate stats across all worker processes so far
        try:
            aggregated_stats = init_stats({})
            for idx, stat in per_process_stats.items():
                collect_stats(aggregated_stats, stat)
            if cnt % 5 == 0:
                display_stats(aggregated_stats, len(connection_logs), total_transactions, total_queries, peak_connections)
                peak_connections.value = num_connections.value
        except KeyError:
            logger.debug("No stats to display yet.")

        time.sleep(1)

    # cleanup in case of error
    remaining_events = 0
    try:
        # clear out the queue in case of error to prevent broken pipe
        # exceptions from internal Queue thread
        while not queue.empty():
            remaining_events += 1
            queue.get_nowait()
    except Empty:
        pass

    if remaining_events > 0:
        logger.error("Not all jobs were processed, replay unsuccessful")
        return False

    return True
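
The enqueue and monitoring logic above leans on a few helpers defined elsewhere in replay.py. The sketches below are minimal reconstructions of the behavior the call sites imply, not the actual implementations; the timeout value, the liveness check, and the stat key names are all assumptions.

put_and_retry() must block-put with a retry loop and return False once no consumers remain, since the callers pass the pre-worker process count as non_workers and break out of the enqueue loop on failure:

import multiprocessing
import queue as stdlib_queue

def put_and_retry(job, q, timeout=10, non_workers=0):
    """Sketch: retry a blocking put until it succeeds, or give up once
    every process beyond the non-worker baseline (e.g. the Manager) has
    exited and nothing is left to drain the queue."""
    while True:
        try:
            q.put(job, block=True, timeout=timeout)
            return True
        except stdlib_queue.Full:
            if len(multiprocessing.active_children()) - non_workers <= 0:
                return False

init_stats() must populate a stats mapping in place and return it, because start_replay() calls it both on the manager-backed per-process dicts and as init_stats({}) to seed the aggregate; the counter names here are placeholders:

def init_stats(stats):
    """Sketch: reset the replay counters (hypothetical key names)."""
    stats.update({
        'connection_diff_sec': 0,
        'transaction_success': 0,
        'transaction_error': 0,
        'query_success': 0,
        'query_error': 0,
    })
    return stats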