in scripts/tf_cnn_benchmarks/benchmark_cnn.py [0:0]
def __init__(self, params, dataset=None, model=None):
"""Initialize BenchmarkCNN.
Args:
params: Params tuple, typically created by make_params or
make_params_from_flags.
dataset: If not None, the dataset to use. Otherwise, params is used to
obtain the dataset.
model: If not None, the model to use. Otherwise, params is used to obtain
the model.
Raises:
ValueError: Unsupported params settings.
"""
mlperf.logger.log(key=mlperf.tags.RUN_START)
self.params = params
if params.eval:
self._doing_eval = True
else:
# Note self._doing_eval can later switch to True in self._do_eval() if
# self.params.eval_during_training_* is specified.
self._doing_eval = False
self.dataset = dataset or datasets.create_dataset(self.params.data_dir,
self.params.data_name)
self.model = model or model_config.get_model_config(
self.params.model, self.dataset, self.params)
self.trace_filename = self.params.trace_file
self.rewriter_config = self.params.rewriter_config
autotune_threshold = self.params.autotune_threshold if (
self.params.autotune_threshold) else 1
min_autotune_warmup = 5 * autotune_threshold * autotune_threshold
self.num_warmup_batches = self.params.num_warmup_batches if (
self.params.num_warmup_batches is not None) else max(
10, min_autotune_warmup)
self.graph_file = self.params.graph_file
self.resize_method = self.params.resize_method
self.sync_queue_counter = 0
self.num_gpus = self.params.num_gpus
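# --gpu_indices selects specific GPU ids to use; by default GPUs 0..num_gpus-1 are used.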
if self.params.gpu_indices:
self.gpu_indices = [int(x) for x in self.params.gpu_indices.split(',')]
else:
self.gpu_indices = list(range(self.num_gpus))
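# Validate that the requested combination of parameters is supported before
# any graph construction happens.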
if (self.params.device == 'cpu' and self.params.data_format == 'NCHW' and
not self.params.mkl):
raise ValueError('device=cpu requires that data_format=NHWC')
if ((self.params.num_epochs_per_decay or
self.params.learning_rate_decay_factor) and
not (self.params.init_learning_rate is not None and
self.params.num_epochs_per_decay
and self.params.learning_rate_decay_factor)):
raise ValueError('If one of num_epochs_per_decay or '
'learning_rate_decay_factor is set, both must be set, '
'and init_learning_rate must be set')
if (self.params.minimum_learning_rate and
not (self.params.init_learning_rate is not None and
self.params.num_epochs_per_decay and
self.params.learning_rate_decay_factor)):
raise ValueError('minimum_learning_rate requires init_learning_rate, '
'num_epochs_per_decay, and '
'learning_rate_decay_factor to be set')
if (self.params.use_fp16 and self.params.fp16_vars and
'replicated' in self.params.variable_update and
self.params.all_reduce_spec and 'nccl' in self.params.all_reduce_spec):
raise ValueError('fp16 variables are not supported with NCCL')
if (self.params.use_fp16 and self.params.fp16_vars and
self.params.gradient_repacking):
raise ValueError('--fp16_vars cannot be used with --gradient_repacking')
if self.params.variable_update == 'horovod' and self.params.num_gpus > 1:
raise ValueError('Horovod benchmarks require num_gpus=1 on each worker')
if self.params.variable_update == 'horovod' and self.params.job_name:
raise ValueError('job_name should not be specified for Horovod.')
if self.params.use_fp16 and self.params.fp16_enable_auto_loss_scale:
if self.params.all_reduce_spec and 'nccl' in self.params.all_reduce_spec:
raise ValueError('Automatic loss scaling is not supported with NCCL.')
if self.params.variable_update not in ('parameter_server', 'replicated',
'independent'):
raise ValueError('Automatic loss scaling is not supported with '
'variable_update=%s.' % self.params.variable_update)
if self.params.staged_vars:
raise ValueError('Automatic loss scaling is not supported with '
'staged_vars.')
if (self.params.debugger is not None and self.params.debugger != 'cli' and
':' not in self.params.debugger):
raise ValueError('--debugger must be "cli" or in the form '
'host:port')
if self.params.hierarchical_copy and self.params.num_gpus <= 1:
raise ValueError('--hierarchical_copy requires --num_gpus to be greater '
'than 1')
if params.save_model_secs and params.save_model_steps:
raise ValueError('At most one of --save_model_secs and '
'--save_model_steps can be specified')
eval_during_training_flags = list(map(bool, [
params.eval_during_training_every_n_steps,
params.eval_during_training_every_n_epochs,
params.eval_during_training_at_specified_steps,
params.eval_during_training_at_specified_epochs,
]))
if eval_during_training_flags.count(True) > 1:
raise ValueError('At most one flag with the --eval_during_training_* '
'prefix may be specified.')
eval_during_training_enabled = any(eval_during_training_flags)
if eval_during_training_enabled:
if params.eval:
raise ValueError('At most one of --eval and --eval_during_training_* '
'may be specified')
if params.forward_only:
raise ValueError('At most one of --forward_only and '
'--eval_during_training_* may be specified')
if params.job_name:
raise ValueError('--eval_during_training_* is not yet supported in '
'distributed mode.')
if params.staged_vars:
raise ValueError('--eval_during_training_* is not currently compatible '
'with --staged_vars')
if params.stop_at_top_1_accuracy and not eval_during_training_enabled:
raise ValueError('--stop_at_top_1_accuracy is only supported with '
'--eval_during_training_*')
if params.collect_eval_results_async and params.model != 'ssd300':
raise ValueError('--collect_eval_results_async currently only works '
'with the ssd300 model.')
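# Freezing the graph (--forward_only together with --freeze_when_forward_only)
# is incompatible with --train_dir, with --data_dir unless
# --datasets_use_prefetch is enabled, and with distributed execution.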
if self.params.forward_only and self.params.freeze_when_forward_only:
if self.params.train_dir is not None:
raise ValueError('In forward_only mode, when --freeze_when_forward_only'
' is True, --train_dir should not be specified')
if self.params.data_dir and not self.params.datasets_use_prefetch:
raise ValueError('In forward_only mode, when --freeze_when_forward_only'
' is True and --data_dir is set, '
'--datasets_use_prefetch should be set to True')
if self.params.job_name:
raise ValueError('In forward_only mode, when --freeze_when_forward_only'
' is True, --job_name should not be specified and '
'distributed running is not supported')
self.forward_only_and_freeze = True
else:
self.forward_only_and_freeze = False
if self.params.trt_mode:
raise ValueError('--trt_mode should not be specified unless both '
'--forward_only and --freeze_when_forward_only are '
'set')
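# Determine whether this run trains, evaluates, or does both.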
self.mode = get_mode_from_params(self.params)
# Use the batch size from the command line if specified, otherwise use the
# model's default batch size. Scale the benchmark's batch size by the
# number of GPUs.
if self.params.batch_size > 0:
self.model.set_batch_size(self.params.batch_size)
self.batch_size = self.model.get_batch_size() * self.num_gpus
if self.mode in (constants.BenchmarkMode.TRAIN,
constants.BenchmarkMode.TRAIN_AND_EVAL):
self.train_batch_size = self.batch_size
else:
self.train_batch_size = None
if self.mode in (constants.BenchmarkMode.EVAL,
constants.BenchmarkMode.TRAIN_AND_EVAL):
if self.params.eval_batch_size > 0:
self.eval_batch_size = self.params.eval_batch_size * self.num_gpus
else:
self.eval_batch_size = self.batch_size
else:
self.eval_batch_size = None
self.batch_group_size = self.params.batch_group_size
self.enable_auto_loss_scale = (
self.params.use_fp16 and self.params.fp16_enable_auto_loss_scale)
self.loss_scale = None
self.loss_scale_normal_steps = None
self.job_name = self.params.job_name # "" for local training
# A parameter server is used for distributed jobs that do not use all-reduce.
use_ps_server = self.job_name and (self.params.variable_update !=
'distributed_all_reduce' and
self.params.variable_update !=
'collective_all_reduce')
# A controller is used for distributed_all_reduce with more than one worker.
use_controller = (
self.params.variable_update == 'distributed_all_reduce' and
self.job_name)
if use_controller and not params.controller_host:
raise ValueError('When variable_update==distributed_all_reduce, '
'controller_host must also be specified.')
self.single_session = (
self.params.variable_update == 'distributed_all_reduce')
# collective_all_reduce doesn't need a controller or ps
self.distributed_collective = (
self.params.variable_update == 'collective_all_reduce' and
self.job_name)
self.local_parameter_device_flag = self.params.local_parameter_device
if self.job_name:
self.task_index = self.params.task_index
self.cluster_manager = platforms_util.get_cluster_manager(
params, create_config_proto(params))
assert isinstance(self.cluster_manager, cnn_util.BaseClusterManager)
worker_prefix = '/job:worker/replica:0/task:%s' % self.task_index
if use_ps_server:
self.param_server_device = tf.train.replica_device_setter(
worker_device=worker_prefix + '/cpu:0',
cluster=self.cluster_manager.get_cluster_spec())
# The device on which the queues for managing synchronization between
# servers are stored.
self.sync_queue_devices = [
'/job:ps/replica:0/task:%s/cpu:0' % i
for i in range(self.cluster_manager.num_ps())
]
else:
self.sync_queue_devices = ['/job:worker/replica:0/task:0/cpu:0']
else:
self.task_index = 0
self.cluster_manager = None
worker_prefix = ''
self.param_server_device = '/%s:0' % self.params.local_parameter_device
self.sync_queue_devices = [self.param_server_device]
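# Determine the number of workers and parameter servers in this job.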
if self.cluster_manager:
self.num_workers = self.cluster_manager.num_workers()
elif self.params.variable_update == 'horovod':
import horovod.tensorflow as hvd # pylint: disable=g-import-not-at-top
self.num_workers = hvd.size()
else:
self.num_workers = 1
self.num_ps = self.cluster_manager.num_ps() if self.cluster_manager else 0
if self.num_workers > 1 and self.params.all_reduce_spec == 'nccl':
raise ValueError('--all_reduce_spec=nccl is invalid in a '
'multi-worker job')
# Device to use for ops that need to always run on the local worker's CPU.
self.cpu_device = '%s/cpu:0' % worker_prefix
# Device to use for ops that need to always run on the local worker's
# compute device, and never on a parameter server device.
self.raw_devices = [
'%s/%s:%i' % (worker_prefix, self.params.device, i)
for i in xrange(self.num_gpus)
]
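# Compute how many batches and epochs to run, based on the global batch size
# across all workers.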
subset = 'validation' if params.eval else 'train'
self.num_batches, self.num_epochs = get_num_batches_and_epochs(
params, self.batch_size * self.num_workers,
self.dataset.num_examples_per_epoch(subset))
if self.mode in (constants.BenchmarkMode.EVAL,
constants.BenchmarkMode.TRAIN_AND_EVAL):
# TODO(reedwm): Currently we do extra eval logic for num_eval_batches and
# the preprocessor. We should encapsulate this logic into a shared
# function or class.
if params.num_eval_batches is None and params.num_eval_epochs is None:
eval_params = self.params
else:
eval_params = self.params._replace(
num_batches=self.params.num_eval_batches,
num_epochs=self.params.num_eval_epochs)
self.num_eval_batches, self.num_eval_epochs = get_num_batches_and_epochs(
eval_params, self.eval_batch_size * self.num_workers,
self.dataset.num_examples_per_epoch('validation'))
else:
self.num_eval_batches, self.num_eval_epochs = None, None
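# Convert the --eval_during_training_* flags into the set of global training
# steps at which evaluation should run.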
num_train_examples_per_epoch = self.dataset.num_examples_per_epoch('train')
if self.params.eval_during_training_every_n_epochs:
n_epochs = self.params.eval_during_training_every_n_epochs
self.eval_during_training_at_specified_steps = {
(int(e * num_train_examples_per_epoch + self.batch_size - 1) //
self.batch_size)
for e in np.arange(n_epochs, self.num_epochs, n_epochs)}
if self.params.eval_during_training_at_specified_steps:
try:
self.eval_during_training_at_specified_steps = set(map(
int, self.params.eval_during_training_at_specified_steps))
except ValueError:
raise ValueError('Param eval_during_training_at_specified_steps value '
'of %s cannot be converted to a list of integers.' %
(self.params.eval_during_training_at_specified_steps))
if self.params.eval_during_training_at_specified_epochs:
try:
n_epochs = list(map(
float, self.params.eval_during_training_at_specified_epochs))
offset = n_epochs[0] - 1
if offset.is_integer():
offset = int(offset)
mlperf.logger.log(key=mlperf.tags.EVAL_EPOCH_OFFSET, value=offset)
self.eval_during_training_at_specified_steps = {
(int(e * num_train_examples_per_epoch + self.batch_size - 1) //
self.batch_size)
for e in n_epochs}
except ValueError:
raise ValueError('Param eval_during_training_at_specified_epochs value '
'of %s cannot be converted to a list of floats.' %
(self.params.eval_during_training_at_specified_epochs))
if params.eval_during_training_every_n_epochs:
offset = params.eval_during_training_every_n_epochs - 1
if offset.is_integer():
offset = int(offset)
mlperf.logger.log(key=mlperf.tags.EVAL_EPOCH_OFFSET, value=offset)
if (self.params.staged_vars and
self.params.variable_update != 'parameter_server'):
raise ValueError('staged_vars is currently only supported with '
'variable_update=parameter_server')
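# Select the variable manager, which controls where variables are placed and
# how gradients are aggregated, based on --variable_update.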
if self.params.variable_update == 'parameter_server':
if self.job_name:
if not self.params.staged_vars:
self.variable_mgr = variable_mgr.VariableMgrDistributedFetchFromPS(
self)
else:
self.variable_mgr = (
variable_mgr.VariableMgrDistributedFetchFromStagedPS(self))
else:
if not self.params.staged_vars:
self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromPS(self)
else:
self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromStagedPS(
self)
elif self.params.variable_update == 'replicated':
if self.job_name:
raise ValueError('Invalid variable_update in distributed mode: %s' %
self.params.variable_update)
self.variable_mgr = variable_mgr.VariableMgrLocalReplicated(
self, self.params.all_reduce_spec,
self.params.agg_small_grads_max_bytes,
self.params.agg_small_grads_max_group,
self.params.allreduce_merge_scope)
elif self.params.variable_update == 'distributed_all_reduce':
assert self.params.cross_replica_sync
self.variable_mgr = variable_mgr.VariableMgrDistributedAllReduce(
self, self.params.all_reduce_spec,
('worker' if self.num_workers > 1 else 'localhost'),
self.num_workers, self.params.agg_small_grads_max_bytes,
self.params.agg_small_grads_max_group,
self.params.allreduce_merge_scope)
elif self.params.variable_update == 'collective_all_reduce':
assert self.params.cross_replica_sync
self.variable_mgr = variable_mgr.VariableMgrCollectiveAllReduce(
self, self.params.all_reduce_spec,
self.num_workers, self.num_gpus, self.task_index,
self.params.allreduce_merge_scope)
elif self.params.variable_update == 'distributed_replicated':
assert self.params.cross_replica_sync
if not self.job_name:
raise ValueError('Invalid variable_update in local mode: %s' %
self.params.variable_update)
self.variable_mgr = variable_mgr.VariableMgrDistributedReplicated(self)
elif self.params.variable_update in ('independent', 'horovod'):
if self.job_name:
raise ValueError('Invalid variable_update in distributed mode: %s' %
self.params.variable_update)
self.variable_mgr = variable_mgr.VariableMgrIndependent(self)
else:
raise ValueError(
'Invalid variable_update: %s' % self.params.variable_update)
# Devices to use for the local worker's compute ops; the variable manager
# may place the corresponding variables on parameter server devices.
self.devices = self.variable_mgr.get_devices()
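# Choose the device that holds the global step variable.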
if self.job_name:
if use_ps_server:
self.global_step_device = self.param_server_device
elif self.params.variable_update == 'collective_all_reduce':
self.global_step_device = self.cpu_device
else:
self.global_step_device = '/job:worker/replica:0/task:0/cpu:0'
else:
self.global_step_device = self.cpu_device
self.input_preprocessor = None
self.eval_input_preprocessor = None
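# Build input preprocessors only when real (non-synthetic) input data is used.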
if not self.dataset.use_synthetic_gpu_inputs():
if not self.params.eval:
self.input_preprocessor = self.get_input_preprocessor()
if self.mode in (constants.BenchmarkMode.EVAL,
constants.BenchmarkMode.TRAIN_AND_EVAL):
with self._do_eval():
self.eval_input_preprocessor = self.get_input_preprocessor()
self.datasets_use_prefetch = (
self.params.datasets_use_prefetch and
# TODO(rohanj): Figure out why --datasets_use_prefetch freezes on the
# CPU.
self.params.device.lower() != 'cpu' and
self.input_preprocessor and
self.input_preprocessor.supports_datasets())
self.init_global_step = 0
self._config_benchmark_logger()
if self.mode == constants.BenchmarkMode.TRAIN_AND_EVAL:
# Remove "eval" from params so it is not accidentally used. Since eval can
# still occur despite params.eval being False, params.eval should never
# be used. We cannot yet remove this unconditionally, because the SSD
# model still uses params.eval, and hence does not work properly with
# --eval_during_training_*.
# TODO(b/116627045): We should also remove fields that have an eval
# equivalent, like num_batches and num_eval_batches.
self.params = remove_param_fields(self.params, {'eval'})