def _read_metadata()

in src/datasets/packaged_modules/folder_based_builder/folder_based_builder.py [0:0]


    def _read_metadata(self, metadata_file: str, metadata_ext: str = "") -> Iterator[pa.Table]:
        """Stream the content of a metadata file as a sequence of Arrow tables.

        Uses the same logic as the Csv, Json and Parquet dataset builders to stream the data.
        When `self.config.filters` is set, it is applied to every chunk. In the CSV and
        JSON Lines branches, empty chunks are skipped.

        Args:
            metadata_file: Path of the metadata file to read.
            metadata_ext: Extension selecting the parser. ".csv" is read with pandas in
                chunks of 10k rows, ".jsonl" is read with the pyarrow JSON reader in
                chunks of about 10MB, and any other value is read as Parquet.

        Yields:
            `pa.Table` objects, each holding one chunk of the metadata file.

        Raises:
            pa.ArrowInvalid, pa.ArrowNotImplementedError: If a JSON Lines chunk cannot be
                parsed and retrying with a larger block size is not possible.
        """
        filters = self.config.filters
        if filters is None:
            filter_expr = None
        elif isinstance(filters, list):
            # List-style filters are converted to an Arrow expression
            filter_expr = pq.filters_to_expression(filters)
        else:
            filter_expr = filters

        if metadata_ext == ".csv":
            chunksize = 10_000  # 10k lines
            features = self.config.features
            schema = features.arrow_schema if features else None
            # dtype allows reading an int column as str
            dtype = (
                {
                    name: object if require_storage_cast(feature) else arrow_type.to_pandas_dtype()
                    for name, arrow_type, feature in zip(schema.names, schema.types, features.values())
                }
                if schema is not None
                else None
            )
            # The context manager closes the reader (and its file handle) deterministically,
            # including when this generator is closed early or a chunk fails to convert.
            with pd.read_csv(metadata_file, iterator=True, dtype=dtype, chunksize=chunksize) as csv_file_reader:
                for df in csv_file_reader:
                    pa_table = pa.Table.from_pandas(df)
                    if filter_expr is not None:
                        pa_table = pa_table.filter(filter_expr)
                    if len(pa_table) > 0:
                        yield pa_table
        elif metadata_ext == ".jsonl":
            with open(metadata_file, "rb") as f:
                chunksize = 10 << 20  # 10MB
                # Use block_size equal to the chunk size divided by 32 to leverage multithreading
                # Set a default minimum value of 16kB if the chunk size is really small
                block_size = max(chunksize // 32, 16 << 10)
                while True:
                    batch = f.read(chunksize)
                    if not batch:
                        break
                    # Finish current line
                    try:
                        batch += f.readline()
                    except (AttributeError, io.UnsupportedOperation):
                        batch += readline(f)
                    while True:
                        try:
                            pa_table = paj.read_json(
                                io.BytesIO(batch), read_options=paj.ReadOptions(block_size=block_size)
                            )
                            break
                        except (pa.ArrowInvalid, pa.ArrowNotImplementedError) as e:
                            # Only "straddling" ArrowInvalid errors (and ArrowNotImplementedError)
                            # are retried with a bigger block size.
                            not_retryable = isinstance(e, pa.ArrowInvalid) and "straddling" not in str(e)
                            # Once the block size exceeds the batch, growing it further is not attempted.
                            if not_retryable or block_size > len(batch):
                                raise
                            # Increase the block size in case it was too small.
                            # The larger block size is kept for the remaining chunks of this file.
                            logger.debug(
                                f"Batch of {len(batch)} bytes couldn't be parsed with block_size={block_size}. Retrying with block_size={block_size * 2}."
                            )
                            block_size *= 2
                    if filter_expr is not None:
                        pa_table = pa_table.filter(filter_expr)
                    if len(pa_table) > 0:
                        yield pa_table
        else:
            with open(metadata_file, "rb") as f:
                parquet_fragment = ds.ParquetFileFormat().make_fragment(f)
                # Read one row group's worth of rows per batch, sized from the first row group
                if parquet_fragment.row_groups:
                    batch_size = parquet_fragment.row_groups[0].num_rows
                else:
                    batch_size = config.DEFAULT_MAX_BATCH_SIZE
                for record_batch in parquet_fragment.to_batches(
                    batch_size=batch_size,
                    filter=filter_expr,
                    batch_readahead=0,
                    fragment_readahead=0,
                ):
                    yield pa.Table.from_batches([record_batch])