def finalize()

in smallpond/execution/task.py [0:0]


    def finalize(self):
        """Normalize the per-partition outputs and publish them as `self.dataset`.

        Steps, in order:

        1. Check that exactly `self.npartitions` datasets are present.
        2. If every partition is empty, clear the path list of each dataset.
           Otherwise, give each empty partition a single placeholder file
           (obtained from `self._create_empty_file`, which receives the
           partition index and the first non-empty dataset) so that partition
           consumers always have at least one file to read.
        3. Re-root every dataset whose `root_dir` is `runtime_output_abspath`
           to `final_output_abspath`.
        4. Wrap the datasets in a `PartitionedDataSet`, store it in
           `self.dataset`, and defer to the parent class's `finalize()`.
        """
        assert (
            len(self.partitioned_datasets) == self.npartitions
        ), f"number of partitions {len(self.partitioned_datasets)} not equal to {self.npartitions}"

        datasets = self.partitioned_datasets
        emptiness = [ds.empty for ds in datasets]
        empty_partitions = [idx for idx, is_empty in enumerate(emptiness) if is_empty]

        if len(empty_partitions) == len(datasets):
            # No partition holds any data: drop the paths of every dataset.
            for ds in datasets:
                ds.paths.clear()
        elif empty_partitions:
            # Mixed case: some partitions hold data, some do not. Back each empty
            # partition with one placeholder file so that its consumers have
            # something to read. The first non-empty dataset is handed to the
            # helper together with the partition index.
            template_dataset = next(ds for ds, is_empty in zip(datasets, emptiness) if not is_empty)
            with ThreadPoolExecutor(self.cpu_limit) as pool:
                empty_file_paths = list(
                    pool.map(
                        self._create_empty_file,
                        empty_partitions,
                        [template_dataset] * len(empty_partitions),
                    )
                )
                for partition_idx, empty_file_path in zip(empty_partitions, empty_file_paths):
                    datasets[partition_idx].reset([empty_file_path], self.runtime_output_abspath)
                logger.debug(f"created empty output files in partitions {empty_partitions} of {repr(self)}: {empty_file_paths[:3]}...")

        # Move datasets rooted at the runtime output directory over to the final
        # output directory.
        for ds in datasets:
            # XXX: a dataset with output in `runtime_output_abspath` must have
            #      `root_dir` set, and all of its row ranges must be full ranges.
            # XXX: any other `root_dir` is taken to mean that nothing was written
            #      to `runtime_output_abspath`, so the dataset is left untouched.
            if ds.root_dir == self.runtime_output_abspath:
                ds.reset(ds.paths, self.final_output_abspath, ds.recursive)

        self.dataset = PartitionedDataSet(self.partitioned_datasets)

        super().finalize()