def query()

in libs/libcommon/src/libcommon/parquet_utils.py [0:0]


    def query(self, offset: int, length: int) -> pa.Table:
        """Query the parquet files

        Note that this implementation will always read at least one row group, to get the list of columns and always
        have the same schema, even if the requested rows are invalid (out of range).

        Args:
            offset (`int`): The first row to read.
            length (`int`): The number of rows to read.

        Raises:
            [`TooBigRows`]: if the Arrow data from the parquet row groups is bigger than `max_arrow_data_in_memory`.

        Returns:
            `pa.Table`: The requested rows.
        """
        with StepProfiler(
            method="parquet_index_with_metadata.query", step="get the parquet files than contain the requested rows"
        ):
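            # parquet_file_offsets[i] is the total number of rows in parquet files 0..i,
            # i.e. the exclusive end row index of file i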
            parquet_file_offsets = np.cumsum(self.num_rows)

            last_row_in_parquet = parquet_file_offsets[-1] - 1
            first_row = min(offset, last_row_in_parquet)
            last_row = min(offset + length - 1, last_row_in_parquet)
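            # with side="right", searchsorted maps each row index to the index of the parquet file
            # whose row range contains it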
            first_parquet_file_id, last_parquet_file_id = np.searchsorted(
                parquet_file_offsets, [first_row, last_row], side="right"
            )
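            # offset of the requested range relative to the first row of the first selected parquet file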
            parquet_offset = (
                offset - parquet_file_offsets[first_parquet_file_id - 1] if first_parquet_file_id > 0 else offset
            )
            urls = self.parquet_files_urls[first_parquet_file_id : last_parquet_file_id + 1]  # noqa: E203
            metadata_paths = self.metadata_paths[first_parquet_file_id : last_parquet_file_id + 1]  # noqa: E203
            num_bytes = self.num_bytes[first_parquet_file_id : last_parquet_file_id + 1]  # noqa: E203

        with StepProfiler(
            method="parquet_index_with_metadata.query", step="load the remote parquet files using metadata from disk"
        ):
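            # the parquet metadata (footer) is read from local disk, so no extra HTTP request is needed
            # to get it; pre_buffer=True coalesces the remote reads of the required column chunks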
            parquet_files = [
                pq.ParquetFile(
                    HTTPFile(
                        self.httpfs,
                        url,
                        session=self.httpfs_session,
                        size=size,
                        loop=self.httpfs.loop,
                        cache_type=None,
                        **self.httpfs.kwargs,
                    ),
                    metadata=pq.read_metadata(metadata_path),
                    pre_buffer=True,
                )
                for url, metadata_path, size in zip(urls, metadata_paths, num_bytes)
            ]

        with StepProfiler(
            method="parquet_index_with_metadata.query", step="get the row groups than contain the requested rows"
        ):
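            # row_group_offsets[i] is the cumulative number of rows in row groups 0..i across the selected files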
            row_group_offsets = np.cumsum(
                [
                    parquet_file.metadata.row_group(group_id).num_rows
                    for parquet_file in parquet_files
                    for group_id in range(parquet_file.metadata.num_row_groups)
                ]
            )
            row_group_readers = [
                RowGroupReader(parquet_file=parquet_file, group_id=group_id, features=self.features)
                for parquet_file in parquet_files
                for group_id in range(parquet_file.metadata.num_row_groups)
            ]

            if len(row_group_offsets) == 0 or row_group_offsets[-1] == 0:  # if the dataset is empty
                if offset < 0:
                    raise IndexError("Offset must be non-negative")
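                # reading the (empty) first parquet file still yields the columns; cast them to the expected schema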
                return cast_table_to_schema(parquet_files[0].read(), self.features.arrow_schema)

            last_row_in_parquet = row_group_offsets[-1] - 1
            first_row = min(parquet_offset, last_row_in_parquet)
            last_row = min(parquet_offset + length - 1, last_row_in_parquet)

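            # same searchsorted trick as above, this time mapping the requested rows
            # (relative to the first selected parquet file) to the row groups that contain them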
            first_row_group_id, last_row_group_id = np.searchsorted(
                row_group_offsets, [first_row, last_row], side="right"
            )

        with StepProfiler(
            method="parquet_index_with_metadata.row_groups_size_check", step="check if the rows can fit in memory"
        ):
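            # estimate the in-memory size of the selected row groups before reading them, and fail early if too big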
            row_groups_size = sum(
                [row_group_readers[i].read_size() for i in range(first_row_group_id, last_row_group_id + 1)]
            )
            if row_groups_size > self.max_arrow_data_in_memory:
                raise TooBigRows(
                    "Rows from parquet row groups are too big to be read:"
                    f" {size_str(row_groups_size)} (max={size_str(self.max_arrow_data_in_memory)})"
                )

        with StepProfiler(method="parquet_index_with_metadata.query", step="read the row groups"):
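            # read only the selected row groups, restricted to the supported columns, and concatenate them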
            try:
                pa_table = pa.concat_tables(
                    [
                        row_group_readers[i].read(self.supported_columns)
                        for i in range(first_row_group_id, last_row_group_id + 1)
                    ]
                )
            except ArrowInvalid as err:
                raise SchemaMismatchError("Parquet files have different schemas.", err)
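            # the concatenated table starts at the first row of the first selected row group;
            # slice it down to the requested rows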
            first_row_in_pa_table = row_group_offsets[first_row_group_id - 1] if first_row_group_id > 0 else 0
            return pa_table.slice(parquet_offset - first_row_in_pa_table, length)
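
For illustration, here is a minimal, self-contained sketch (not part of the module) of the cumsum/searchsorted
arithmetic used above to locate the parquet files that contain a requested row range; the per-file row counts,
offset and length are made up:

    import numpy as np

    num_rows = [100, 250, 50]  # hypothetical number of rows per parquet file
    offset, length = 120, 200  # request rows 120..319

    parquet_file_offsets = np.cumsum(num_rows)  # [100, 350, 400]: exclusive end row index of each file
    last_row_in_parquet = parquet_file_offsets[-1] - 1  # 399
    first_row = min(offset, last_row_in_parquet)  # 120
    last_row = min(offset + length - 1, last_row_in_parquet)  # 319

    # side="right" returns, for each row index, the index of the file whose row range contains it
    first_file_id, last_file_id = np.searchsorted(parquet_file_offsets, [first_row, last_row], side="right")
    assert (first_file_id, last_file_id) == (1, 1)  # rows 120..319 all live in the second file (rows 100..349)

    # offset of the requested range relative to the first row of the first selected file
    parquet_offset = offset - parquet_file_offsets[first_file_id - 1] if first_file_id > 0 else offset
    assert parquet_offset == 20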