in scripts/tf_cnn_benchmarks/allreduce.py [0:0]
def sum_gradients_all_reduce(single_session,
                             dev_prefixes,
                             tower_grads,
                             num_workers,
                             alg,
                             num_shards,
                             gpu_indices,
                             agg_small_grads_max_bytes=0,
                             agg_small_grads_max_group=10,
                             allreduce_merge_scope=1):
"""Apply all-reduce algorithm over specified gradient tensors.
Args:
single_session: true if reduction is applied to one graph across
all workers, false if ths application is to a single-worker graph only.
dev_prefixes: list of prefix strings to use to generate PS device names.
tower_grads: the gradients to reduce.
num_workers: number of worker processes across entire job.
alg: the all-reduce algorithm to apply.
num_shards: alg-specific sharding factor.
gpu_indices: indices of local GPUs in order usable for ring-reduce.
agg_small_grads_max_bytes: largest tensor eligible for aggregation,
in number of bytes.
agg_small_grads_max_group: largest permitted aggregation of small
tensors.
allreduce_merge_scope: size of groups into which to partition consecutive
gradients grouped under a common 'allreduce' name scope for application
of ScopedAllocator optimization.
Returns:
list of reduced tensors
"""
  alg_contains_shuffle = contains_any(alg, ['pscpu', 'psgpu'])
  is_hierarchical = '/' in alg
  if 'pscpu' in alg:
    aux_devices = [prefix + '/cpu:0' for prefix in dev_prefixes]
  elif 'psgpu' in alg:
    aux_devices = [
        prefix + '/gpu:%d' % i
        for i in range(len(gpu_indices))
        for prefix in dev_prefixes
    ]
  else:
    aux_devices = ['/job:localhost/cpu:0']
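  # Shuffle variants rotate through num_shards groups of these devices;
  # 'collective' and non-shuffle algorithms get a single group.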
  aux_device_groups = group_device_names(
      aux_devices,
      num_shards if (alg != 'collective' and alg_contains_shuffle) else 1)
  group_index = 0
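  # Optionally pack runs of small gradient tensors into larger aggregates
  # so fewer, bigger reductions are issued; 'packing' records how to undo
  # the transformation afterwards.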
  if agg_small_grads_max_bytes > 0 and agg_small_grads_max_group > 0:
    tower_grads, packing = pack_small_tensors(
        tower_grads,
        max_bytes=agg_small_grads_max_bytes,
        max_group=agg_small_grads_max_group)
  else:
    packing = None
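  # Transpose the per-tower lists of (grad, var) pairs into per-variable
  # tuples across towers, then chunk them so each group of merge_scope
  # consecutive gradients shares one name scope below.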
  reduced_gv_list = []
  gv = list(zip(*tower_grads))
  merge_scope = allreduce_merge_scope if allreduce_merge_scope > 0 else 1
  chunked_gv = [gv[x:x + merge_scope]
                for x in range(0, len(gv), merge_scope)]
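  # Reducing each chunk under a shared 'allreduce' name scope is what makes
  # the gradients eligible for the ScopedAllocator buffer-merging
  # optimization mentioned in the docstring.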
  for chunk in chunked_gv:
    with tf.name_scope('allreduce'):
      for grad_and_vars in chunk:
        reduced_gv_list.append(sum_grad_and_var_all_reduce(
            single_session,
            grad_and_vars, num_workers, alg, gpu_indices,
            (aux_devices if is_hierarchical
             else aux_device_groups[group_index]),
            num_shards))
        group_index = (group_index + 1) % len(aux_device_groups)
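  # Transpose back to per-tower (grad, var) lists and, if small tensors
  # were packed above, restore their original structure.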
  new_tower_grads = [list(x) for x in zip(*reduced_gv_list)]
  if packing:
    new_tower_grads = unpack_small_tensors(new_tower_grads, packing)
  return new_tower_grads
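

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original file). It assumes
# a single-session, single-worker setup with two local GPUs and an
# already-built per-tower list of (gradient, variable) pairs;
# `build_tower_grads` is a hypothetical helper standing in for the
# benchmark's model-building code.
#
#   tower_grads = [build_tower_grads('/gpu:0'), build_tower_grads('/gpu:1')]
#   reduced_tower_grads = sum_gradients_all_reduce(
#       single_session=True,
#       dev_prefixes=['/job:localhost'],
#       tower_grads=tower_grads,
#       num_workers=1,
#       alg='nccl',
#       num_shards=1,
#       gpu_indices=[0, 1])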