def call_async()

in code/src/multiprocessing_event_loop.py [0:0]


    def call_async(self, rank, action, result_type=None, fetch_all=False,
                   **kwargs):
        """Asynchronously call a function in one child process.

        Send a request to call the function named `action` on the rank'th
        process and return a Future with the result. The request is sent
        immediately; the return pipes are only read once the generator
        wrapped by the returned Future is advanced.

        result_type can be used to indicate groups of results that are
        equivalent, so that a gen() call will return the first available
        result instead of waiting for the specific result that was enqueued.
        This can be combined with fetch_all, which instead returns all
        available results of a given type at each gen.

        Args:
            rank: index of the child process that receives the request; also
                the return pipe that is read when fetch_all is False.
            action: name of the function to call in the child process.
            result_type: tag sent with the request and compared against the
                tag of every received result. Defaults to a fresh
                ``uuid.uuid4()`` so only this call's own result matches.
            fetch_all: if True, the Future's generator yields a list with
                every available result of ``result_type`` (collected from
                all replicas) instead of a single result.
            **kwargs: sent to the child process together with the action.

        Returns:
            A Future wrapping a generator that yields exactly once: a single
            result, or a non-empty list of results when fetch_all is True.
        """
        # Imported locally so this method carries its own dependency.
        from multiprocessing.connection import wait as wait_for_pipes

        if result_type is None:
            # use a unique id
            result_type = uuid.uuid4()

        # NOTE(review): self.return_buffer is indexed with tags that may not
        # have been seen before, so it presumably is a defaultdict(list) --
        # confirm against the constructor.

        def simple_result_generator(expected_type, rank):
            """Handle the simple case where we want just one result."""
            buffered = self.return_buffer[expected_type]
            if buffered:
                # An earlier reader already received a matching result.
                yield buffered.pop(0)
                return

            pipe = self.return_pipes[rank]
            while True:
                # Blocks until the rank'th process sends something.
                received_type, result = pipe.recv()
                if received_type == expected_type:
                    yield result
                    return
                # Not ours: park it for whoever asks for that type later.
                self.return_buffer[received_type].append(result)

        def fetch_all_result_generator(expected_type):
            """Handle the more complicated case where we want all available
            results of a given type."""
            # We'll return any results that are already finished.
            results = list(self.return_buffer[expected_type])  # copy
            self.return_buffer[expected_type].clear()

            pipes = [self.return_pipes[i] for i in range(self.num_replicas)]

            # But also actively look for more results to return.
            while True:
                for pipe in pipes:
                    # consume all available results from this replica
                    while pipe.poll():
                        received_type, result = pipe.recv()
                        if received_type == expected_type:
                            results.append(result)
                        else:
                            self.return_buffer[received_type].append(result)
                if results:
                    yield results
                    return
                # Nothing matching yet: sleep until at least one pipe has
                # data instead of spinning on poll() at full CPU.
                # Assumes the return pipes are multiprocessing Connection
                # objects -- TODO confirm.
                wait_for_pipes(pipes)

        self.input_pipes[rank].send((result_type, action, kwargs))

        if fetch_all:
            return Future(fetch_all_result_generator(result_type))
        return Future(simple_result_generator(result_type, rank))