def replay_worker()

in src/SimpleReplay/replay.py [0:0]


def replay_worker(process_idx, replay_start_time, first_event_time, queue, worker_stats, default_interface, odbc_driver, connection_semaphore,
                  num_connections, peak_connections):
    """ Worker process to distribute the work among several processes.  Each
        worker pulls a connection off the queue, waits until its time to start
        it, spawns a thread to execute the actual connection and associated
        transactions, and then repeats.

        The loop ends when ``False`` is received over the queue, or when the
        queue has stayed empty for longer than the ``empty_queue_timeout_sec``
        config value (default 120 seconds).  Before returning, the worker
        waits for the connection threads it started.  Exceptions are logged
        and not re-raised.

        Args:
            process_idx: Index of this worker; prepended to its log messages
                and forwarded to each ConnectionThread.
            replay_start_time: Passed to current_offset_ms() to compute how
                far into the replay we are; also forwarded to each thread.
            first_event_time: Reference time used to compute each connection's
                offset via ``connection.offset_ms()``; also forwarded.
            queue: Job queue.  Each item is either ``False`` (terminate) or a
                mapping with the keys ``'job_id'`` and ``'connection'``.
            worker_stats: Stats mapping handed to join_finished_threads();
                its ``'connection_diff_sec'`` entry is logged at the end.
            default_interface: Forwarded unchanged to ConnectionThread.
            odbc_driver: Forwarded unchanged to ConnectionThread.
            connection_semaphore: Optional semaphore throttling concurrent
                connections, or None to disable throttling.  One permit is
                acquired before each job is fetched.
            num_connections: Shared counter exposing ``.value``; logged and
                forwarded to ConnectionThread.
            peak_connections: Forwarded unchanged to ConnectionThread.
    """

    # map thread to stats dict
    connection_threads = {}
    connections_processed = 0

    # True while this worker itself owns a semaphore permit that has not yet
    # been handed over to a started connection thread.  Used to guarantee the
    # permit is given back on every path where no thread takes it over.
    permit_held = False

    threading.current_thread().name = '0'

    try:
        # prepend the process index to all log messages in this worker
        prepend_ids_to_logs(process_idx)

        # stagger worker startup to not hammer the get_cluster_credentials api
        time.sleep(random.randrange(1, 3))
        logger.debug(f"Worker {process_idx} ready for jobs")

        # time to block waiting for jobs on the queue
        timeout_sec = 10

        # time of the first consecutive empty poll, or None if the last poll
        # returned a job
        last_empty_queue_time = None

        # get the next job off the queue and wait until its due
        # loop terminates when a False is received over the queue
        while True:
            try:
                if connection_semaphore is not None:
                    logger.debug(f"Checking for connection throttling ({num_connections.value} / {g_config['limit_concurrent_connections']} active connections)")
                    sem_start = time.time()
                    connection_semaphore.acquire()
                    permit_held = True
                    sem_elapsed = time.time() - sem_start
                    logger.debug(f"Waited {sem_elapsed} sec for semaphore")

                job = queue.get(timeout=timeout_sec)
            except Empty:
                # no job to run, so give the permit back before polling again
                if permit_held:
                    connection_semaphore.release()
                    permit_held = False

                elapsed = int(time.time() - last_empty_queue_time) if last_empty_queue_time else 0
                # take into account the initial timeout
                elapsed += timeout_sec
                empty_queue_timeout_sec = g_config.get("empty_queue_timeout_sec", 120)
                logger.debug(f"No jobs for {elapsed} seconds (timeout {empty_queue_timeout_sec})")
                # normally processes exit when they get a False on the queue,
                # but in case of some error we exit if the queue is empty for some time
                if elapsed > empty_queue_timeout_sec:
                    logger.warning(f"Queue empty for {elapsed} sec, exiting")
                    break
                if last_empty_queue_time is None:
                    last_empty_queue_time = time.time()
                continue

            last_empty_queue_time = None

            if job is False:
                logger.debug("Got termination signal, finishing up.")
                # No connection will be started for this permit; release it so
                # other workers blocked on the semaphore can still proceed and
                # receive their own termination signal.
                if permit_held:
                    connection_semaphore.release()
                    permit_held = False
                break

            thread_stats = init_stats({})

            # how much time has elapsed since the replay started
            time_elapsed_ms = current_offset_ms(replay_start_time)

            # what is the time offset of this connection job relative to the first event
            connection_offset_ms = job['connection'].offset_ms(first_event_time)
            delay_sec = (connection_offset_ms - time_elapsed_ms) / 1000.0

            logger.debug(f"Got job {job['job_id']+1}, delay {delay_sec:+.3f} sec (extracted connection time: {job['connection'].session_initiation_time})")

            # if connection time is more than a few ms in the future, sleep until its due.
            # this is approximate and we use "a few ms" here due to the imprecision of
            # sleep as well as the time for the remaining code to spawn a thread and actually
            # make the db connection.
            if connection_offset_ms - time_elapsed_ms > 10:
                time.sleep(delay_sec)

            logger.debug(f"Starting job {job['job_id']+1} (extracted connection time: {job['connection'].session_initiation_time}). {len(threading.enumerate())}, {threading.active_count()} connections active.")

            connection_thread = ConnectionThread(
                process_idx,
                job['job_id'],
                job['connection'],
                default_interface,
                odbc_driver,
                replay_start_time,
                first_event_time,
                thread_stats,
                num_connections,
                peak_connections,
                connection_semaphore
            )
            connection_thread.name = f"{job['job_id']}"
            connection_thread.start()
            # NOTE(review): the thread is given the semaphore, so it is assumed
            # to release the permit when its connection ends - confirm against
            # ConnectionThread.  From here on this worker no longer owns it.
            permit_held = False
            connection_threads[connection_thread] = thread_stats

            # opportunistically reap threads that have already finished
            join_finished_threads(connection_threads, worker_stats, wait=False)

            connections_processed += 1

        logger.debug(f"Waiting for {len(connection_threads)} connections to finish...")
        join_finished_threads(connection_threads, worker_stats, wait=True)
    except Exception as e:
        logger.error(f"Process {process_idx} threw exception: {e}")
        logger.debug(traceback.format_exc())
    finally:
        # An error between acquiring the permit and starting the connection
        # thread would otherwise leak the permit for the rest of the replay.
        if permit_held:
            connection_semaphore.release()
            permit_held = False

    if connections_processed:
        logger.debug(f"Max connection offset for this process: {worker_stats['connection_diff_sec']:.3f} sec")

    logger.debug(f"Process {process_idx} finished")