in horovod/runner/util/threads.py [0:0]
def execute_function_multithreaded(fn,
                                   args_list,
                                   block_until_all_done=True,
                                   max_concurrent_executions=1000):
    """
    Executes fn in multiple threads each with one set of the args in the
    args_list.

    The argument sets are placed on a work queue and
    min(max_concurrent_executions, len(args_list)) worker threads are
    started; every worker keeps taking argument sets from the queue until it
    is empty. The entries of args_list are not modified.

    :param fn: function to be executed
    :type fn: callable
    :param args_list: one sequence of positional arguments per call of fn
    :type args_list: list(list) or list(tuple)
    :param block_until_all_done: if is True, function will block until all the
        threads are done and will return the results of each thread's execution.
    :type block_until_all_done: bool
    :param max_concurrent_executions: upper bound on the number of worker
        threads that are started
    :type max_concurrent_executions: int
    :return:
        If block_until_all_done is False, returns None. If block_until_all_done is
        True, function returns the dict of results.
            {
                index: execution result of fn with args_list[index]
            }
    :rtype: dict or None
    :raises RuntimeError: if block_until_all_done is True and fewer results
        than argument sets were collected, which happens for instance when a
        call to fn raised before its result could be recorded.
    """
    result_queue = queue.Queue()
    worker_queue = queue.Queue()

    # Queue (index, args) pairs rather than appending the index to each
    # argument list: appending would mutate the caller's data, break repeated
    # calls with the same args_list and fail for tuples.
    for index, args in enumerate(args_list):
        worker_queue.put((index, args))

    def fn_execute():
        # Worker loop: drain the work queue, then exit.
        while True:
            try:
                exec_index, args = worker_queue.get(block=False)
            except queue.Empty:
                return
            # If fn raises, no result is recorded for this index; the caller
            # detects the gap through the result count check below.
            res = fn(*args)
            result_queue.put((exec_index, res))

    threads = []
    number_of_threads = min(max_concurrent_executions, len(args_list))

    for _ in range(number_of_threads):
        # Workers are requested as daemon threads only when the caller does
        # not wait for them.
        thread = in_thread(target=fn_execute, daemon=not block_until_all_done)
        threads.append(thread)

    # Returns the results only if block_until_all_done is set.
    results = None
    if block_until_all_done:

        # Because join() cannot be interrupted by signal, a single join()
        # needs to be separated into join()s with timeout in a while loop.
        have_alive_child = True
        while have_alive_child:
            have_alive_child = False
            for t in threads:
                t.join(0.1)
                if t.is_alive():
                    have_alive_child = True

        # All workers have finished, so the result queue is no longer
        # written to and can be drained safely.
        results = {}
        while not result_queue.empty():
            exec_index, res = result_queue.get()
            results[exec_index] = res

        if len(results) != len(args_list):
            # Callables such as functools.partial have no __name__; fall back
            # to repr() so the intended RuntimeError is still raised.
            raise RuntimeError(
                'Some threads for func {func} did not complete '
                'successfully.'.format(func=getattr(fn, '__name__', repr(fn))))

    return results