def reader_throughput()

in petastorm/benchmark/throughput.py [0:0]


def reader_throughput(dataset_url, field_regex=None, warmup_cycles_count=300, measure_cycles_count=1000,
                      pool_type=WorkerPoolType.THREAD, loaders_count=3, profile_threads=False,
                      read_method=ReadMethod.PYTHON, shuffling_queue_size=500, min_after_dequeue=400,
                      reader_extra_args=None, pyarrow_serialize=False, spawn_new_process=True):
    """Constructs a Reader instance and uses it to performs throughput measurements.

    The function will spawn a new process if ``spawn_separate_process`` is set. This is needed to make memory footprint
    measurements accurate.

    :param dataset_url: A url of the dataset to be used for measurements.
    :param field_regex:  A list of regular expressions. Only fields that match one of the regex patterns will be used
      during the benchmark.
    :param warmup_cycles_count: Number of warmup cycles. During warmup cycles no measurements are being recorded.
    :param measure_cycles_count: Number of measurements cycles. Only time elapsed during measurements cycles are used
      in throughput calculations.
    :param pool_type: :class:`WorkerPoolType` enum value.
    :param loaders_count: Number of threads (same thread is used for IO and decoding).
    :param profile_threads:  Enables profiling threads. Will print result when thread pool is shut down.
    :param read_method:  An enum :class:`ReadMethod` that defines whether a :class:`petastorm.reader.Reader` will be
      used.
    :param shuffling_queue_size: Maximum number of elements in the shuffling queue.
    :param min_after_dequeue: Minimum number of elements in a shuffling queue before entries can be read from it.
    :param reader_extra_args: Extra arguments that would be passed to Reader constructor.
    :param pyarrow_serialize: When True, pyarrow.serialize library will be used for serializing decoded payloads.
    :param spawn_new_process: This function will respawn itself in a new process if the argument is True. Spawning
      a new process is needed to get an accurate memory footprint.

    :return: An instance of ``BenchmarkResult`` namedtuple with the results of the benchmark. The namedtuple has
      the following fields: `time_mean`, `samples_per_second`, `memory_info` and `cpu`
    """
    if not reader_extra_args:
        reader_extra_args = dict()

    if spawn_new_process:
        args = copy.deepcopy(locals())
        args['spawn_new_process'] = False
        executor = ProcessPoolExecutor(1)
        future = executor.submit(reader_throughput, **args)
        return future.result()

    logger.info('Arguments: %s', locals())

    if 'schema_fields' not in reader_extra_args:
        unischema_fields = match_unischema_fields(get_schema_from_dataset_url(dataset_url), field_regex)
        reader_extra_args['schema_fields'] = unischema_fields

    logger.info('Fields used in the benchmark: %s', str(reader_extra_args['schema_fields']))

    with make_reader(dataset_url,
                     num_epochs=None,
                     reader_pool_type=str(pool_type), workers_count=loaders_count, pyarrow_serialize=pyarrow_serialize,
                     **reader_extra_args) as reader:

        if read_method == ReadMethod.PYTHON:
            result = _time_warmup_and_work(reader, warmup_cycles_count, measure_cycles_count)
        elif read_method == ReadMethod.TF:
            result = _time_warmup_and_work_tf(reader, warmup_cycles_count, measure_cycles_count,
                                              shuffling_queue_size, min_after_dequeue)
        else:
            raise RuntimeError('Unexpected reader_type value: %s', str(read_method))

    return result