def _benchmark_graph()

in scripts/tf_cnn_benchmarks/benchmark_cnn.py


  def _benchmark_graph(self, graph_info, eval_graph_info):
    """Benchmark the training graph.

    Args:
      graph_info: the namedtuple returned by _build_graph() which
        contains all necessary information to benchmark the graph, including
        named tensors/ops list, fetches, etc.
      eval_graph_info: Similar to graph_info but for the eval graph if
        --eval_during_training_* is used. Otherwise, None.
    Returns:
      Dictionary containing training statistics (num_workers, num_steps,
      average_wall_time, images_per_sec).
    """
    log_fn('Initializing graph')
    if self.params.variable_update == 'horovod':
      import horovod.tensorflow as hvd  # pylint: disable=g-import-not-at-top
      # First worker will be 'chief' - it will write summaries and
      # save checkpoints.
      is_chief = hvd.rank() == 0
    else:
      is_chief = (not self.job_name or self.task_index == 0)

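    # Only the chief writes TensorBoard summaries, and only when summary
    # verbosity, a train_dir and a positive save_summaries_steps are all set.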
    summary_writer = None
    if (is_chief and self.params.summary_verbosity and self.params.train_dir and
        self.params.save_summaries_steps > 0):
      summary_writer = tf.summary.FileWriter(self.params.train_dir,
                                             tf.get_default_graph())

    # We want to start the benchmark timer right after an image_producer
    # barrier to avoid undesired waiting times on barriers.
    if ((self.num_warmup_batches + len(graph_info.enqueue_ops) - 1) %
        self.batch_group_size) != 0:
      self.num_warmup_batches = int(
          math.ceil(
              (self.num_warmup_batches + len(graph_info.enqueue_ops) - 1.0) /
              (self.batch_group_size)) * self.batch_group_size -
          len(graph_info.enqueue_ops) + 1)
      log_fn('Round up warm up steps to %d to match batch_group_size' %
             self.num_warmup_batches)
      assert ((self.num_warmup_batches + len(graph_info.enqueue_ops) - 1) %
              self.batch_group_size) == 0
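      # Example (hypothetical numbers): with num_warmup_batches=5,
      # len(enqueue_ops)=2 and batch_group_size=4, the sum 5 + 2 - 1 = 6 is not
      # a multiple of 4, so warmup is rounded up to ceil(6 / 4) * 4 - 2 + 1 = 7,
      # and 7 + 2 - 1 = 8 is now divisible by batch_group_size.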
    # We run the summaries in the same thread as the training operations by
    # passing in None for summary_op to avoid a summary_thread being started.
    # Running summaries and training operations in parallel could run out of
    # GPU memory.
    if is_chief and not self.forward_only_and_freeze:
      saver = tf.train.Saver(
          self.variable_mgr.savable_variables(),
          save_relative_paths=True,
          max_to_keep=self.params.max_ckpts_to_keep)
    else:
      saver = None
    ready_for_local_init_op = None
    if self.job_name and not (self.single_session or
                              self.distributed_collective):
      # In distributed mode, we don't want to run local_var_init_op_group until
      # the global variables are initialized, because local_var_init_op_group
      # may use global variables (such as in distributed replicated mode). We
      # don't set this in non-distributed mode, because in non-distributed mode,
      # local_var_init_op_group may itself initialize global variables (such as
      # in replicated mode).
      ready_for_local_init_op = tf.report_uninitialized_variables(
          tf.global_variables())
    if self.params.variable_update == 'horovod':
      import horovod.tensorflow as hvd  # pylint: disable=g-import-not-at-top
      bcast_global_variables_op = hvd.broadcast_global_variables(0)
    else:
      bcast_global_variables_op = None

    if self.params.variable_update == 'collective_all_reduce':
      # It doesn't matter what this collective_graph_key value is,
      # so long as it's > 0 and the same at every worker.
      init_run_options = tf.RunOptions()
      init_run_options.experimental.collective_graph_key = 6
    else:
      init_run_options = tf.RunOptions()
    local_var_init_ops = [graph_info.local_var_init_op_group]
    if eval_graph_info:
      # `eval_graph_info.local_var_init_op_group` also includes some of the
      # training initializer ops, since it's difficult to filter them out.
      # Rerunning the training initializer ops is OK, but we add a control
      # dependency since running two sets of training initializer ops at the
      # same time can cause race conditions.
      with tf.control_dependencies(local_var_init_ops):
        local_var_init_ops.append(eval_graph_info.local_var_init_op_group)
    sv = tf.train.Supervisor(
        # For the purpose of Supervisor, all Horovod workers are 'chiefs',
        # since we want the session to be initialized symmetrically on all the
        # workers.
        is_chief=is_chief or (self.params.variable_update == 'horovod'
                              or self.distributed_collective),
        # Log dir should be unset on non-chief workers to prevent Horovod
        # workers from corrupting each other's checkpoints.
        logdir=self.params.train_dir if is_chief else None,
        ready_for_local_init_op=ready_for_local_init_op,
        local_init_op=local_var_init_ops,
        saver=saver,
        global_step=graph_info.global_step,
        summary_op=None,
        save_model_secs=self.params.save_model_secs,
        summary_writer=summary_writer,
        local_init_run_options=init_run_options)

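    # Optionally set up tfprof profiling and dump the GraphDef to graph_file
    # when the corresponding flags are set.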
    profiler = tf.profiler.Profiler() if self.params.tfprof_file else None
    if self.graph_file is not None:
      path, filename = os.path.split(self.graph_file)
      as_text = filename.endswith('txt')
      log_fn('Writing GraphDef as %s to %s' % (  # pyformat break
          'text' if as_text else 'binary', self.graph_file))
      tf.train.write_graph(tf.get_default_graph().as_graph_def(add_shapes=True),
                           path, filename, as_text)

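    # The Supervisor's standard services (checkpoint and summary threads, queue
    # runners) are only needed when a train_dir is set or the dataset relies on
    # queue runners.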
    start_standard_services = (
        self.params.train_dir or
        self.dataset.queue_runner_required())
    target = self.cluster_manager.get_target() if self.cluster_manager else ''
    with sv.managed_session(
        master=target,
        config=create_config_proto(self.params),
        start_standard_services=start_standard_services) as sess:
      # Anything that can potentially raise an OutOfRangeError with 'sess' MUST
      # be under this try block. The managed_session() context manager silently
      # ignores OutOfRangeError, so we must catch it and wrap it in a
      # different exception type so that it can be propagated up to the
      # caller.
      try:
        stats = self.benchmark_with_session(
            sess, sv, graph_info, eval_graph_info, bcast_global_variables_op,
            is_chief, summary_writer, profiler)
      except tf.errors.OutOfRangeError:
        raise RuntimeError(
            'Received OutOfRangeError. Wrapping it in a RuntimeError to '
            'prevent the Supervisor from suppressing it. Original '
            'OutOfRangeError with traceback:\n' + traceback.format_exc())

    sv.stop()
    if profiler:
      generate_tfprof_profile(profiler, self.params.tfprof_file)
    return stats
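
Usage sketch (hypothetical; bench stands for an already-constructed
BenchmarkCNN instance and graph_info for the namedtuple returned by its
_build_graph(); the stats keys follow the Returns section of the docstring):

# Hypothetical call site; this private method is normally invoked from
# within BenchmarkCNN itself rather than by user code.
stats = bench._benchmark_graph(graph_info, eval_graph_info=None)
print('workers=%d steps=%d' % (stats['num_workers'], stats['num_steps']))
print('average wall time=%.3fs, images/sec=%.1f' %
      (stats['average_wall_time'], stats['images_per_sec']))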