in code/src/multiprocessing_event_loop.py [0:0]
def call_async(self, rank, action, result_type=None, fetch_all=False,
               **kwargs):
    """Asynchronously call a function on one child process.

    Sends ``(result_type, action, kwargs)`` over the rank'th input pipe and
    returns a Future built from a generator that produces the result.

    result_type can be used to indicate groups of results that are
    equivalent, so that a gen() call will return the first available
    result instead of waiting for the specific result that was enqueued.
    This can be combined with fetch_all, which instead returns all
    available results of a given type at each gen.

    Args:
        rank: index of the child process to send the request to.
        action: name of the function to call; forwarded unchanged.
        result_type: tag sent with the request. Messages read from the
            return pipes are ``(tag, result)`` pairs and are matched
            against this tag. Defaults to a fresh ``uuid.uuid4()`` so the
            result is unique to this call.
        fetch_all: if true, the Future's generator yields a list with
            every available result of ``result_type`` (from any replica)
            instead of a single result.
        **kwargs: forwarded unchanged as part of the request.

    Returns:
        A Future wrapping a generator that yields exactly once.
    """
    # Local import keeps this block self-contained; `wait` blocks until at
    # least one of the given connections is readable.
    # NOTE(review): assumes the return pipes are multiprocessing Connection
    # objects (they are only used via poll()/recv() here) -- confirm.
    from multiprocessing.connection import wait as wait_for_pipes

    if result_type is None:
        # use a unique id
        result_type = uuid.uuid4()

    def simple_result_generator(expected_type, rank):
        """Handle the simple case where we want just one result."""
        # A previous reader may already have buffered a matching result.
        if self.return_buffer[expected_type]:
            yield self.return_buffer[expected_type].pop(0)
            return
        # Otherwise block on this rank's return pipe, stashing any results
        # of other types so their own consumers can pick them up later.
        while True:
            received_type, result = self.return_pipes[rank].recv()
            if received_type == expected_type:
                yield result
                return
            self.return_buffer[received_type].append(result)

    def fetch_all_result_generator(expected_type):
        """Handle the more complicated case where we want all available
        results of a given type."""
        # We'll return any results that are already finished.
        results = list(self.return_buffer[expected_type])  # list() copies
        self.return_buffer[expected_type].clear()

        pipes = [self.return_pipes[r] for r in range(self.num_replicas)]

        # But also actively look for more results to return.
        while True:
            for pipe in pipes:
                # consume all available results from this replica
                while pipe.poll():
                    received_type, result = pipe.recv()
                    if received_type == expected_type:
                        results.append(result)
                    else:
                        self.return_buffer[received_type].append(result)

            if results:
                yield results
                return

            # Nothing matched yet: sleep until some pipe becomes readable
            # instead of spinning on poll() and burning a CPU core.
            wait_for_pipes(pipes)

    self.input_pipes[rank].send((result_type, action, kwargs))

    if fetch_all:
        return Future(fetch_all_result_generator(result_type))
    return Future(simple_result_generator(result_type, rank))