in python/singa/opt.py [0:0]
def backward_and_sparse_update(self,
                               loss,
                               threshold=2097152,
                               spars=0.05,
                               topK=False,
                               corr=True):
""" Performs backward propagation from the loss and parameter update with sparsification.
THIS IS A EXPERIMENTAL FUNCTION FOR RESEARCH PURPOSE:
From the loss, it performs backward propagation to get the gradients and do the parameter
update. It fuses the tensors with size smaller than the threshold value to reduce network
latency, as well as using sparsification schemes to transfer only the gradient elements which
are significant.
Args:
loss(Tensor): loss is the objective function of the deep learning model
optimization, e.g. for classification problem it can be the output of the
softmax_cross_entropy function.
threshold(int): threshold is a parameter to control performance in fusing
the tensors. For the tensors of sizes smaller than threshold, they are to
be accumulated and fused before the all reduce operation. For the tensors
of its size larger than the threshold value, they are to be reduced directly
without fusion.
spars(float): a parameter to control sparsity as defined below
topK(bool): When topK is False, it sparsifies the gradient with absolute
value >= sparsWhen topK is True, it sparsifies a fraction of total gradient
number equals to spars, E.g. when spars = 0.01, it sparsifies 1 % of the
total gradient elements
corr(bool): whether to use the local accumulate gradient for correction

    Attributes:
        self.sparsInit: a boolean flag recording whether the local gradient
            accumulation buffers have been initialized.
        self.gradAccumulation: the list of tensors holding the local gradient
            accumulation.
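
    Example:
        A minimal usage sketch (illustrative, not taken from the codebase); it
        assumes ``sgd`` is a DistOpt instance wrapping a local optimizer and
        ``loss`` is the output of a forward pass traced by autograd::

            # transfer the gradient elements with absolute value >= 0.05,
            # using local gradient accumulation for error correction
            sgd.backward_and_sparse_update(loss, spars=0.05, topK=False, corr=True)

            # alternatively, transfer only 1% of the gradient elements
            sgd.backward_and_sparse_update(loss, spars=0.01, topK=True)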
"""
    if ((not hasattr(self, "sparsInit")) and corr):
        self.gradAccumulation = []
        self.sparsInit = False
    plist = []
    acc = 0
    k = -1
    glist = []
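    # Walk the gradients from backward propagation: gradients larger than the
    # threshold are sparsified and all-reduced individually, while smaller ones
    # are accumulated and fused until the fused size exceeds the threshold.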
    for p, g in autograd.backward(loss):
        if g.size() > threshold:
            # larger than threshold -> reduced directly
            k += 1
            if (corr and (not self.sparsInit)):
                # create a tensor for the gradient accumulation
                flag = p.device.graph_enabled()
                p.device.EnableGraph(False)
                self.gradAccumulation.append(
                    tensor.Tensor((g.size(),), p.device, p.dtype))
                self.gradAccumulation[k].set_value(0.0)
                p.device.EnableGraph(flag)
            if corr:
                self.sparsification(g.data, self.gradAccumulation[k].data,
                                    spars, topK)
            else:
                self.sparsification(g.data, None, spars, topK)
        else:
            # smaller than threshold -> accumulate
            glist.append(g.data)
            acc += g.size()
            if (acc > threshold):
                k += 1
                if (corr and (not self.sparsInit)):
                    # create a tensor for the gradient accumulation
                    flag = p.device.graph_enabled()
                    p.device.EnableGraph(False)
                    self.gradAccumulation.append(
                        tensor.Tensor((acc,), p.device, p.dtype))
                    self.gradAccumulation[k].set_value(0.0)
                    p.device.EnableGraph(flag)
                if corr:
                    self.fused_sparsification(glist,
                                              self.gradAccumulation[k].data,
                                              spars, topK)
                else:
                    self.fused_sparsification(glist, None, spars, topK)
                acc = 0
                glist = []
        plist.append((p, g))
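    # After the loop, flush any remaining small gradients that did not fill a
    # fused buffer up to the threshold.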
    if glist:
        k += 1
        if (corr and (not self.sparsInit)):
            # create a tensor for the gradient accumulation
            flag = p.device.graph_enabled()
            p.device.EnableGraph(False)
            self.gradAccumulation.append(
                tensor.Tensor((acc,), p.device, p.dtype))
            self.gradAccumulation[k].set_value(0.0)
            p.device.EnableGraph(flag)
        if corr:
            self.fused_sparsification(glist, self.gradAccumulation[k].data,
                                      spars, topK)
        else:
            self.fused_sparsification(glist, None, spars, topK)
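    # Synchronize before applying the updates: wait for the sparsified gradient
    # transfers, update the parameters, then step self.opt.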
    self.wait()
    for p, g in plist:
        self.update(p, g)
    self.sparsInit = True
    self.opt.step()