def __init__()

in scripts/tf_cnn_benchmarks/benchmark_cnn.py

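A minimal usage sketch for this constructor (an illustrative assumption, not code taken from benchmark_cnn.py): a Params tuple is built with the module-level make_params helper mentioned in the docstring below, and the benchmark is then driven with run().

    # Hypothetical driver script, run from scripts/tf_cnn_benchmarks/.
    import benchmark_cnn

    # Build a Params tuple, overriding a few defaults.
    params = benchmark_cnn.make_params(model='resnet50',
                                       num_gpus=1,
                                       batch_size=32)
    bench = benchmark_cnn.BenchmarkCNN(params)
    bench.run()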

  def __init__(self, params, dataset=None, model=None):
    """Initialize BenchmarkCNN.

    Args:
      params: Params tuple, typically created by make_params or
              make_params_from_flags.
      dataset: If not None, the dataset to use. Otherwise, params is used to
               obtain the dataset.
      model: If not None, the model to use. Otherwise, params is used to obtain
             the model.

    Raises:
      ValueError: Unsupported params settings.
    """
    mlperf.logger.log(key=mlperf.tags.RUN_START)
    self.params = params
    if params.eval:
      self._doing_eval = True
    else:
      # Note self._doing_eval can later switch to True in self._do_eval() if
      # self.params.eval_during_training_* is specified.
      self._doing_eval = False
    self.dataset = dataset or datasets.create_dataset(self.params.data_dir,
                                                      self.params.data_name)
    self.model = model or model_config.get_model_config(
        self.params.model, self.dataset, self.params)
    self.trace_filename = self.params.trace_file
    self.rewriter_config = self.params.rewriter_config
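    # When --num_warmup_batches is unset, default to at least 10 warmup
    # batches, scaled up quadratically with --autotune_threshold.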
    autotune_threshold = self.params.autotune_threshold if (
        self.params.autotune_threshold) else 1
    min_autotune_warmup = 5 * autotune_threshold * autotune_threshold
    self.num_warmup_batches = self.params.num_warmup_batches if (
        self.params.num_warmup_batches is not None) else max(
            10, min_autotune_warmup)
    self.graph_file = self.params.graph_file
    self.resize_method = self.params.resize_method
    self.sync_queue_counter = 0
    self.num_gpus = self.params.num_gpus
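    # --gpu_indices selects specific physical GPUs; default to 0..num_gpus-1.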
    if self.params.gpu_indices:
      self.gpu_indices = [int(x) for x in self.params.gpu_indices.split(',')]
    else:
      self.gpu_indices = list(range(self.num_gpus))

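    # Reject unsupported or mutually incompatible combinations of params.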
    if (self.params.device == 'cpu' and self.params.data_format == 'NCHW' and
        not self.params.mkl):
      raise ValueError('device=cpu requires that data_format=NHWC')

    if ((self.params.num_epochs_per_decay or
         self.params.learning_rate_decay_factor) and
        not (self.params.init_learning_rate is not None and
             self.params.num_epochs_per_decay
             and self.params.learning_rate_decay_factor)):
      raise ValueError('If one of num_epochs_per_decay or '
                       'learning_rate_decay_factor is set, both must be set, '
                       'and init_learning_rate must also be set')
    if (self.params.minimum_learning_rate and
        not (self.params.init_learning_rate is not None and
             self.params.num_epochs_per_decay and
             self.params.learning_rate_decay_factor)):
      raise ValueError('minimum_learning_rate requires init_learning_rate, '
                       'num_epochs_per_decay, and '
                       'learning_rate_decay_factor to be set')

    if (self.params.use_fp16 and self.params.fp16_vars and
        'replicated' in self.params.variable_update and
        self.params.all_reduce_spec and 'nccl' in self.params.all_reduce_spec):
      raise ValueError('fp16 variables are not supported with NCCL')
    if (self.params.use_fp16 and self.params.fp16_vars and
        self.params.gradient_repacking):
      raise ValueError('--fp16_vars cannot be used with --gradient_repacking')

    if self.params.variable_update == 'horovod' and self.params.num_gpus > 1:
      raise ValueError('Horovod benchmarks require num_gpus=1 on each worker')

    if self.params.variable_update == 'horovod' and self.params.job_name:
      raise ValueError('job_name should not be specified for Horovod.')

    if self.params.use_fp16 and self.params.fp16_enable_auto_loss_scale:
      if self.params.all_reduce_spec and 'nccl' in self.params.all_reduce_spec:
        raise ValueError('Automatic loss scaling is not supported with NCCL.')
      if self.params.variable_update not in ('parameter_server', 'replicated',
                                             'independent'):
        raise ValueError('Automatic loss scaling is not supported with '
                         'variable_update=%s.' % self.params.variable_update)
      if self.params.staged_vars:
        raise ValueError('Automatic loss scaling is not supported with '
                         'staged_vars.')

    if (self.params.debugger is not None and self.params.debugger != 'cli' and
        ':' not in self.params.debugger):
      raise ValueError('--debugger must be "cli" or in the form '
                       'host:port')

    if self.params.hierarchical_copy and self.params.num_gpus <= 1:
      raise ValueError('--hierarchical_copy requires --num_gpus to be greater '
                       'than 1')

    if params.save_model_secs and params.save_model_steps:
      raise ValueError('At most one of --save_model_secs and '
                       '--save_model_steps can be specified')

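    # The --eval_during_training_* flags are mutually exclusive; record which
    # of them were set.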
    eval_during_training_flags = list(map(bool, [
        params.eval_during_training_every_n_steps,
        params.eval_during_training_every_n_epochs,
        params.eval_during_training_at_specified_steps,
        params.eval_during_training_at_specified_epochs,
    ]))

    if eval_during_training_flags.count(True) > 1:
      raise ValueError('At most one flag with the --eval_during_training_* '
                       'prefix may be specified.')

    eval_during_training_enabled = any(eval_during_training_flags)

    if eval_during_training_enabled:
      if params.eval:
        raise ValueError('At most one of --eval and --eval_during_training_* '
                         'may be specified')
      if params.forward_only:
        raise ValueError('At most one of --forward_only and '
                         '--eval_during_training_* may be specified')
      if params.job_name:
        raise ValueError('--eval_during_training_* is not yet supported in '
                         'distributed mode.')
      if params.staged_vars:
        raise ValueError('--eval_during_training_* is not currently compatible '
                         'with --staged_vars')

    if params.stop_at_top_1_accuracy and not eval_during_training_enabled:
      raise ValueError('--stop_at_top_1_accuracy is only supported with '
                       '--eval_during_training_*')
    if params.collect_eval_results_async and params.model != 'ssd300':
      raise ValueError('--collect_eval_results_async currently only works '
                       'with the ssd300 model.')
    if self.params.forward_only and self.params.freeze_when_forward_only:
      if self.params.train_dir is not None:
        raise ValueError('In forward_only mode, when --freeze_when_forward_only'
                         ' is True, --train_dir should not be specified')
      if self.params.data_dir and not self.params.datasets_use_prefetch:
        raise ValueError('In forward_only mode, when --freeze_when_forward_only'
                         ' is True and --data_dir is set, '
                         '--datasets_use_prefetch should be set to True')
      if self.params.job_name:
        raise ValueError('In forward_only mode, when --freeze_when_forward_only'
                         ' is True, --job_name should not be specified and '
                         'distributed running is not supported')
      self.forward_only_and_freeze = True
    else:
      self.forward_only_and_freeze = False
      if self.params.trt_mode:
        raise ValueError('--trt_mode should not be specified unless both '
                         '--forward_only and --freeze_when_forward_only are '
                         'set to True')

    self.mode = get_mode_from_params(self.params)

    # Use the batch size from the command line if specified, otherwise use the
    # model's default batch size.  Scale the benchmark's batch size by the
    # number of GPUs.
    if self.params.batch_size > 0:
      self.model.set_batch_size(self.params.batch_size)
    self.batch_size = self.model.get_batch_size() * self.num_gpus
    if self.mode in (constants.BenchmarkMode.TRAIN,
                     constants.BenchmarkMode.TRAIN_AND_EVAL):
      self.train_batch_size = self.batch_size
    else:
      self.train_batch_size = None
    if self.mode in (constants.BenchmarkMode.EVAL,
                     constants.BenchmarkMode.TRAIN_AND_EVAL):
      if self.params.eval_batch_size > 0:
        self.eval_batch_size = self.params.eval_batch_size * self.num_gpus
      else:
        self.eval_batch_size = self.batch_size
    else:
      self.eval_batch_size = None
    self.batch_group_size = self.params.batch_group_size
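    # fp16 loss-scaling state; the actual loss scale and its step counter are
    # only created later, if auto loss scaling is enabled.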
    self.enable_auto_loss_scale = (
        self.params.use_fp16 and self.params.fp16_enable_auto_loss_scale)
    self.loss_scale = None
    self.loss_scale_normal_steps = None

    self.job_name = self.params.job_name  # "" for local training

    # PS server is used for distributed jobs not using all-reduce.
    use_ps_server = self.job_name and (self.params.variable_update !=
                                       'distributed_all_reduce' and
                                       self.params.variable_update !=
                                       'collective_all_reduce')
    # controller is used for distributed_all_reduce with > 1 worker.
    use_controller = (
        self.params.variable_update == 'distributed_all_reduce' and
        self.job_name)
    if use_controller and not params.controller_host:
      raise ValueError('When variable_update==distributed_all_reduce '
                       'controller_host must also be specified.')
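    # distributed_all_reduce uses a single session that spans all workers.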
    self.single_session = (
        self.params.variable_update == 'distributed_all_reduce')
    # collective_all_reduce doesn't need a controller or ps
    self.distributed_collective = (
        self.params.variable_update == 'collective_all_reduce' and
        self.job_name)

    self.local_parameter_device_flag = self.params.local_parameter_device
    if self.job_name:
      self.task_index = self.params.task_index
      self.cluster_manager = platforms_util.get_cluster_manager(
          params, create_config_proto(params))
      assert isinstance(self.cluster_manager, cnn_util.BaseClusterManager)

      worker_prefix = '/job:worker/replica:0/task:%s' % self.task_index
      if use_ps_server:
        self.param_server_device = tf.train.replica_device_setter(
            worker_device=worker_prefix + '/cpu:0',
            cluster=self.cluster_manager.get_cluster_spec())
        # This is the device on which the queues for managing synchronization
        # between servers are stored.
        self.sync_queue_devices = [
            '/job:ps/replica:0/task:%s/cpu:0' % i
            for i in range(self.cluster_manager.num_ps())
        ]
      else:
        self.sync_queue_devices = ['/job:worker/replica:0/task:0/cpu:0']
    else:
      self.task_index = 0
      self.cluster_manager = None
      worker_prefix = ''
      self.param_server_device = '/%s:0' % self.params.local_parameter_device
      self.sync_queue_devices = [self.param_server_device]

    if self.cluster_manager:
      self.num_workers = self.cluster_manager.num_workers()
    elif self.params.variable_update == 'horovod':
      import horovod.tensorflow as hvd  # pylint: disable=g-import-not-at-top
      self.num_workers = hvd.size()
    else:
      self.num_workers = 1
    self.num_ps = self.cluster_manager.num_ps() if self.cluster_manager else 0

    if self.num_workers > 1 and self.params.all_reduce_spec == 'nccl':
      raise ValueError('--all_reduce_spec=nccl is invalid in a '
                       'multi-worker job')

    # Device to use for ops that need to always run on the local worker's CPU.
    self.cpu_device = '%s/cpu:0' % worker_prefix

    # Device to use for ops that need to always run on the local worker's
    # compute device, and never on a parameter server device.
    self.raw_devices = [
        '%s/%s:%i' % (worker_prefix, self.params.device, i)
        for i in xrange(self.num_gpus)
    ]

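    # The number of batches and epochs is derived from the global batch size
    # across all workers.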
    subset = 'validation' if params.eval else 'train'
    self.num_batches, self.num_epochs = get_num_batches_and_epochs(
        params, self.batch_size * self.num_workers,
        self.dataset.num_examples_per_epoch(subset))
    if self.mode in (constants.BenchmarkMode.EVAL,
                     constants.BenchmarkMode.TRAIN_AND_EVAL):
      # TODO(reedwm): Currently we do extra eval logic for num_eval_batches and
      # the preprocessor. We should encapsulate this logic into a shared
      # function or class.
      if params.num_eval_batches is None and params.num_eval_epochs is None:
        eval_params = self.params
      else:
        eval_params = self.params._replace(
            num_batches=self.params.num_eval_batches,
            num_epochs=self.params.num_eval_epochs)
      self.num_eval_batches, self.num_eval_epochs = get_num_batches_and_epochs(
          eval_params, self.eval_batch_size * self.num_workers,
          self.dataset.num_examples_per_epoch('validation'))
    else:
      self.num_eval_batches, self.num_eval_epochs = None, None

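    # Translate the --eval_during_training_* schedules into a set of global
    # step indices; epoch boundaries are rounded up with ceiling division by
    # the batch size.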
    num_train_examples_per_epoch = self.dataset.num_examples_per_epoch('train')
    if self.params.eval_during_training_every_n_epochs:
      n_epochs = self.params.eval_during_training_every_n_epochs
      self.eval_during_training_at_specified_steps = {
          (int(e * num_train_examples_per_epoch + self.batch_size - 1) //
           self.batch_size)
          for e in np.arange(n_epochs, self.num_epochs, n_epochs)}

    if self.params.eval_during_training_at_specified_steps:
      try:
        self.eval_during_training_at_specified_steps = set(map(
            int, self.params.eval_during_training_at_specified_steps))
      except ValueError:
        raise ValueError('Param eval_during_training_at_specified_steps value '
                         'of %s cannot be converted to a list of integers.' %
                         (self.params.eval_during_training_at_specified_steps))

    if self.params.eval_during_training_at_specified_epochs:
      try:
        n_epochs = list(map(
            float, self.params.eval_during_training_at_specified_epochs))
        offset = n_epochs[0] - 1
        if offset.is_integer():
          offset = int(offset)
        mlperf.logger.log(key=mlperf.tags.EVAL_EPOCH_OFFSET, value=offset)
        self.eval_during_training_at_specified_steps = {
            (int(e * num_train_examples_per_epoch + self.batch_size - 1) //
             self.batch_size)
            for e in n_epochs}
      except ValueError:
        raise ValueError('Param eval_during_training_at_specified_epochs value '
                         'of %s cannot be converted to a list of floats.' %
                         (self.params.eval_during_training_at_specified_epochs))

    if params.eval_during_training_every_n_epochs:
      offset = params.eval_during_training_every_n_epochs - 1
      if offset.is_integer():
        offset = int(offset)
      mlperf.logger.log(key=mlperf.tags.EVAL_EPOCH_OFFSET, value=offset)

    if (self.params.staged_vars and
        self.params.variable_update != 'parameter_server'):
      raise ValueError('staged_vars for now is only supported with '
                       'variable_update=parameter_server')

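    # Choose the VariableMgr implementation that matches the requested
    # variable_update strategy and whether this is a distributed job.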
    if self.params.variable_update == 'parameter_server':
      if self.job_name:
        if not self.params.staged_vars:
          self.variable_mgr = variable_mgr.VariableMgrDistributedFetchFromPS(
              self)
        else:
          self.variable_mgr = (
              variable_mgr.VariableMgrDistributedFetchFromStagedPS(self))
      else:
        if not self.params.staged_vars:
          self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromPS(self)
        else:
          self.variable_mgr = variable_mgr.VariableMgrLocalFetchFromStagedPS(
              self)
    elif self.params.variable_update == 'replicated':
      if self.job_name:
        raise ValueError('Invalid variable_update in distributed mode: %s' %
                         self.params.variable_update)
      self.variable_mgr = variable_mgr.VariableMgrLocalReplicated(
          self, self.params.all_reduce_spec,
          self.params.agg_small_grads_max_bytes,
          self.params.agg_small_grads_max_group,
          self.params.allreduce_merge_scope)
    elif self.params.variable_update == 'distributed_all_reduce':
      assert self.params.cross_replica_sync
      self.variable_mgr = variable_mgr.VariableMgrDistributedAllReduce(
          self, self.params.all_reduce_spec,
          ('worker' if self.num_workers > 1 else 'localhost'),
          self.num_workers, self.params.agg_small_grads_max_bytes,
          self.params.agg_small_grads_max_group,
          self.params.allreduce_merge_scope)
    elif self.params.variable_update == 'collective_all_reduce':
      assert self.params.cross_replica_sync
      self.variable_mgr = variable_mgr.VariableMgrCollectiveAllReduce(
          self, self.params.all_reduce_spec,
          self.num_workers, self.num_gpus, self.task_index,
          self.params.allreduce_merge_scope)
    elif self.params.variable_update == 'distributed_replicated':
      assert self.params.cross_replica_sync
      if not self.job_name:
        raise ValueError('Invalid variable_update in local mode: %s' %
                         self.params.variable_update)
      self.variable_mgr = variable_mgr.VariableMgrDistributedReplicated(self)
    elif self.params.variable_update in ('independent', 'horovod'):
      if self.job_name:
        raise ValueError('Invalid variable_update in distributed mode: %s' %
                         self.params.variable_update)
      self.variable_mgr = variable_mgr.VariableMgrIndependent(self)
    else:
      raise ValueError(
          'Invalid variable_update: %s' % self.params.variable_update)

    # Devices to use for running ops on the local worker's compute devices,
    # but with variables assigned to parameter server devices.
    self.devices = self.variable_mgr.get_devices()
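    # Keep the global step on the parameter server when one is used;
    # otherwise place it on a (worker) CPU device.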
    if self.job_name:
      if use_ps_server:
        self.global_step_device = self.param_server_device
      elif self.params.variable_update == 'collective_all_reduce':
        self.global_step_device = self.cpu_device
      else:
        self.global_step_device = '/job:worker/replica:0/task:0/cpu:0'
    else:
      self.global_step_device = self.cpu_device

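    # Build input preprocessors unless synthetic GPU input is used. The eval
    # preprocessor is constructed with _doing_eval temporarily set via
    # self._do_eval().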
    self.input_preprocessor = None
    self.eval_input_preprocessor = None
    if not self.dataset.use_synthetic_gpu_inputs():
      if not self.params.eval:
        self.input_preprocessor = self.get_input_preprocessor()
      if self.mode in (constants.BenchmarkMode.EVAL,
                       constants.BenchmarkMode.TRAIN_AND_EVAL):
        with self._do_eval():
          self.eval_input_preprocessor = self.get_input_preprocessor()
    self.datasets_use_prefetch = (
        self.params.datasets_use_prefetch and
        # TODO(rohanj): Figure out why --datasets_use_prefetch freezes on the
        # CPU.
        self.params.device.lower() != 'cpu' and
        self.input_preprocessor and
        self.input_preprocessor.supports_datasets())
    self.init_global_step = 0

    self._config_benchmark_logger()

    if self.mode == constants.BenchmarkMode.TRAIN_AND_EVAL:
      # Remove "eval" from params so it is not accidentally used. Since eval can
      # still occur despite params.eval being False, params.eval should never
      # be used. We cannot yet remove this unconditionally, because the SSD
      # model still uses params.eval, and hence does not work properly with
      # --eval_during_training_*.
      # TODO(b/116627045): We should also remove fields that have an eval
      # equivalent, like num_batches and num_eval_batches.
      self.params = remove_param_fields(self.params, {'eval'})