def load_partitioned_datasets()

in smallpond/logical/dataset.py [0:0]


    def load_partitioned_datasets(self, npartition: int, data_partition_column: str, hive_partitioning=False) -> "List[ParquetDataSet]":
        """
        Split the dataset into `npartition` datasets, assigning each file to the
        partition indicated by the partition key stored in `data_partition_column`.
        """
        assert npartition > 0, f"npartition has negative value: {npartition}"
        if npartition > self.num_files:
            logger.debug(f"number of partitions {npartition} is greater than the number of files {self.num_files}")

        file_partitions: List[ParquetDataSet] = self._init_file_partitions(npartition)
        for dataset in file_partitions:
            # elements will be appended later
            dataset._absolute_paths = []
            dataset._resolved_paths = []
            dataset._resolved_row_ranges = []

        if not self.resolved_paths:
            logger.debug(f"create {npartition} empty data partitions from {self}")
            return file_partitions

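        # read the partition key (data_partition_column) of each resolved file in parallel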
        with ThreadPoolExecutor(min(32, len(self.resolved_paths))) as pool:
            partition_keys = pool.map(
                lambda path: ParquetDataSet._read_partition_key(path, data_partition_column, hive_partitioning),
                self.resolved_paths,
            )

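        # assign each file and its row range to the partition selected by its key;
        # files without a partition key are skipped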
        for row_range, partition_key in zip(self.resolved_row_ranges, partition_keys):
            if partition_key is not None:
                assert 0 <= partition_key < npartition, f"invalid partition key {partition_key} found in {row_range.path}"
                dataset = file_partitions[partition_key]
                dataset.paths.append(row_range.path)
                dataset._absolute_paths.append(row_range.path)
                dataset._resolved_paths.append(row_range.path)
                dataset._resolved_row_ranges.append(row_range)

        logger.debug(f"loaded {npartition} data partitions from {self}")
        return file_partitions
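
A minimal usage sketch (the glob path and the `bucket` column name are hypothetical,
and the `ParquetDataSet` constructor arguments are assumed):

    from smallpond.logical.dataset import ParquetDataSet

    # hypothetical dataset whose files carry a `bucket` column holding each
    # file's partition index in the range [0, 4)
    dataset = ParquetDataSet(["output/*.parquet"])
    partitions = dataset.load_partitioned_datasets(npartition=4, data_partition_column="bucket")
    for index, partition in enumerate(partitions):
        print(index, partition.resolved_paths)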