in src/local_gpu_verifier/src/verifier/utils/__init__.py [0:0]
def function_wrapper_with_timeout(args, max_time_delay):
    """ Run a function in a separate thread and wait a bounded time for its result.

    The actual call is delegated to function_caller, which is started in a new
    thread and receives a copy of the given list extended with a result queue
    and an event. The list passed in by the caller is left unmodified, so the
    same list can safely be reused for another call.

    Args:
        args (list): the list containing the function and its arguments. Its
                     last element is used as the function's name in the log
                     and error messages.
        max_time_delay (float): the maximum time, in seconds, to wait for the
                     thread to put a result on the queue.

    Raises:
        TimeoutError: it is raised if the thread spawned takes more time than
                      the threshold time limit.

    Returns:
        [any]: the first item the thread places on the result queue.
    """
    assert type(args) is list
    function_name = args[-1]

    result_queue = queue.Queue()
    done_event = Event()
    # Hand the worker its own list instead of appending to the caller's one:
    # appending in place would leave the queue and event at the end of the
    # caller's list and break any later call that reuses it.
    worker_input = [*args, result_queue, done_event]

    event_log.info(f"{function_name} called.")
    worker = Thread(target=function_caller, args=(worker_input,))
    worker.start()

    # Only the blocking get() can raise Empty, so it is the only guarded call.
    try:
        return_value = result_queue.get(block=True, timeout=max_time_delay)
    except Empty as exc:
        message = f"The {function_name} call timed out."
        event_log.error(message)
        # NOTE(review): the event is only set on the success path, and the
        # worker thread is left running on timeout. Whether the worker should
        # also be signalled here depends on function_caller - confirm.
        raise TimeoutError(message) from exc

    # Signal the worker that its result has been consumed.
    done_event.set()
    return return_value