def __call__()

in tensorflow/sagemakercv/data/coco/dataloader.py

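Builds the per-worker tf.data pipeline for COCO TFRecords: the file list is sharded across workers (MPI/Horovod or tf.distribute), records are interleaved and cached, then parsed, batched, and prefetched, with a block of tf.data.Options tuning applied at the end.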

    def __call__(self, input_context=None):

        # Batch size and the distributed-eval flag are read from attributes
        # set in the constructor (formerly looked up in a params dict).
        batch_size = self._batch_size
        do_dist_eval = self._dist_eval
        
        try:
            # Derive a distinct per-rank seed under MPI; TypeError covers
            # self._seed being None.
            seed = self._seed if not MPI_is_distributed() else self._seed * MPI_rank()
        except TypeError:
            seed = None

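        # NOTE: n_gpus is currently consumed only by commented-out code
        # further down; the live pipeline does not use it.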
        if MPI_is_distributed():
            n_gpus = MPI_size()

        elif input_context is not None:
            n_gpus = input_context.num_input_pipelines

        else:
            n_gpus = 1
            
        ##################################################
        # This style of dataset sharding currently fails
        # with more than 32 nodes during evaluation.
        # When MPI_size > 32 and running eval, use the
        # simpler pipeline below (currently commented out).
        ##################################################
        
#        if do_dist_eval and n_gpus > 32 and \
#            (self._mode == tf.estimator.ModeKeys.PREDICT or \
#             self._mode == tf.estimator.ModeKeys.EVAL):
#            files = glob.glob(self._file_pattern)
#            dataset = tf.data.TFRecordDataset(files)
#            _shard_idx, _num_shards = MPI_rank_and_size()
#            dataset = dataset.shard(_num_shards, _shard_idx)
#            parser = lambda x: dataset_parser(x, self._mode, params, self._use_instance_mask, seed=seed)
#            dataset = dataset.map(parser, num_parallel_calls=tf.data.experimental.AUTOTUNE)
#            dataset = dataset.batch(batch_size=batch_size, drop_remainder=True)
#            dataset = dataset.repeat()
#            dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
#            return dataset

        ##################################################

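        # List files deterministically (shuffle=False) so every worker sees
        # the same ordering before sharding.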
        dataset = tf.data.Dataset.list_files(
            self._file_pattern,
            shuffle=False
        )

        if self._mode == tf.estimator.ModeKeys.TRAIN:

            _num_shards = None

            if input_context is not None:
                logging.info("Using Dataset Sharding with TF Distributed")
                _num_shards = input_context.num_input_pipelines
                _shard_idx = input_context.input_pipeline_id

            elif MPI_is_distributed():
                logging.info("Using Dataset Sharding")
                _shard_idx, _num_shards = MPI_rank_and_size()

            if _num_shards is not None:  # skipped in single-worker runs
                dataset = dataset.shard(
                    num_shards=_num_shards,
                    index=_shard_idx
                )
                # Shuffle the file order within each worker's shard.
                dataset = dataset.shuffle(math.ceil(512 / _num_shards))
        elif do_dist_eval and self._mode in (tf.estimator.ModeKeys.PREDICT, tf.estimator.ModeKeys.EVAL):
            # 512 validation TFRecord files - distribute across up to 512 workers.
            if MPI_is_distributed():
                logging.info("Using Evaluation Dataset Sharding")
                _shard_idx, _num_shards = MPI_rank_and_size()
                # Cap the shard count at the file count; workers beyond 512
                # wrap around onto already-assigned shards.
                max_shards = min(_num_shards, 512)
                dataset = dataset.shard(
                    num_shards=max_shards,
                    index=_shard_idx % max_shards
                )
        

        def _prefetch_dataset(filename):
            return tf.data.TFRecordDataset(filename).prefetch(1)

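        # Read from 64 files concurrently, pulling 8 consecutive records
        # from each before moving on, to parallelize file I/O.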
        dataset = dataset.interleave(
            map_func=_prefetch_dataset,
            cycle_length=64,
            block_length=8,
            num_parallel_calls=tf.data.experimental.AUTOTUNE,
        )

        if self._num_examples is not None and self._num_examples > 0:
            logging.info("[*] Limiting the amount of sample to: %d" % self._num_examples)
            dataset = dataset.take(self._num_examples)

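        # Cache the raw serialized records in memory so later epochs skip
        # file I/O; this happens before parsing and augmentation.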
        dataset = dataset.cache()
        if self._mode == tf.estimator.ModeKeys.TRAIN:
            dataset = dataset.shuffle(
                buffer_size=4096,
                reshuffle_each_iteration=True,
                seed=seed
            )

            dataset = dataset.repeat()

        # Parse the fetched records to input tensors for model function.
        dataset = dataset.map(
            map_func=self._create_dataset_parser_fn(),
            num_parallel_calls=tf.data.experimental.AUTOTUNE,
        )

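        # drop_remainder=False keeps the final partial batch so evaluation
        # covers every example; the batch dimension may vary at the tail.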
        dataset = dataset.batch(
            batch_size=batch_size,
            drop_remainder=False
        )

        if self._use_fake_data:
            # Turn this dataset into a semi-fake dataset which always loops
            # over the first batch. This reduces variance in performance and
            # is useful in testing.
            logging.info("Using Fake Dataset Loop...")
            dataset = dataset.take(1).cache().repeat()

            if self._mode != tf.estimator.ModeKeys.TRAIN:
                # Take enough repeated batches to cover the ~5000-image
                # COCO validation split.
                dataset = dataset.take(int(5000 / batch_size))

        dataset = dataset.prefetch(
            buffer_size=tf.data.experimental.AUTOTUNE,
        )

        # Disabled: explicit device prefetch, kept for reference.
        # if self._mode == tf.estimator.ModeKeys.PREDICT or n_gpus > 1:
        #     if not tf.distribute.has_strategy():
        #         dataset = dataset.apply(
        #             tf.data.experimental.prefetch_to_device(
        #                 '/gpu:0',  # With Horovod the local GPU is always 0
        #                 buffer_size=1,
        #             )
        #         )
        if not self._disable_options:
            data_options = tf.data.Options()

            data_options.experimental_deterministic = seed is not None
            if LooseVersion(tf.__version__) <= LooseVersion("2.0.0"):
                data_options.experimental_distribute.auto_shard = False
            else:
                data_options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
            data_options.experimental_slack = self._data_slack

            data_options.experimental_threading.max_intra_op_parallelism = 1
            # data_options.experimental_threading.private_threadpool_size = int(multiprocessing.cpu_count() / n_gpus) * 2

            # ================= experimental_optimization ================= #

            data_options.experimental_optimization.apply_default_optimizations = False

            # data_options.experimental_optimization.autotune = True
            data_options.experimental_optimization.filter_fusion = True
            data_options.experimental_optimization.map_and_batch_fusion = True
            data_options.experimental_optimization.map_and_filter_fusion = True
            data_options.experimental_optimization.map_fusion = True
            data_options.experimental_optimization.map_parallelization = True
            
            if int(tf.__version__.split('.')[1]) < 6:
                # map_vectorization was removed in TF 2.6; this check
                # assumes a 2.x version string.
                map_vectorization_options = tf.data.experimental.MapVectorizationOptions()
                map_vectorization_options.enabled = True
                map_vectorization_options.use_choose_fastest = True

                data_options.experimental_optimization.map_vectorization = map_vectorization_options

            data_options.experimental_optimization.noop_elimination = True
            data_options.experimental_optimization.parallel_batch = True
            data_options.experimental_optimization.shuffle_and_repeat_fusion = True

            # ========== Stats on TF Data =============
            # aggregator = tf.data.experimental.StatsAggregator()
            # data_options.experimental_stats.aggregator = aggregator
            # data_options.experimental_stats.latency_all_edges = True

            dataset = dataset.with_options(data_options)

        return dataset
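
For orientation, here is a minimal sketch of how a reader exposing this __call__ might be driven under Horovod. The class name CocoInputReader and its constructor arguments are assumptions for illustration, not the actual API of this file:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()  # makes the MPI_is_distributed()/MPI_rank() helpers meaningful

# Hypothetical construction; argument names are illustrative only.
reader = CocoInputReader(
    file_pattern='/data/coco/train*.tfrecord',
    mode=tf.estimator.ModeKeys.TRAIN,
    batch_size=4,
    seed=42,
)

dataset = reader()  # __call__ returns a sharded, batched tf.data.Dataset

for features in dataset.take(10):
    pass  # feed each per-worker batch to the training step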