in gossip/distributed.py [0:0]
def __make_backward_hook(self):
    self.logger.debug('making backward hook')

    def hook(*unused):
        # reduce gradients across devices on a single machine
        if len(self.device_ids) > 1:

            # collect gradients from all copies
            all_grads = [[] for _ in range(len(self._module_copies))]
            for dev_idx, module in enumerate(self._module_copies):
                for p in module.parameters():
                    if not p.requires_grad or p.grad is None:
                        continue
                    all_grads[dev_idx].append(p.grad.data)

            # reduce grads
            reduced_grads = reduce_add_coalesced(
                all_grads, self.output_device,
                self.nccl_reduce_bucket_size)

            # update grads with reduced grads
            for grad, reduced in zip(all_grads[0], reduced_grads):
                grad.copy_(reduced)

            # clear the gradients and parameters across all replicas
            for module in self._module_copies[1:]:
                for param in module.parameters():
                    if param.requires_grad:
                        param.grad = None
                        param.data.set_()
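
        # pre-divide grads by the number of local processes so that the
        # all-reduce over the local node group yields their average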
        if self.nprocs_per_node > 1:
            grads = []
            for p in self.module.parameters():
                if not p.requires_grad or p.grad is None:
                    continue
                p.grad.data.div_(self.nprocs_per_node_device.type(
                    p.grad.data.dtype))
                grads.append(p.grad.data)

            communication_op = functools.partial(
                dist.all_reduce, group=self.local_node_group)
            communicate(grads, communication_op)

        # convert model back to ps-numerator
        self.ps_numerator()
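
    # queue_hook only schedules `hook` on the autograd engine; the queued
    # callback runs after the current backward pass has finished, i.e. once
    # every parameter's .grad has been accumulated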
    def queue_hook(*unused):
        Variable._execution_engine.queue_callback(hook)

    return queue_hook
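
The deferral trick above can be seen in isolation: the hook fired during backward does no work itself, it only queues a callback on the autograd execution engine (a private PyTorch API), and that callback runs once the whole backward pass has finished, when every parameter's .grad is populated. Below is a minimal standalone sketch of that mechanism; the toy model, loss, and the reduce_after_backward stand-in are illustrative assumptions, not part of this file.

import torch
from torch.autograd import Variable

model = torch.nn.Linear(4, 2)

def reduce_after_backward():
    # stand-in for the gradient reduction performed in `hook`
    print([p.grad.norm().item() for p in model.parameters()])

def queue_hook(*unused):
    # runs during backward; defers the real work to the end of the pass
    Variable._execution_engine.queue_callback(reduce_after_backward)

loss = model(torch.randn(8, 4)).sum()
loss.register_hook(queue_hook)  # fires as backward reaches the loss tensor
loss.backward()                 # reduce_after_backward runs after all grads exist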