def benchmark_with_session()

in scripts/tf_cnn_benchmarks/benchmark_cnn.py


  def benchmark_with_session(self, sess, supervisor, graph_info,
                             eval_graph_info, bcast_global_variables_op,
                             is_chief, summary_writer, profiler):
    """Benchmarks the graph with the given session.

    Args:
      sess: The session to benchmark the graph with
      supervisor: The Supervisor that created the session.
      graph_info: The namedtuple returned by _build_graph(), which contains
        all information necessary to benchmark the graph, including the list
        of named tensors/ops, fetches, etc.
      eval_graph_info: Similar to graph_info but for the eval graph if
        --eval_during_training_every_n_steps is used. Otherwise, None.
      bcast_global_variables_op: If Horovod is used, the op to broadcast the
        global variables to all the processes. None if Horovod is not used.
      is_chief: True if this is the chief process.
      summary_writer: The SummaryWriter used to write summaries, or None if
        summaries are not used.
      profiler: The tf.profiler.Profiler, or None if tfprof is not used.

    Returns:
      Dictionary containing training statistics (num_workers, num_steps,
      average_wall_time, images_per_sec), plus last_average_loss,
      top_1_accuracy and top_5_accuracy when they are available.
    """
    if self.params.backbone_model_path is not None:
      self.model.load_backbone_model(sess, self.params.backbone_model_path)
    if bcast_global_variables_op:
      sess.run(bcast_global_variables_op)
    image_producer = None
    if graph_info.input_producer_op is not None:
      image_producer = cnn_util.ImageProducer(
          sess, graph_info.input_producer_op, self.batch_group_size,
          self.params.use_python32_barrier)
      image_producer.start()
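    # Pre-fill the staging areas: on iteration i, run the first i + 1 enqueue
    # ops so that every stage of the input pipeline holds data before the
    # timed steps begin.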
    if graph_info.enqueue_ops:
      for i in xrange(len(graph_info.enqueue_ops)):
        sess.run(graph_info.enqueue_ops[:(i + 1)])
        if image_producer is not None:
          image_producer.notify_image_consumption()
    self.init_global_step, = sess.run([graph_info.global_step])
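    # In asynchronous distributed training (no cross-replica sync), workers
    # make progress independently, so completion is tracked via the shared
    # global step rather than this worker's local step count.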
    if self.job_name and not self.params.cross_replica_sync:
      # TODO(zhengxq): Do we need to use a global step watcher at all?
      global_step_watcher = GlobalStepWatcher(
          sess, graph_info.global_step,
          self.num_workers * self.num_warmup_batches +
          self.init_global_step,
          self.num_workers * (self.num_warmup_batches + self.num_batches) - 1)
      global_step_watcher.start()
    else:
      global_step_watcher = None
    eval_image_producer = None
    if eval_graph_info:
      # We pass local_var_init_op_group=None because the Supervisor already
      # initialized local variables above. We need to have the Supervisor
      # initialize the local variables, because otherwise it throws an error
      # complaining that not all variables were initialized.
      eval_image_producer = self._initialize_eval_graph(
          eval_graph_info.enqueue_ops, eval_graph_info.input_producer_op,
          local_var_init_op_group=None, sess=sess)
    step_train_times = []
    log_fn('Running warm up')
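    # local_step counts up from -num_warmup_batches; steps with a negative
    # index are warm-up steps whose timings are discarded once local_step
    # reaches 0.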
    local_step = -1 * self.num_warmup_batches
    if self.single_session:
      # In single-session mode, the global_step is incremented by 1 per step.
      # In non-single-session mode, every worker increments the global_step on
      # each step, so it advances by num_workers per step. This means we need
      # to divide init_global_step by num_workers only in non-single-session
      # mode.
      end_local_step = self.num_batches - self.init_global_step
    else:
      end_local_step = self.num_batches - (self.init_global_step //
                                           self.num_workers)
    if not global_step_watcher:
      # In cross-replica sync mode, all workers must run the same number of
      # local steps, or else the workers that run extra steps will block.
      done_fn = lambda: local_step >= end_local_step
    else:
      done_fn = global_step_watcher.done
    if self.params.debugger is not None:
      if self.params.debugger == 'cli':
        log_fn('The CLI TensorFlow debugger will be used.')
        sess = tf_debug.LocalCLIDebugWrapperSession(sess)
      else:
        log_fn('The TensorBoard debugger plugin will be used.')
        sess = tf_debug.TensorBoardDebugWrapperSession(sess,
                                                       self.params.debugger)
    mlperf.logger.log(key=mlperf.tags.TRAIN_LOOP)
    skip_final_eval = False
    accuracy_at_1 = None
    accuracy_at_5 = None
    last_eval_step = local_step
    loop_start_time = time.perf_counter()
    last_average_loss = None
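    # Main training loop: run one step per iteration until done_fn() reports
    # that this worker (or the global step watcher) has finished.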
    while not done_fn():
      if local_step == 0:
        log_fn('Done warm up')
        if graph_info.execution_barrier:
          log_fn('Waiting for other replicas to finish warm up')
          sess.run([graph_info.execution_barrier])

        # TODO(laigd): rename 'Img' to maybe 'Input'.
        header_str = ('Step\tImg/sec\t' +
                      self.params.loss_type_to_report.replace('/', ' '))
        if self.params.print_training_accuracy or self.params.forward_only:
          # TODO(laigd): use the actual accuracy op names of the model.
          header_str += '\ttop_1_accuracy\ttop_5_accuracy'
        log_fn(header_str)
        assert len(step_train_times) == self.num_warmup_batches
        # Reset the timings so that the warm-up batches are not counted.
        step_train_times = []
        loop_start_time = time.perf_counter()
      if (summary_writer and
          (local_step + 1) % self.params.save_summaries_steps == 0):
        fetch_summary = graph_info.summary_op
      else:
        fetch_summary = None
      collective_graph_key = 7 if (
          self.params.variable_update == 'collective_all_reduce') else 0
      (summary_str, last_average_loss) = benchmark_one_step(
          sess, graph_info.fetches, local_step,
          self.batch_size * (self.num_workers
                             if self.single_session else 1), step_train_times,
          self.trace_filename, self.params.partitioned_graph_file_prefix,
          profiler, image_producer, self.params, fetch_summary,
          benchmark_logger=self.benchmark_logger,
          collective_graph_key=collective_graph_key,
          should_output_files=(self.params.variable_update != 'horovod' or
                               is_chief))
      if summary_str is not None and is_chief:
        supervisor.summary_computed(sess, summary_str)
      local_step += 1
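      # Periodically save a checkpoint from the chief if --save_model_steps is
      # set.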
      if (self.params.save_model_steps and
          local_step % self.params.save_model_steps == 0 and
          local_step > 0 and
          is_chief):
        supervisor.saver.save(sess, supervisor.save_path,
                              supervisor.global_step)
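      # If --eval_during_training_* is enabled, periodically pause training to
      # run an evaluation pass, and stop early once the requested top-1
      # accuracy is reached.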
      if (eval_graph_info and local_step > 0 and not done_fn() and
          self._should_eval_during_training(local_step)):
        python_global_step = sess.run(graph_info.global_step)
        num_steps_since_last_eval = local_step - last_eval_step
        # The INPUT_SIZE tag value might not match the
        # PREPROC_NUM_TRAIN_EXAMPLES tag value, because the number of examples
        # run, which is INPUT_SIZE, is rounded up to the nearest multiple of
        # self.batch_size.
        mlperf.logger.log(
            key=mlperf.tags.INPUT_SIZE,
            value=num_steps_since_last_eval * self.batch_size)
        log_fn('Running evaluation at global_step {}'.format(
            python_global_step))
        accuracy_at_1, accuracy_at_5 = self._eval_once(
            sess, summary_writer, eval_graph_info.fetches,
            eval_graph_info.summary_op, eval_image_producer,
            python_global_step)
        last_eval_step = local_step
        if (self.params.stop_at_top_1_accuracy and
            accuracy_at_1 >= self.params.stop_at_top_1_accuracy):
          log_fn('Stopping, as eval accuracy at least %s was reached' %
                 self.params.stop_at_top_1_accuracy)
          skip_final_eval = True
          break
        else:
          log_fn('Resuming training')
      if eval_graph_info and self.model.reached_target():
        log_fn('Stopping, as the model indicates its custom goal was reached')
        skip_final_eval = True
        break
    loop_end_time = time.perf_counter()
    # Waits for the global step to be done, regardless of done_fn.
    if global_step_watcher:
      while not global_step_watcher.done():
        time.sleep(.25)
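    # Compute throughput: from this worker's local wall time when there is no
    # global step watcher, otherwise from the watcher's view of global-step
    # progress across all workers.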
    if not global_step_watcher:
      elapsed_time = loop_end_time - loop_start_time
      average_wall_time = elapsed_time / local_step if local_step > 0 else 0
      images_per_sec = (self.num_workers * local_step * self.batch_size /
                        elapsed_time)
      num_steps = local_step * self.num_workers
    else:
      # NOTE: Each worker independently increases the global step. So,
      # num_steps will be the sum of the local_steps from each worker.
      num_steps = global_step_watcher.num_steps()
      elapsed_time = global_step_watcher.elapsed_time()
      average_wall_time = (elapsed_time * self.num_workers / num_steps
                           if num_steps > 0 else 0)
      images_per_sec = num_steps * self.batch_size / elapsed_time

    # We skip printing images/sec if --eval_during_training_* is specified,
    # because we process both training and evaluation images, so a single
    # "images/sec" value would be meaningless.
    if self.mode != constants.BenchmarkMode.TRAIN_AND_EVAL:
      log_fn('-' * 64)
      # TODO(laigd): rename 'images' to maybe 'inputs'.
      log_fn('total images/sec: %.2f' % images_per_sec)
      log_fn('-' * 64)
    else:
      log_fn('Done with training')
    num_steps_since_last_eval = local_step - last_eval_step
    mlperf.logger.log(
        key=mlperf.tags.INPUT_SIZE,
        value=num_steps_since_last_eval * self.batch_size)
    python_global_step = sess.run(graph_info.global_step)
    if eval_graph_info and not skip_final_eval:
      log_fn('Running final evaluation at global_step {}'.format(
          python_global_step))
      accuracy_at_1, accuracy_at_5 = self._eval_once(
          sess, summary_writer, eval_graph_info.fetches,
          eval_graph_info.summary_op, eval_image_producer, python_global_step)
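    # Approximate the number of epochs completed: each global step processes
    # batch_size examples.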
    num_epochs_ran = (python_global_step * self.batch_size /
                      self.dataset.num_examples_per_epoch('train'))
    mlperf.logger.log_train_epochs(num_epochs_ran)
    if image_producer is not None:
      image_producer.done()
    if eval_image_producer is not None:
      eval_image_producer.done()
    if is_chief:
      if self.benchmark_logger:
        self.benchmark_logger.log_metric(
            'average_examples_per_sec', images_per_sec, global_step=num_steps)

    # Save the model checkpoint.
    if self.params.train_dir is not None and is_chief:
      checkpoint_path = os.path.join(self.params.train_dir, 'model.ckpt')
      if not gfile.Exists(self.params.train_dir):
        gfile.MakeDirs(self.params.train_dir)
      supervisor.saver.save(sess, checkpoint_path, graph_info.global_step)
    if graph_info.execution_barrier:
      # Wait for other workers to reach the end, so this worker doesn't
      # go away underneath them.
      sess.run([graph_info.execution_barrier])
    stats = {
        'num_workers': self.num_workers,
        'num_steps': num_steps,
        'average_wall_time': average_wall_time,
        'images_per_sec': images_per_sec
    }
    if last_average_loss is not None:
      stats['last_average_loss'] = last_average_loss
    if accuracy_at_1 is not None:
      stats['top_1_accuracy'] = accuracy_at_1
    if accuracy_at_5 is not None:
      stats['top_5_accuracy'] = accuracy_at_5

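    # For MLPerf logging, the run counts as a success if the model reached its
    # own custom target or the requested top-1 accuracy threshold.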
    success = bool(self.model.reached_target() or
                   (accuracy_at_1 and self.params.stop_at_top_1_accuracy and
                    accuracy_at_1 >= self.params.stop_at_top_1_accuracy))
    mlperf.logger.log(key=mlperf.tags.RUN_STOP, value={'success': success})
    mlperf.logger.log(key=mlperf.tags.RUN_FINAL)
    return stats
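
For context, here is a minimal sketch of how a caller might drive this method. It assumes a constructed BenchmarkCNN instance, a graph_info namedtuple from _build_graph(), and a tf.train.Supervisor-managed session; the Supervisor setup and the `params` object are illustrative and simplified relative to the real benchmark() driver in this file.

  # Hypothetical driver sketch (simplified; not the actual benchmark() code).
  import tensorflow as tf

  bench = BenchmarkCNN(params)            # assumed to be constructed elsewhere
  with tf.Graph().as_default():
    graph_info = bench._build_graph()     # the namedtuple described in the docstring
    sv = tf.train.Supervisor(is_chief=True, logdir=params.train_dir)
    with sv.managed_session() as sess:
      stats = bench.benchmark_with_session(
          sess, sv, graph_info,
          eval_graph_info=None,             # no --eval_during_training_every_n_steps
          bcast_global_variables_op=None,   # Horovod not used
          is_chief=True,
          summary_writer=None,              # summaries disabled
          profiler=None)                    # tfprof disabled
  print('images/sec: %.2f over %d steps'
        % (stats['images_per_sec'], stats['num_steps']))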