def sum_gradients_all_reduce()

in scripts/tf_cnn_benchmarks/allreduce.py [0:0]


def sum_gradients_all_reduce(single_session,
                             dev_prefixes,
                             tower_grads,
                             num_workers,
                             alg,
                             num_shards,
                             gpu_indices,
                             agg_small_grads_max_bytes=0,
                             agg_small_grads_max_group=10,
                             allreduce_merge_scope=1):
  """Apply all-reduce algorithm over specified gradient tensors.

  Args:
    single_session: true if the reduction is applied to one graph spanning
      all workers, false if it is applied to a single-worker graph only.
    dev_prefixes: list of prefix strings to use to generate PS device names.
    tower_grads: the gradients to reduce.
    num_workers: number of worker processes across entire job.
    alg: the all-reduce algorithm to apply.
    num_shards: alg-specific sharding factor.
    gpu_indices: indices of local GPUs in order usable for ring-reduce.
    agg_small_grads_max_bytes: largest tensor eligible for aggregation,
      in number of bytes.
    agg_small_grads_max_group: maximum number of small tensors that may be
      aggregated into a single group.
    allreduce_merge_scope: size of groups into which to partition consecutive
      gradients grouped under a common 'allreduce' name scope for application
      of ScopedAllocator optimization.

  Returns:
    list of lists of reduced (gradient, variable) pairs, one inner list
    per tower
  """
  alg_contains_shuffle = contains_any(alg, ['pscpu', 'psgpu'])
  is_hierarchical = '/' in alg
  if 'pscpu' in alg:
    aux_devices = [prefix + '/cpu:0' for prefix in dev_prefixes]
  elif 'psgpu' in alg:
    aux_devices = [
        prefix + '/gpu:%d' % i
        for i in range(len(gpu_indices))
        for prefix in dev_prefixes
    ]
  else:
    aux_devices = ['/job:localhost/cpu:0']
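  # Partition the auxiliary devices into num_shards groups; a single group is
  # used unless the algorithm shuffles through PS-style devices and is not
  # 'collective'.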
  aux_device_groups = group_device_names(
      aux_devices,
      num_shards if (alg != 'collective' and alg_contains_shuffle) else 1)
  group_index = 0
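  # Optionally pack many small gradients into a few larger tensors before
  # reducing; 'packing' records how to split them apart again afterwards.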
  if agg_small_grads_max_bytes > 0 and agg_small_grads_max_group > 0:
    tower_grads, packing = pack_small_tensors(
        tower_grads,
        max_bytes=agg_small_grads_max_bytes,
        max_group=agg_small_grads_max_group)
  else:
    packing = None
  reduced_gv_list = []
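  # Transpose from per-tower lists of (grad, var) pairs to per-variable
  # tuples holding that variable's pair from every tower.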
  gv = list(zip(*tower_grads))
  merge_scope = allreduce_merge_scope if allreduce_merge_scope > 0 else 1
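  # Group consecutive per-variable entries into chunks of merge_scope; each
  # chunk shares one 'allreduce' name scope so the ScopedAllocator optimizer
  # can merge its all-reduce ops.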
  chunked_gv = [gv[x:x + merge_scope]
                for x in xrange(0, len(gv), merge_scope)]
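  # Reduce each variable's gradients under a shared 'allreduce' name scope,
  # cycling round-robin through the auxiliary device groups for
  # non-hierarchical algorithms.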
  for chunk in chunked_gv:
    with tf.name_scope('allreduce'):
      for grad_and_vars in chunk:
        reduced_gv_list.append(sum_grad_and_var_all_reduce(
            single_session,
            grad_and_vars, num_workers, alg, gpu_indices,
            (aux_devices if is_hierarchical
             else aux_device_groups[group_index]),
            num_shards))
        group_index = (group_index + 1) % len(aux_device_groups)
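  # Transpose back to the per-tower layout and, if small gradients were
  # packed, split them back out into their original tensors.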
  new_tower_grads = [list(x) for x in zip(*reduced_gv_list)]
  if packing:
    new_tower_grads = unpack_small_tensors(new_tower_grads, packing)
  return new_tower_grads
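
For orientation, the reshaping performed above can be sketched in plain Python
with hypothetical toy values standing in for the (gradient, variable) pairs;
the actual per-variable reduction done by sum_grad_and_var_all_reduce is
elided:

tower_grads = [
    [('g0_t0', 'v0'), ('g1_t0', 'v1'), ('g2_t0', 'v2')],  # tower 0
    [('g0_t1', 'v0'), ('g1_t1', 'v1'), ('g2_t1', 'v2')],  # tower 1
]

# Transpose: one entry per variable, holding that variable's pair from
# every tower.
gv = list(zip(*tower_grads))
assert gv[0] == (('g0_t0', 'v0'), ('g0_t1', 'v0'))

# Chunk consecutive variables so each chunk shares one 'allreduce' scope.
merge_scope = 2
chunked_gv = [gv[x:x + merge_scope] for x in range(0, len(gv), merge_scope)]
assert [len(c) for c in chunked_gv] == [2, 1]

# Stand-in for the reduced per-variable results, then transpose back to
# the original per-tower layout.
reduced_gv_list = [list(pair) for pair in gv]
new_tower_grads = [list(x) for x in zip(*reduced_gv_list)]
assert len(new_tower_grads) == len(tower_grads)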