in smallpond/logical/dataset.py [0:0]
def load_partitioned_datasets(self, npartition: int, data_partition_column: str, hive_partitioning=False) -> "List[ParquetDataSet]":
    """
    Split the dataset into `npartition` datasets, routing each resolved file to the
    partition indexed by the value of `data_partition_column` stored with that file.
    """
    assert npartition > 0, f"npartition must be a positive integer, got: {npartition}"
    if npartition > self.num_files:
        logger.debug(f"number of partitions {npartition} is greater than the number of files {self.num_files}")
    file_partitions: List[ParquetDataSet] = self._init_file_partitions(npartition)
    for dataset in file_partitions:
        # elements will be appended later
        dataset._absolute_paths = []
        dataset._resolved_paths = []
        dataset._resolved_row_ranges = []
    if not self.resolved_paths:
        logger.debug(f"create {npartition} empty data partitions from {self}")
        return file_partitions
    # read each file's partition key concurrently (at most 32 threads)
    with ThreadPoolExecutor(min(32, len(self.resolved_paths))) as pool:
        partition_keys = pool.map(
            lambda path: ParquetDataSet._read_partition_key(path, data_partition_column, hive_partitioning),
            self.resolved_paths,
        )
    # route each file and its row range to the partition selected by its key;
    # files without a partition key are skipped
    for row_range, partition_key in zip(self.resolved_row_ranges, partition_keys):
        if partition_key is not None:
            assert 0 <= partition_key < npartition, f"invalid partition key {partition_key} found in {row_range.path}"
            dataset = file_partitions[partition_key]
            dataset.paths.append(row_range.path)
            dataset._absolute_paths.append(row_range.path)
            dataset._resolved_paths.append(row_range.path)
            dataset._resolved_row_ranges.append(row_range)
    logger.debug(f"loaded {npartition} data partitions from {self}")
    return file_partitions
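
For context, a minimal usage sketch; the variable `dataset` and the partition column name are assumptions for illustration, not taken from the smallpond source:

# Hypothetical usage sketch: split a resolved ParquetDataSet into 8 partitions
# keyed by a precomputed integer column.
partitions = dataset.load_partitioned_datasets(
    npartition=8,
    data_partition_column="__partition_key__",
)
for index, part in enumerate(partitions):
    # partitions that match no files stay empty
    print(index, part.num_files)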
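
The routing loop assumes `ParquetDataSet._read_partition_key` yields one integer key per file (or None, in which case the file is skipped). A simplified, hypothetical sketch of that idea using pyarrow follows; the actual helper may differ, for example in how it uses `hive_partitioning`:

# Illustrative sketch only -- not the actual smallpond implementation.
# Assumes every row of a file carries the same value of the partition column.
from typing import Optional
import pyarrow.parquet as pq

def read_partition_key_sketch(path: str, column: str) -> Optional[int]:
    table = pq.read_table(path, columns=[column])
    if table.num_rows == 0:
        return None
    return table.column(column)[0].as_py()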