src/datasets/packaged_modules/folder_based_builder/folder_based_builder.py
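# NOTE: this excerpt relies on module-level imports defined elsewhere in the file,
# presumably along these lines: `io`, `pandas as pd`, `pyarrow as pa`,
# `pyarrow.dataset as ds`, `pyarrow.json as paj`, `pyarrow.parquet as pq`,
# the datasets `config` module (for `DEFAULT_MAX_BATCH_SIZE`), `require_storage_cast`,
# the `readline` helper, and a module-level `logger`.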
def _read_metadata(self, metadata_file: str, metadata_ext: str = "") -> Iterator[pa.Table]:
"""using the same logic as the Csv, Json and Parquet dataset builders to stream the data"""
if self.config.filters is not None:
filter_expr = (
pq.filters_to_expression(self.config.filters)
if isinstance(self.config.filters, list)
else self.config.filters
)
else:
filter_expr = None
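    # CSV metadata: stream the file in pandas chunks and yield each chunk as an Arrow table.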
if metadata_ext == ".csv":
chunksize = 10_000 # 10k lines
schema = self.config.features.arrow_schema if self.config.features else None
        # An explicit dtype mapping lets pandas read e.g. an int-looking column as str;
        # features that require a storage cast are read as generic objects instead.
dtype = (
{
name: dtype.to_pandas_dtype() if not require_storage_cast(feature) else object
for name, dtype, feature in zip(schema.names, schema.types, self.config.features.values())
}
if schema is not None
else None
)
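        # iterator/chunksize make read_csv return a TextFileReader that yields DataFrames
        # of at most `chunksize` rows, so large metadata files are never fully materialized.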
csv_file_reader = pd.read_csv(metadata_file, iterator=True, dtype=dtype, chunksize=chunksize)
for df in csv_file_reader:
pa_table = pa.Table.from_pandas(df)
if self.config.filters is not None:
pa_table = pa_table.filter(filter_expr)
if len(pa_table) > 0:
yield pa_table
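    # JSONL metadata: read raw byte chunks, complete the last line, and parse with pyarrow's JSON reader.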
elif metadata_ext == ".jsonl":
with open(metadata_file, "rb") as f:
chunksize: int = 10 << 20 # 10MB
# Use block_size equal to the chunk size divided by 32 to leverage multithreading
# Set a default minimum value of 16kB if the chunk size is really small
block_size = max(chunksize // 32, 16 << 10)
while True:
batch = f.read(chunksize)
if not batch:
break
# Finish current line
try:
batch += f.readline()
except (AttributeError, io.UnsupportedOperation):
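                    # Some file-like objects (e.g. streaming wrappers) don't support readline();
                    # fall back to the byte-wise readline helper.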
batch += readline(f)
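                # Parse the chunk, retrying with a larger block_size when a JSON object
                # straddles the block boundary.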
while True:
try:
pa_table = paj.read_json(
io.BytesIO(batch), read_options=paj.ReadOptions(block_size=block_size)
)
break
except (pa.ArrowInvalid, pa.ArrowNotImplementedError) as e:
                        if (
                            isinstance(e, pa.ArrowInvalid) and "straddling" not in str(e)
                        ) or block_size > len(batch):
raise
else:
# Increase the block size in case it was too small.
# The block size will be reset for the next file.
logger.debug(
f"Batch of {len(batch)} bytes couldn't be parsed with block_size={block_size}. Retrying with block_size={block_size * 2}."
)
block_size *= 2
if self.config.filters is not None:
pa_table = pa_table.filter(filter_expr)
if len(pa_table) > 0:
yield pa_table
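    # Parquet metadata: stream record batches with the filter pushed down to the Parquet reader.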
else:
with open(metadata_file, "rb") as f:
parquet_fragment = ds.ParquetFileFormat().make_fragment(f)
if parquet_fragment.row_groups:
batch_size = parquet_fragment.row_groups[0].num_rows
else:
batch_size = config.DEFAULT_MAX_BATCH_SIZE
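            # Align batches with the first row group and disable readahead so record batches
            # are produced lazily; `config` here is the datasets config module, not `self.config`.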
for record_batch in parquet_fragment.to_batches(
batch_size=batch_size,
filter=filter_expr,
batch_readahead=0,
fragment_readahead=0,
):
yield pa.Table.from_batches([record_batch])