in scripts/tf_cnn_benchmarks/benchmark_cnn.py [0:0]
def benchmark_with_session(self, sess, supervisor, graph_info,
                           eval_graph_info, bcast_global_variables_op,
                           is_chief, summary_writer, profiler):
  """Benchmarks the graph with the given session.

  Args:
    sess: The session to benchmark the graph with.
    supervisor: The Supervisor that created the session.
    graph_info: the namedtuple returned by _build_graph() which
      contains all necessary information to benchmark the graph, including
      named tensors/ops list, fetches, etc.
    eval_graph_info: Similar to graph_info but for the eval graph if
      --eval_during_training_every_n_steps is used. Otherwise, None.
    bcast_global_variables_op: If Horovod is used, the op to broadcast the
      global variables to all the processes. None if Horovod is not used.
    is_chief: True if this is the chief process.
    summary_writer: The SummaryWriter used to write summaries, or None if
      summaries are not used.
    profiler: The tf.profiler.Profiler, or None if tfprof is not used.

  Returns:
    Dictionary containing training statistics (num_workers, num_steps,
    average_wall_time, images_per_sec).
  """
  if self.params.backbone_model_path is not None:
    self.model.load_backbone_model(sess, self.params.backbone_model_path)
  if bcast_global_variables_op:
    sess.run(bcast_global_variables_op)
  image_producer = None
  if graph_info.input_producer_op is not None:
    image_producer = cnn_util.ImageProducer(
        sess, graph_info.input_producer_op, self.batch_group_size,
        self.params.use_python32_barrier)
    image_producer.start()
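  # Staged warm-up of the input pipeline: each pass below runs one more
  # enqueue op than the last (a growing prefix of enqueue_ops), so every
  # staging area holds data, and the image producer is notified for each
  # consumed batch, before the timed loop begins.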
  if graph_info.enqueue_ops:
    for i in xrange(len(graph_info.enqueue_ops)):
      sess.run(graph_info.enqueue_ops[:(i + 1)])
      if image_producer is not None:
        image_producer.notify_image_consumption()
  self.init_global_step, = sess.run([graph_info.global_step])
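  # In async distributed training (no cross-replica sync), each worker
  # advances the shared global step independently, so a GlobalStepWatcher
  # thread is presumably used to poll the global step and detect when the
  # workers together have run the requested total number of batches.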
  if self.job_name and not self.params.cross_replica_sync:
    # TODO(zhengxq): Do we need to use a global step watcher at all?
    global_step_watcher = GlobalStepWatcher(
        sess, graph_info.global_step,
        self.num_workers * self.num_warmup_batches +
        self.init_global_step,
        self.num_workers * (self.num_warmup_batches + self.num_batches) - 1)
    global_step_watcher.start()
  else:
    global_step_watcher = None
  eval_image_producer = None
  if eval_graph_info:
    # We pass local_var_init_op_group=None because the Supervisor already
    # initialized local variables above. We need to have the Supervisor
    # initialize the local variables, because otherwise it throws an error
    # complaining that not all variables were initialized.
    eval_image_producer = self._initialize_eval_graph(
        eval_graph_info.enqueue_ops, eval_graph_info.input_producer_op,
        local_var_init_op_group=None, sess=sess)
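  # local_step starts at -num_warmup_batches and counts upward; step 0 marks
  # the end of warm up, at which point the per-step timings collected so far
  # are discarded so warm-up batches do not skew the reported throughput.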
  step_train_times = []
  log_fn('Running warm up')
  local_step = -1 * self.num_warmup_batches
  if self.single_session:
    # In single session mode, each step, the global_step is incremented by
    # 1. In non-single session mode, each step, the global_step is
    # incremented once per worker. This means we need to divide
    # init_global_step by num_workers only in non-single session mode.
    end_local_step = self.num_batches - self.init_global_step
  else:
    end_local_step = self.num_batches - (self.init_global_step //
                                         self.num_workers)
  if not global_step_watcher:
    # In cross-replica sync mode, all workers must run the same number of
    # local steps, or else the workers running the extra step will block.
    done_fn = lambda: local_step >= end_local_step
  else:
    done_fn = global_step_watcher.done
  if self.params.debugger is not None:
    if self.params.debugger == 'cli':
      log_fn('The CLI TensorFlow debugger will be used.')
      sess = tf_debug.LocalCLIDebugWrapperSession(sess)
    else:
      log_fn('The TensorBoard debugger plugin will be used.')
      sess = tf_debug.TensorBoardDebugWrapperSession(sess,
                                                     self.params.debugger)
  mlperf.logger.log(key=mlperf.tags.TRAIN_LOOP)
  skip_final_eval = False
  accuracy_at_1 = None
  accuracy_at_5 = None
  last_eval_step = local_step
  loop_start_time = time.perf_counter()
  last_average_loss = None
  while not done_fn():
    if local_step == 0:
      log_fn('Done warm up')
      if graph_info.execution_barrier:
        log_fn('Waiting for other replicas to finish warm up')
        sess.run([graph_info.execution_barrier])
      # TODO(laigd): rename 'Img' to maybe 'Input'.
      header_str = ('Step\tImg/sec\t' +
                    self.params.loss_type_to_report.replace('/', ' '))
      if self.params.print_training_accuracy or self.params.forward_only:
        # TODO(laigd): use the actual accuracy op names of the model.
        header_str += '\ttop_1_accuracy\ttop_5_accuracy'
      log_fn(header_str)
      assert len(step_train_times) == self.num_warmup_batches
      # Reset the times and the start time to ignore the warm-up batches.
      step_train_times = []
      loop_start_time = time.perf_counter()
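    # Summaries are fetched only every save_summaries_steps steps, which
    # keeps the summary overhead out of the other timed iterations.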
    if (summary_writer and
        (local_step + 1) % self.params.save_summaries_steps == 0):
      fetch_summary = graph_info.summary_op
    else:
      fetch_summary = None
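    # The graph key 7 presumably has to match the collective_graph_key under
    # which the collective all-reduce ops were built, so the step's run
    # options group them onto the same collective executor.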
    collective_graph_key = 7 if (
        self.params.variable_update == 'collective_all_reduce') else 0
    (summary_str, last_average_loss) = benchmark_one_step(
        sess, graph_info.fetches, local_step,
        self.batch_size * (self.num_workers
                           if self.single_session else 1), step_train_times,
        self.trace_filename, self.params.partitioned_graph_file_prefix,
        profiler, image_producer, self.params, fetch_summary,
        benchmark_logger=self.benchmark_logger,
        collective_graph_key=collective_graph_key,
        should_output_files=(self.params.variable_update != 'horovod' or
                             is_chief))
    if summary_str is not None and is_chief:
      supervisor.summary_computed(sess, summary_str)
    local_step += 1
    if (self.params.save_model_steps and
        local_step % self.params.save_model_steps == 0 and
        local_step > 0 and
        is_chief):
      supervisor.saver.save(sess, supervisor.save_path,
                            supervisor.global_step)
    if (eval_graph_info and local_step > 0 and not done_fn() and
        self._should_eval_during_training(local_step)):
      python_global_step = sess.run(graph_info.global_step)
      num_steps_since_last_eval = local_step - last_eval_step
      # The INPUT_SIZE tag value might not match the
      # PREPROC_NUM_TRAIN_EXAMPLES tag value, because the number of examples
      # run, which is INPUT_SIZE, is rounded up to the nearest multiple of
      # self.batch_size.
      mlperf.logger.log(
          key=mlperf.tags.INPUT_SIZE,
          value=num_steps_since_last_eval * self.batch_size)
      log_fn('Running evaluation at global_step {}'.format(
          python_global_step))
      accuracy_at_1, accuracy_at_5 = self._eval_once(
          sess, summary_writer, eval_graph_info.fetches,
          eval_graph_info.summary_op, eval_image_producer,
          python_global_step)
      last_eval_step = local_step
      if (self.params.stop_at_top_1_accuracy and
          accuracy_at_1 >= self.params.stop_at_top_1_accuracy):
        log_fn('Stopping, as eval accuracy at least %s was reached' %
               self.params.stop_at_top_1_accuracy)
        skip_final_eval = True
        break
      else:
        log_fn('Resuming training')
    if eval_graph_info and self.model.reached_target():
      log_fn('Stopping, as the model indicates its custom goal was reached')
      skip_final_eval = True
      break
  loop_end_time = time.perf_counter()
  # Waits for the global step to be done, regardless of done_fn.
  if global_step_watcher:
    while not global_step_watcher.done():
      time.sleep(.25)
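  # Aggregate throughput over all workers. As an illustrative example (not
  # from the source): 1 worker running 1000 timed steps with batch_size=64
  # in 50 seconds gives 1 * 1000 * 64 / 50 = 1280 images/sec.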
  if not global_step_watcher:
    elapsed_time = loop_end_time - loop_start_time
    average_wall_time = elapsed_time / local_step if local_step > 0 else 0
    images_per_sec = (self.num_workers * local_step * self.batch_size /
                      elapsed_time)
    num_steps = local_step * self.num_workers
  else:
    # NOTE: Each worker independently increases the global step. So,
    # num_steps will be the sum of the local_steps from each worker.
    num_steps = global_step_watcher.num_steps()
    elapsed_time = global_step_watcher.elapsed_time()
    average_wall_time = (elapsed_time * self.num_workers / num_steps
                         if num_steps > 0 else 0)
    images_per_sec = num_steps * self.batch_size / elapsed_time
  # We skip printing images/sec if --eval_during_training_* is specified,
  # because we are both processing training and evaluation images, so a
  # singular "images/sec" value is meaningless.
  if self.mode != constants.BenchmarkMode.TRAIN_AND_EVAL:
    log_fn('-' * 64)
    # TODO(laigd): rename 'images' to maybe 'inputs'.
    log_fn('total images/sec: %.2f' % images_per_sec)
    log_fn('-' * 64)
  else:
    log_fn('Done with training')
  num_steps_since_last_eval = local_step - last_eval_step
  mlperf.logger.log(
      key=mlperf.tags.INPUT_SIZE,
      value=num_steps_since_last_eval * self.batch_size)
  python_global_step = sess.run(graph_info.global_step)
  if eval_graph_info and not skip_final_eval:
    log_fn('Running final evaluation at global_step {}'.format(
        python_global_step))
    accuracy_at_1, accuracy_at_5 = self._eval_once(
        sess, summary_writer, eval_graph_info.fetches,
        eval_graph_info.summary_op, eval_image_producer, python_global_step)
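  # Epochs are computed from the global step. For example (illustrative
  # numbers, not from the source): 90000 global steps with batch_size=256 on
  # ImageNet's 1,281,167 training examples is 90000 * 256 / 1281167, or
  # roughly 18 epochs.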
  num_epochs_ran = (python_global_step * self.batch_size /
                    self.dataset.num_examples_per_epoch('train'))
  mlperf.logger.log_train_epochs(num_epochs_ran)
  if image_producer is not None:
    image_producer.done()
  if eval_image_producer is not None:
    eval_image_producer.done()
  if is_chief:
    if self.benchmark_logger:
      self.benchmark_logger.log_metric(
          'average_examples_per_sec', images_per_sec, global_step=num_steps)
  # Save the model checkpoint.
  if self.params.train_dir is not None and is_chief:
    checkpoint_path = os.path.join(self.params.train_dir, 'model.ckpt')
    if not gfile.Exists(self.params.train_dir):
      gfile.MakeDirs(self.params.train_dir)
    supervisor.saver.save(sess, checkpoint_path, graph_info.global_step)
  if graph_info.execution_barrier:
    # Wait for other workers to reach the end, so this worker doesn't
    # go away underneath them.
    sess.run([graph_info.execution_barrier])
  stats = {
      'num_workers': self.num_workers,
      'num_steps': num_steps,
      'average_wall_time': average_wall_time,
      'images_per_sec': images_per_sec
  }
  if last_average_loss is not None:
    stats['last_average_loss'] = last_average_loss
  if accuracy_at_1 is not None:
    stats['top_1_accuracy'] = accuracy_at_1
  if accuracy_at_5 is not None:
    stats['top_5_accuracy'] = accuracy_at_5
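  # The run is reported as successful for the MLPerf RUN_STOP log if the
  # model's own stopping target was reached, or if --stop_at_top_1_accuracy
  # was given and the last evaluated top-1 accuracy met that threshold.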
  success = bool(self.model.reached_target() or
                 (accuracy_at_1 and self.params.stop_at_top_1_accuracy and
                  accuracy_at_1 >= self.params.stop_at_top_1_accuracy))
  mlperf.logger.log(key=mlperf.tags.RUN_STOP, value={'success': success})
  mlperf.logger.log(key=mlperf.tags.RUN_FINAL)
  return stats