def query_truncated_binary()

in libs/libcommon/src/libcommon/parquet_utils.py [0:0]


    def query_truncated_binary(self, offset: int, length: int) -> tuple[pa.Table, list[str]]:
        """Query the parquet files, truncating binary data to a length derived from max_arrow_data_in_memory.

        Note that this implementation will always read at least one row group, to get the list of columns and always
        have the same schema, even if the requested rows are invalid (out of range).

        This is the same as query() except that:

        - it computes a maximum size to allocate to binary data in step "parquet_index_with_metadata.row_groups_size_check_truncated_binary"
        - it uses `read_truncated_binary()` in step "parquet_index_with_metadata.query_truncated_binary".

        If none of the features is a binary `Value`, the call is delegated to `query()` and the list of truncated
        columns is empty.

        Args:
            offset (`int`): The first row to read.
            length (`int`): The number of rows to read.

        Raises:
            [`TooBigRows`]: if the size of the non-binary columns of the selected row groups, plus the largest
                binary size of a single selected row group, is bigger than max_arrow_data_in_memory
            [`SchemaMismatchError`]: if reading or concatenating the row groups raises `ArrowInvalid`
            [`IndexError`]: if the dataset is empty and `offset` is negative

        Returns:
            `pa.Table`: The requested rows.
            `list[str]`: List of truncated columns.
        """
        all_columns = set(self.features)
        binary_columns = {column for column, feature in self.features.items() if feature == Value("binary")}
        if not binary_columns:
            # Nothing to truncate: the regular query path is enough.
            return self.query(offset=offset, length=length), []
        with StepProfiler(
            method="parquet_index_with_metadata.query", step="get the parquet files than contain the requested rows"
        ):
            # Cumulative row counts: entry i is the number of rows in files 0..i.
            parquet_file_offsets = np.cumsum(self.num_rows)

            # Clamp the requested range to the last available row, so that at least one file is always selected.
            last_row_in_parquet = parquet_file_offsets[-1] - 1
            first_row = min(offset, last_row_in_parquet)
            last_row = min(offset + length - 1, last_row_in_parquet)
            first_parquet_file_id, last_parquet_file_id = np.searchsorted(
                parquet_file_offsets, [first_row, last_row], side="right"
            )
            # Offset of the first requested row, relative to the first selected parquet file.
            parquet_offset = (
                offset - parquet_file_offsets[first_parquet_file_id - 1] if first_parquet_file_id > 0 else offset
            )
            urls = self.parquet_files_urls[first_parquet_file_id : last_parquet_file_id + 1]  # noqa: E203
            metadata_paths = self.metadata_paths[first_parquet_file_id : last_parquet_file_id + 1]  # noqa: E203
            num_bytes = self.num_bytes[first_parquet_file_id : last_parquet_file_id + 1]  # noqa: E203

        with StepProfiler(
            method="parquet_index_with_metadata.query", step="load the remote parquet files using metadata from disk"
        ):
            # The parquet metadata is read from the local metadata files and passed to ParquetFile.
            parquet_files = [
                pq.ParquetFile(
                    HTTPFile(
                        self.httpfs,
                        url,
                        session=self.httpfs_session,
                        size=size,
                        loop=self.httpfs.loop,
                        cache_type=None,
                        **self.httpfs.kwargs,
                    ),
                    metadata=pq.read_metadata(metadata_path),
                    pre_buffer=True,
                )
                for url, metadata_path, size in zip(urls, metadata_paths, num_bytes)
            ]

        with StepProfiler(
            method="parquet_index_with_metadata.query", step="get the row groups than contain the requested rows"
        ):
            # Both sequences enumerate the row groups in the same (file, group) order, so they share indices.
            row_group_offsets = np.cumsum(
                [
                    parquet_file.metadata.row_group(group_id).num_rows
                    for parquet_file in parquet_files
                    for group_id in range(parquet_file.metadata.num_row_groups)
                ]
            )
            row_group_readers = [
                RowGroupReader(parquet_file=parquet_file, group_id=group_id, features=self.features)
                for parquet_file in parquet_files
                for group_id in range(parquet_file.metadata.num_row_groups)
            ]

            if len(row_group_offsets) == 0 or row_group_offsets[-1] == 0:  # if the dataset is empty
                if offset < 0:
                    raise IndexError("Offset must be non-negative")
                return cast_table_to_schema(parquet_files[0].read(), self.features.arrow_schema), []

            # Same clamping as above, this time at row group granularity within the selected files.
            last_row_in_parquet = row_group_offsets[-1] - 1
            first_row = min(parquet_offset, last_row_in_parquet)
            last_row = min(parquet_offset + length - 1, last_row_in_parquet)

            first_row_group_id, last_row_group_id = np.searchsorted(
                row_group_offsets, [first_row, last_row], side="right"
            )

        with StepProfiler(
            method="parquet_index_with_metadata.row_groups_size_check_truncated_binary",
            step="check if the rows can fit in memory",
        ):
            # Non-binary sizes are summed over the selected row groups; for binary data only the largest
            # single row group is counted.
            in_memory_max_non_binary_size = sum(
                row_group_readers[i].read_size(columns=all_columns - binary_columns)
                for i in range(first_row_group_id, last_row_group_id + 1)
            )
            in_memory_max_binary_size = max(
                row_group_readers[i].read_size(columns=binary_columns)
                for i in range(first_row_group_id, last_row_group_id + 1)
            )
            in_memory_max_size = in_memory_max_non_binary_size + in_memory_max_binary_size
            if in_memory_max_size > self.max_arrow_data_in_memory:
                raise TooBigRows(
                    "Rows from parquet row groups are too big to be read:"
                    f" {size_str(in_memory_max_size)} (max={size_str(self.max_arrow_data_in_memory)})"
                )

        with StepProfiler(method="parquet_index_with_metadata.query_truncated_binary", step="read the row groups"):
            # This is a simple heuristic of how much we need to truncate binary data:
            # the memory left after the non-binary data is shared between row groups and binary columns.
            max_binary_length = max(
                int(
                    (self.max_arrow_data_in_memory - in_memory_max_non_binary_size)
                    / (last_row_group_id + 1 - first_row_group_id)
                    / len(binary_columns)
                    / 2  # we divide more in case the row groups are not evenly distributed
                ),
                20,
            )  # we use a minimum length to not end up with too empty cells
            try:
                pa_tables: list[pa.Table] = []
                truncated_columns: set[str] = set()
                for i in range(first_row_group_id, last_row_group_id + 1):
                    rg_pa_table, rg_truncated_columns = row_group_readers[i].read_truncated_binary(
                        self.supported_columns, max_binary_length=max_binary_length
                    )
                    pa_tables.append(rg_pa_table)
                    truncated_columns |= set(rg_truncated_columns)
                pa_table = pa.concat_tables(pa_tables)
            except ArrowInvalid as err:
                # Chain explicitly so the arrow error is reported as the direct cause.
                raise SchemaMismatchError("Parquet files have different schema.", err) from err
            # Index (relative to the selected files) of the first row held by the concatenated table.
            first_row_in_pa_table = row_group_offsets[first_row_group_id - 1] if first_row_group_id > 0 else 0
            return pa_table.slice(parquet_offset - first_row_in_pa_table, length), list(truncated_columns)