in scripts/tf_cnn_benchmarks/allreduce.py
def sum_grad_and_var_all_reduce(single_session,
                                grad_and_vars,
                                num_workers,
                                alg,
                                gpu_indices,
                                aux_devices=None,
                                num_shards=1):
  """Apply all-reduce algorithm over specified gradient tensors."""
  scaled_grads = [g for g, _ in grad_and_vars]
  if alg == 'collective':
    assert not single_session
    summed_grads = build_collective_reduce(
        scaled_grads, num_workers, num_shards, 'Add', 'Id')
  else:
    with tf.name_scope('allreduce'):
      # Note that each grad_and_vars looks like the following:
      #   ((grad0_gpu0, var0_gpu0), ... , (grad0_gpuN, var0_gpuN))
      if alg == 'nccl':
        summed_grads = all_reduce.build_nccl_all_reduce(scaled_grads, tf.add)
      elif alg == 'xring':
        summed_grads = all_reduce.build_ring_all_reduce(
            scaled_grads, num_workers, num_shards, gpu_indices, tf.add)
      elif alg == 'nccl/xring':
        summed_grads = all_reduce.build_nccl_then_ring(scaled_grads, num_shards,
                                                       tf.add)
      elif alg == 'nccl/rechd':
        summed_grads = all_reduce.build_nccl_then_recursive_hd(
            scaled_grads, tf.add)
      elif alg == 'nccl/pscpu':
        summed_grads = all_reduce.build_nccl_then_shuffle(
            scaled_grads, aux_devices, tf.add, tf.add_n)
      elif alg == 'pscpu/pscpu':
        summed_grads = all_reduce.build_shuffle_then_shuffle(
            scaled_grads,
            aux_devices,
            # TODO(tucker): devise a way of better specifying the device set
            # for the second level.
            [aux_devices[0]],
            tf.add_n)
      elif alg in ['pscpu', 'psgpu']:
        summed_grads = all_reduce.build_shuffle_all_reduce(
            scaled_grads, aux_devices, tf.add_n)
      else:
        raise ValueError('unsupported all_reduce alg: ', alg)

  result = []
  for (_, v), g in zip(grad_and_vars, summed_grads):
    result.append([g, v])
  return result
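
A minimal usage sketch, not part of the original file: it shows how sum_grad_and_var_all_reduce might be driven with two GPU towers and the 'nccl' algorithm. It assumes TF 1.x graph mode, two visible GPUs, and an importable module path; the import path, variable names, and device strings are illustrative assumptions only.

import tensorflow as tf
from allreduce import sum_grad_and_var_all_reduce  # hypothetical import path

def build_summed_grads():
  # Mimic what optimizer.compute_gradients() would yield on each GPU tower
  # for a single 4-element variable.
  grad_and_vars = []
  for i in range(2):
    with tf.device('/gpu:%d' % i):
      var = tf.Variable(tf.ones([4]), name='w_gpu%d' % i)
      grad = tf.constant([float(i + 1)] * 4)
      grad_and_vars.append((grad, var))

  # Sum the per-GPU gradients with the 'nccl' algorithm. gpu_indices lists the
  # participating devices; aux_devices only matters for the shuffle/ps
  # variants. Each returned entry is [summed_gradient, original_variable].
  return sum_grad_and_var_all_reduce(
      single_session=True,
      grad_and_vars=grad_and_vars,
      num_workers=1,
      alg='nccl',
      gpu_indices=[0, 1],
      aux_devices=['/cpu:0'],
      num_shards=1)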