awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py
def get_read_tasks(self, parallelism: int) -> list[ReadTask]:
"""Override the base class FileBasedDatasource.get_read_tasks().
Required in order to leverage pyarrow's ParquetDataset abstraction,
which simplifies partitioning logic.
"""
pq_metadata = self._metadata
if len(pq_metadata) < len(self._pq_fragments):
        # Pad ``pq_metadata`` to the same length as ``self._pq_fragments``.
        # The lists can differ when no file metadata was prefetched.
        # Concatenate instead of using ``+=`` so ``self._metadata`` is not
        # mutated in place.
        pq_metadata = pq_metadata + [None] * (len(self._pq_fragments) - len(pq_metadata))  # type: ignore[list-item]
if self._file_metadata_shuffler is not None:
files_metadata = list(zip(self._pq_fragments, self._pq_paths, pq_metadata))
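        # e.g. with three files and permutation [2, 0, 1], the zipped
        # (fragment, path, metadata) triples are re-ordered to files
        # 2, 0, 1 before being split across read tasks below.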
shuffled_files_metadata = [
files_metadata[i] for i in self._file_metadata_shuffler.permutation(len(files_metadata))
]
pq_fragments, pq_paths, pq_metadata = list(map(list, zip(*shuffled_files_metadata)))
else:
pq_fragments, pq_paths = (
self._pq_fragments,
self._pq_paths,
)
read_tasks = []
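    # ``np.array_split`` spreads the fragments across ``parallelism`` chunks
    # as evenly as possible; e.g. splitting 5 fragments 3 ways yields chunks
    # of sizes 2, 2, and 1.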
for fragments, paths, metadata in zip( # type: ignore[var-annotated]
np.array_split(pq_fragments, parallelism),
np.array_split(pq_paths, parallelism),
np.array_split(pq_metadata, parallelism), # type: ignore[arg-type]
):
        # ``array_split`` emits empty chunks when ``parallelism`` exceeds the
        # number of fragments; skip them.
        if len(fragments) == 0:
            continue
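        # Combine paths, inferred schema, and any prefetched Parquet metadata
        # into a single BlockMetadata estimate for this chunk of fragments.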
meta = self._meta_provider(
paths, # type: ignore[arg-type]
self._inferred_schema,
num_fragments=len(fragments),
prefetched_metadata=metadata,
)
        # If a filter is applied, clear the precomputed row count; the number
        # of rows that survive the filter is unknown until read time.
if self._arrow_parquet_args.get("filter") is not None:
meta.num_rows = None
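        # ``size_bytes`` reported by the metadata provider is the on-disk
        # size; scale it by the sampled encoding ratio to approximate the
        # in-memory Arrow size.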
if meta.size_bytes is not None:
meta.size_bytes = int(meta.size_bytes * self._encoding_ratio)
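        # Bind everything the read function needs to plain locals so the
        # lambda below does not close over ``self``; a reference to ``self``
        # would force serializing the entire datasource with every ReadTask.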
(
block_udf,
arrow_parquet_args,
default_read_batch_size_rows,
columns,
schema,
path_root,
include_paths,
) = (
self._block_udf,
self._arrow_parquet_args,
self._default_read_batch_size_rows,
self._columns,
self._schema,
self._path_root,
self._include_paths,
)
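        # ``f=fragments`` freezes the current chunk as a default argument;
        # without it, every lambda would late-bind to the final loop value.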
read_tasks.append(
ReadTask(
lambda f=fragments: _read_fragments( # type: ignore[misc]
block_udf,
arrow_parquet_args,
default_read_batch_size_rows,
columns,
schema,
path_root,
f,
include_paths,
),
meta,
)
)
return read_tasks
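
# Illustrative only, not from the source file: a minimal sketch of how the
# returned tasks could be driven. Ray Data normally executes each ReadTask as
# a scheduled remote task; calling them inline here just demonstrates that a
# ReadTask is a zero-argument callable yielding pyarrow table blocks. The
# helper name is hypothetical.
def _execute_read_tasks_locally(read_tasks: list[ReadTask]):
    for task in read_tasks:
        # ReadTask.__call__ invokes the wrapped ``_read_fragments`` closure.
        yield from task()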