in gossip/gossiper.py [0:0]
def mix(self, out_msg):
    """Run one bilateral gossip step over this node's single in/out edge.

    When ``self.passive`` is false the node issues two blocking
    ``dist.broadcast`` calls: first the message produced by
    ``self.mix_out_msg_`` on the out-edge, then ``self.in_msg_buffer`` on
    the in-edge.  When ``self.passive`` is true the node starts (or
    re-uses) an asynchronous broadcast of ``self.in_msg_buffer`` on the
    in-edge, stored in ``self._pending_req``, and only broadcasts its own
    message once that request reports completion; until then the call
    returns without exchanging anything.

    Args:
        out_msg: Outgoing message; its ``device.type`` must equal
            ``self.device.type``.

    Returns:
        If the exchange finished during this call, the value of
        ``self.parse_in_msg_buffer(residual=True)``.  Otherwise the tuple
        ``(out_msg, False)``.
        NOTE(review): the two paths may differ in shape -- confirm what
        ``parse_in_msg_buffer`` returns against the callers.

    Raises:
        AssertionError: If the device types differ, or if the node does
            not have exactly one in-edge and exactly one out-edge.
    """
    # the outgoing message has to live on this gossiper's device type
    assert out_msg.device.type == self.device.type
    # bilateral gossip talks to exactly one peer per iteration
    assert len(self.in_edges) == 1 and len(self.out_edges) == 1
    send_edge = self.out_edges[0]
    recv_edge = self.in_edges[0]
    if self.logger is not None:
        self.logger.debug('in/out -edges {}/{}'.format(recv_edge, send_edge))

    if self.passive:
        # start the asynchronous in-edge broadcast once and keep its
        # handle across calls until it completes
        if self._pending_req is None:
            self._pending_req = dist.broadcast(
                tensor=self.in_msg_buffer,
                src=recv_edge.src,
                group=recv_edge.process_group,
                async_op=True,
            )
        if not self._pending_req.is_completed():
            # nothing exchanged yet: hand the caller its message back
            return out_msg, False

        # the pending request finished, so build and send our message
        outgoing = self.mix_out_msg_(out_msg, 1.0, residual=True)
        payload = next(outgoing)
        if self.logger is not None:
            self.logger.debug(
                'req. completed; sending to {}'.format(send_edge))
        dist.broadcast(
            tensor=payload,
            src=send_edge.src,
            group=send_edge.process_group,
        )
        self._pending_req = None
    else:
        # build the message, then do the blocking out-edge broadcast
        # followed by the blocking in-edge broadcast
        outgoing = self.mix_out_msg_(out_msg, 1.0, residual=True)
        payload = next(outgoing)
        dist.broadcast(
            tensor=payload,
            src=send_edge.src,
            group=send_edge.process_group,
        )
        dist.broadcast(
            tensor=self.in_msg_buffer,
            src=recv_edge.src,
            group=recv_edge.process_group,
        )

    # only reached once an exchange has completed
    self.refresh_peers_()
    self.clean_msg_buffers_()
    return self.parse_in_msg_buffer(residual=True)