in scripts/tf_cnn_benchmarks/benchmark_cnn.py [0:0]
def _benchmark_graph(self, graph_info, eval_graph_info):
"""Benchmark the training graph.
Args:
graph_info: the namedtuple returned by _build_graph(), which contains
all the information needed to benchmark the graph: the named
tensors/ops, fetches, etc.
eval_graph_info: Similar to graph_info but for the eval graph if
--eval_during_training_* is used. Otherwise, None.
Returns:
Dictionary containing training statistics (num_workers, num_steps,
average_wall_time, images_per_sec).
"""
log_fn('Initializing graph')
if self.params.variable_update == 'horovod':
import horovod.tensorflow as hvd # pylint: disable=g-import-not-at-top
# The first worker (rank 0) will be the 'chief'; it will write summaries
# and save checkpoints.
is_chief = hvd.rank() == 0
else:
is_chief = (not self.job_name or self.task_index == 0)
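# Only the chief writes summaries, and only when summary_verbosity,
# train_dir, and save_summaries_steps are all set.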
summary_writer = None
if (is_chief and self.params.summary_verbosity and self.params.train_dir and
self.params.save_summaries_steps > 0):
summary_writer = tf.summary.FileWriter(self.params.train_dir,
tf.get_default_graph())
# We want to start the benchmark timer right after an image_producer
# barrier, to avoid undesired waiting time at barriers. Round
# num_warmup_batches up so that warmup ends on a batch_group_size boundary.
if ((self.num_warmup_batches + len(graph_info.enqueue_ops) - 1) %
self.batch_group_size) != 0:
self.num_warmup_batches = int(
math.ceil(
(self.num_warmup_batches + len(graph_info.enqueue_ops) - 1.0) /
(self.batch_group_size)) * self.batch_group_size -
len(graph_info.enqueue_ops) + 1)
log_fn('Rounding up warmup steps to %d to match batch_group_size' %
self.num_warmup_batches)
assert ((self.num_warmup_batches + len(graph_info.enqueue_ops) - 1) %
self.batch_group_size) == 0
# We run the summaries in the same thread as the training operations by
# passing None for summary_op, which prevents a separate summary thread
# from being started. Running summaries and training operations in
# parallel could run out of GPU memory.
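# Only the chief needs a Saver; it is also skipped when the graph is
# built with forward_only_and_freeze, since no checkpoints are saved for
# a frozen forward-only graph.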
if is_chief and not self.forward_only_and_freeze:
saver = tf.train.Saver(
self.variable_mgr.savable_variables(),
save_relative_paths=True,
max_to_keep=self.params.max_ckpts_to_keep)
else:
saver = None
ready_for_local_init_op = None
if self.job_name and not (self.single_session or
self.distributed_collective):
# In distributed mode, we don't want to run local_var_init_op_group until
# the global variables are initialized, because local_var_init_op_group
# may use global variables (such as in distributed replicated mode). We
# don't set this in non-distributed mode, because in non-distributed mode,
# local_var_init_op_group may itself initialize global variables (such as
# in replicated mode).
ready_for_local_init_op = tf.report_uninitialized_variables(
tf.global_variables())
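# With Horovod, build an op that broadcasts the initial values of all
# global variables from rank 0, so every worker starts from identical
# weights; it is run later via bcast_global_variables_op.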
if self.params.variable_update == 'horovod':
import horovod.tensorflow as hvd # pylint: disable=g-import-not-at-top
bcast_global_variables_op = hvd.broadcast_global_variables(0)
else:
bcast_global_variables_op = None
if self.params.variable_update == 'collective_all_reduce':
# It doesn't matter what this collective_graph_key value is,
# so long as it's > 0 and the same at every worker.
init_run_options = tf.RunOptions()
init_run_options.experimental.collective_graph_key = 6
else:
init_run_options = tf.RunOptions()
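# Local-variable initializer ops, passed to the Supervisor below as its
# local_init_op.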
local_var_init_ops = [graph_info.local_var_init_op_group]
if eval_graph_info:
# `eval_graph_info.local_var_init_op_group` also includes some of the
# training initializer ops, since it's difficult to filter them out.
# Rerunning the training initializer ops is OK, but we add a control
# dependency since running two sets of training initializer ops at the
# same time can cause race conditions.
with tf.control_dependencies(local_var_init_ops):
local_var_init_ops.append(eval_graph_info.local_var_init_op_group)
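# The Supervisor handles session creation and recovery and, on the chief,
# checkpointing and summary writing.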
sv = tf.train.Supervisor(
# For the purpose of the Supervisor, every Horovod worker is a 'chief',
# since we want the session to be initialized symmetrically on all
# workers.
is_chief=is_chief or (self.params.variable_update == 'horovod'
or self.distributed_collective),
# Log dir should be unset on non-chief workers to prevent Horovod
# workers from corrupting each other's checkpoints.
logdir=self.params.train_dir if is_chief else None,
ready_for_local_init_op=ready_for_local_init_op,
local_init_op=local_var_init_ops,
saver=saver,
global_step=graph_info.global_step,
summary_op=None,
save_model_secs=self.params.save_model_secs,
summary_writer=summary_writer,
local_init_run_options=init_run_options)
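# Create a tfprof profiler if --tfprof_file is set; the collected profile
# is written out by generate_tfprof_profile() after the run.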
profiler = tf.profiler.Profiler() if self.params.tfprof_file else None
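# Optionally dump the GraphDef to self.graph_file; a filename ending in
# 'txt' is written as text, otherwise as binary.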
if self.graph_file is not None:
path, filename = os.path.split(self.graph_file)
as_text = filename.endswith('txt')
log_fn('Writing GraphDef as %s to %s' % ( # pyformat break
'text' if as_text else 'binary', self.graph_file))
tf.train.write_graph(tf.get_default_graph().as_graph_def(add_shapes=True),
path, filename, as_text)
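# The Supervisor's standard services (checkpoint saver, summary thread,
# queue runners) are only needed when a train_dir is set or the dataset
# requires queue runners.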
start_standard_services = (
self.params.train_dir or
self.dataset.queue_runner_required())
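# Connect to the distributed session target from the cluster manager if
# there is one; an empty target means an in-process session.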
target = self.cluster_manager.get_target() if self.cluster_manager else ''
with sv.managed_session(
master=target,
config=create_config_proto(self.params),
start_standard_services=start_standard_services) as sess:
# Anything that can potentially raise an OutOfRangeError with 'sess' MUST
# be inside this try block. The managed_session() context manager
# silently swallows OutOfRangeError, so we catch it here and wrap it in a
# different exception type so that it can be propagated up to the caller.
try:
stats = self.benchmark_with_session(
sess, sv, graph_info, eval_graph_info, bcast_global_variables_op,
is_chief, summary_writer, profiler)
except tf.errors.OutOfRangeError:
raise RuntimeError(
'Received OutOfRangeError. Wrapping it in a RuntimeError to prevent '
'the Supervisor from suppressing it. Original OutOfRangeError with '
'traceback:\n' + traceback.format_exc())
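# Stop the Supervisor's services before writing the profile and
# returning the stats.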
sv.stop()
if profiler:
generate_tfprof_profile(profiler, self.params.tfprof_file)
return stats