def get_read_tasks()

in awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py


    def get_read_tasks(self, parallelism: int) -> list[ReadTask]:
        """Override the base class FileBasedDatasource.get_read_tasks().

        Required in order to leverage pyarrow's ParquetDataset abstraction,
        which simplifies partitioning logic.
        """
        pq_metadata = self._metadata
        if len(pq_metadata) < len(self._pq_fragments):
            # Pad `pq_metadata` to the same length as `self._pq_fragments`.
            # The lists can differ in length when no file metadata was prefetched.
            pq_metadata += [None] * (len(self._pq_fragments) - len(pq_metadata))  # type: ignore[list-item]

        if self._file_metadata_shuffler is not None:
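            # Reorder fragments, paths, and metadata together using a random
            # permutation drawn from the configured shuffler, so the three
            # lists stay aligned after shuffling.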
            files_metadata = list(zip(self._pq_fragments, self._pq_paths, pq_metadata))
            shuffled_files_metadata = [
                files_metadata[i] for i in self._file_metadata_shuffler.permutation(len(files_metadata))
            ]
            pq_fragments, pq_paths, pq_metadata = list(map(list, zip(*shuffled_files_metadata)))
        else:
            pq_fragments, pq_paths = (
                self._pq_fragments,
                self._pq_paths,
            )

        read_tasks = []
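        # Split the fragments, paths, and metadata into `parallelism` chunks.
        # np.array_split can return empty chunks when parallelism exceeds the
        # number of fragments; those are skipped below.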
        for fragments, paths, metadata in zip(  # type: ignore[var-annotated]
            np.array_split(pq_fragments, parallelism),
            np.array_split(pq_paths, parallelism),
            np.array_split(pq_metadata, parallelism),  # type: ignore[arg-type]
        ):
            if len(fragments) <= 0:
                continue

            meta = self._meta_provider(
                paths,  # type: ignore[arg-type]
                self._inferred_schema,
                num_fragments=len(fragments),
                prefetched_metadata=metadata,
            )
            # If there is a filter operation, reset the calculated row count,
            # since the resulting row count is unknown.
            if self._arrow_parquet_args.get("filter") is not None:
                meta.num_rows = None

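            # Scale the on-disk size by the estimated in-memory encoding ratio
            # to approximate the block's size once decoded.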
            if meta.size_bytes is not None:
                meta.size_bytes = int(meta.size_bytes * self._encoding_ratio)

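            # Copy the attributes needed by the read into local variables so
            # the ReadTask lambda below closes over plain values rather than
            # `self`, keeping the serialized closure small when Ray ships the
            # task to workers.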
            (
                block_udf,
                arrow_parquet_args,
                default_read_batch_size_rows,
                columns,
                schema,
                path_root,
                include_paths,
            ) = (
                self._block_udf,
                self._arrow_parquet_args,
                self._default_read_batch_size_rows,
                self._columns,
                self._schema,
                self._path_root,
                self._include_paths,
            )
            read_tasks.append(
                ReadTask(
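                    # Bind the current chunk as a default argument so each
                    # lambda captures its own fragments rather than the loop
                    # variable's final value.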
                    lambda f=fragments: _read_fragments(  # type: ignore[misc]
                        block_udf,
                        arrow_parquet_args,
                        default_read_batch_size_rows,
                        columns,
                        schema,
                        path_root,
                        f,
                        include_paths,
                    ),
                    meta,
                )
            )

        return read_tasks
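
For intuition, here is a minimal standalone sketch of the two NumPy mechanisms the method relies on: the permutation-based file shuffle and the chunking of work into read tasks. The RNG seed, fragment names, and paths are illustrative stand-ins, and the shuffler is assumed to behave like a numpy Generator, as `self._file_metadata_shuffler` does when shuffling is enabled.

    import numpy as np

    # Stand-in for self._file_metadata_shuffler (assumed numpy Generator).
    rng = np.random.default_rng(seed=0)

    pq_fragments = [f"fragment-{i}" for i in range(3)]
    pq_paths = [f"s3://bucket/part-{i}.parquet" for i in range(3)]
    pq_metadata = [None] * 3

    # Shuffle the three lists together, mirroring the shuffler branch above.
    files_metadata = list(zip(pq_fragments, pq_paths, pq_metadata))
    shuffled = [files_metadata[i] for i in rng.permutation(len(files_metadata))]
    pq_fragments, pq_paths, pq_metadata = map(list, zip(*shuffled))

    # Chunk into `parallelism` pieces; with parallelism=5 and only 3
    # fragments, two chunks come back empty, which is why the loop in
    # get_read_tasks() skips chunks with no fragments.
    for chunk in np.array_split(pq_fragments, 5):
        print(list(chunk))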