in libs/libcommon/src/libcommon/parquet_utils.py [0:0]
def query(self, offset: int, length: int) -> pa.Table:
"""Query the parquet files
Note that this implementation will always read at least one row group, to get the list of columns and always
have the same schema, even if the requested rows are invalid (out of range).
Args:
offset (`int`): The first row to read.
length (`int`): The number of rows to read.
Raises:
[`TooBigRows`]: if the arrow data from the parquet row groups is bigger than max_arrow_data_in_memory
Returns:
`pa.Table`: The requested rows.
"""
with StepProfiler(
method="parquet_index_with_metadata.query", step="get the parquet files than contain the requested rows"
):
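# Cumulative row counts of the parquet files; a right-sided binary search on
# these offsets finds the files that contain the first and last requested rows.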
parquet_file_offsets = np.cumsum(self.num_rows)
last_row_in_parquet = parquet_file_offsets[-1] - 1
first_row = min(offset, last_row_in_parquet)
last_row = min(offset + length - 1, last_row_in_parquet)
first_parquet_file_id, last_parquet_file_id = np.searchsorted(
parquet_file_offsets, [first_row, last_row], side="right"
)
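# Offset of the requested range, relative to the first selected parquet file.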
parquet_offset = (
offset - parquet_file_offsets[first_parquet_file_id - 1] if first_parquet_file_id > 0 else offset
)
urls = self.parquet_files_urls[first_parquet_file_id : last_parquet_file_id + 1] # noqa: E203
metadata_paths = self.metadata_paths[first_parquet_file_id : last_parquet_file_id + 1] # noqa: E203
num_bytes = self.num_bytes[first_parquet_file_id : last_parquet_file_id + 1] # noqa: E203
with StepProfiler(
method="parquet_index_with_metadata.query", step="load the remote parquet files using metadata from disk"
):
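# Open the remote parquet files over HTTP; reusing the metadata already stored
# on disk avoids fetching each parquet footer over the network.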
parquet_files = [
pq.ParquetFile(
HTTPFile(
self.httpfs,
url,
session=self.httpfs_session,
size=size,
loop=self.httpfs.loop,
cache_type=None,
**self.httpfs.kwargs,
),
metadata=pq.read_metadata(metadata_path),
pre_buffer=True,
)
for url, metadata_path, size in zip(urls, metadata_paths, num_bytes)
]
with StepProfiler(
method="parquet_index_with_metadata.query", step="get the row groups than contain the requested rows"
):
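# Flatten the row groups of the selected files and compute their cumulative row
# counts, so the requested rows can be mapped to a contiguous range of row groups.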
row_group_offsets = np.cumsum(
[
parquet_file.metadata.row_group(group_id).num_rows
for parquet_file in parquet_files
for group_id in range(parquet_file.metadata.num_row_groups)
]
)
row_group_readers = [
RowGroupReader(parquet_file=parquet_file, group_id=group_id, features=self.features)
for parquet_file in parquet_files
for group_id in range(parquet_file.metadata.num_row_groups)
]
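# Empty dataset: read the first (empty) parquet file so the returned table still has the expected schema.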
if len(row_group_offsets) == 0 or row_group_offsets[-1] == 0: # if the dataset is empty
if offset < 0:
raise IndexError("Offset must be non-negative")
return cast_table_to_schema(parquet_files[0].read(), self.features.arrow_schema)
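# Clamp the requested range to the rows that are actually available, then locate
# the first and last row groups to read (same binary search as for the files above).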
last_row_in_parquet = row_group_offsets[-1] - 1
first_row = min(parquet_offset, last_row_in_parquet)
last_row = min(parquet_offset + length - 1, last_row_in_parquet)
first_row_group_id, last_row_group_id = np.searchsorted(
row_group_offsets, [first_row, last_row], side="right"
)
with StepProfiler(
method="parquet_index_with_metadata.row_groups_size_check", step="check if the rows can fit in memory"
):
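# Estimate the in-memory (uncompressed Arrow) size of the selected row groups
# before reading them, to fail fast when the limit would be exceeded.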
row_groups_size = sum(
[row_group_readers[i].read_size() for i in range(first_row_group_id, last_row_group_id + 1)]
)
if row_groups_size > self.max_arrow_data_in_memory:
raise TooBigRows(
"Rows from parquet row groups are too big to be read:"
f" {size_str(row_groups_size)} (max={size_str(self.max_arrow_data_in_memory)})"
)
with StepProfiler(method="parquet_index_with_metadata.query", step="read the row groups"):
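# Read and concatenate the selected row groups; a schema mismatch between the
# parquet files surfaces as ArrowInvalid during concatenation.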
try:
pa_table = pa.concat_tables(
[
row_group_readers[i].read(self.supported_columns)
for i in range(first_row_group_id, last_row_group_id + 1)
]
)
except ArrowInvalid as err:
raise SchemaMismatchError("Parquet files have different schemas.") from err
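# Drop the rows of the first read row group that precede the requested offset and keep at most `length` rows.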
first_row_in_pa_table = row_group_offsets[first_row_group_id - 1] if first_row_group_id > 0 else 0
return pa_table.slice(parquet_offset - first_row_in_pa_table, length)
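
# Usage sketch (illustrative, not part of this module): assuming `index` is an
# already-built object exposing this `query` method (its construction is not
# shown here), a caller would page through the dataset like this:
#
#     tables = [index.query(offset=page * 100, length=100) for page in range(3)]
#     total_rows = sum(t.num_rows for t in tables)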