in awswrangler/distributed/ray/datasources/arrow_parquet_datasource.py [0:0]
def _sample_fragments(self) -> list[_SampleInfo]:
    # Sample a few rows from the Parquet files to estimate the encoding ratio.
    # Launch tasks that sample multiple files remotely in parallel.
    # Samples are spread evenly: N rows are read from the i-th row group of the
    # i-th selected file.
# TODO(ekl/cheng) take into account column pruning.
num_files = len(self._pq_fragments)
num_samples = int(num_files * PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIO)
min_num_samples = min(PARQUET_ENCODING_RATIO_ESTIMATE_MIN_NUM_SAMPLES, num_files)
max_num_samples = min(PARQUET_ENCODING_RATIO_ESTIMATE_MAX_NUM_SAMPLES, num_files)
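    # Clamp the proportional sample count to [min_num_samples, max_num_samples];
    # both bounds are already capped at num_files above.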
num_samples = max(min(num_samples, max_num_samples), min_num_samples)
    # Pick files at evenly spaced indices across the dataset so the estimate is
    # not biased when the data is skewed.
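    # For example (illustrative values only): num_files=100 and num_samples=5
    # yield np.linspace(0, 99, 5) -> indices [0, 24, 49, 74, 99].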
file_samples = [
self._pq_fragments[idx] for idx in np.linspace(0, num_files - 1, num_samples).astype(int).tolist()
]
sample_fragment = cached_remote_fn(_sample_fragment)
futures = []
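    # Prefer the configured local scheduling strategy; otherwise fall back to
    # Ray's SPREAD strategy (see rationale below).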
scheduling = self._local_scheduling or "SPREAD"
for sample in file_samples:
        # Sample the first batch of rows in each selected file.
        # Use the SPREAD scheduling strategy to avoid packing many sampling tasks
        # onto the same node and causing OOM, since sampling can be memory-intensive.
futures.append(
sample_fragment.options(scheduling_strategy=scheduling).remote(
self._columns,
self._schema,
sample,
)
)
sample_bar = ProgressBar(name="Parquet Files Sample", total=len(futures), unit="file samples")
sample_infos = sample_bar.fetch_until_complete(futures)
sample_bar.close() # type: ignore[no-untyped-call]
return sample_infos
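
# Illustrative sketch (not part of this module): one way the collected
# _SampleInfo results could be reduced to a single encoding-ratio estimate.
# The field names `actual_bytes_per_row` / `estimated_bytes_per_row` and the
# neutral 1.0 fallback are assumptions for this example, not the module's
# actual implementation.
def _example_estimate_encoding_ratio(sample_infos: list[_SampleInfo]) -> float:
    ratios = [
        info.actual_bytes_per_row / info.estimated_bytes_per_row
        for info in sample_infos
        if info.actual_bytes_per_row and info.estimated_bytes_per_row
    ]
    # Average the per-file ratios; fall back to 1.0 if no sample was usable.
    return float(np.mean(ratios)) if ratios else 1.0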