in smallpond/execution/task.py [0:0]
def finalize(self):
    """Consolidate the per-partition output datasets of this task.

    Steps:
      1. Check that exactly ``self.npartitions`` datasets were produced.
      2. If every partition is empty, clear the path list of each dataset.
         Otherwise, for every empty partition, call
         ``self._create_empty_file(idx, <first non-empty dataset>)`` (fanned out
         over a thread pool of ``self.cpu_limit`` workers). Then reset that
         partition so it holds just the returned path, rooted at
         ``self.runtime_output_abspath``.
      3. Re-root every dataset whose ``root_dir`` equals
         ``self.runtime_output_abspath`` to ``self.final_output_abspath``. Its
         paths and ``recursive`` flag are kept.
      4. Wrap the datasets in a ``PartitionedDataSet`` stored as
         ``self.dataset``, then run the parent class's ``finalize``.
    """
    partitions = self.partitioned_datasets
    assert (
        len(partitions) == self.npartitions
    ), f"number of partitions {len(self.partitioned_datasets)} not equal to {self.npartitions}"

    # Query each dataset's `empty` flag exactly once, in partition order.
    empty_flags = [ds.empty for ds in partitions]

    if all(empty_flags):
        # No partition has any data: drop every path.
        for ds in partitions:
            ds.paths.clear()
    else:
        empty_partitions = [idx for idx, flag in enumerate(empty_flags) if flag]
        # Reaching this branch means at least one partition is non-empty,
        # so `next` always finds a match here.
        first_nonempty = next(ds for ds, flag in zip(partitions, empty_flags) if not flag)

        if empty_partitions:
            # Give each empty partition one empty file, so that partition
            # consumers have at least one file to read.
            def make_empty_file(idx):
                return self._create_empty_file(idx, first_nonempty)

            with ThreadPoolExecutor(self.cpu_limit) as pool:
                empty_file_paths = list(pool.map(make_empty_file, empty_partitions))

            for idx, file_path in zip(empty_partitions, empty_file_paths):
                partitions[idx].reset([file_path], self.runtime_output_abspath)
            logger.debug(f"created empty output files in partitions {empty_partitions} of {repr(self)}: {empty_file_paths[:3]}...")

    # Move datasets rooted at the runtime output directory over to the final
    # output directory.
    # XXX: a dataset with output in `runtime_output_abspath` must have
    # `root_dir` set, and all of its row ranges must be full ranges. Datasets
    # rooted elsewhere are assumed to have no output there and are left as-is.
    for ds in partitions:
        if ds.root_dir != self.runtime_output_abspath:
            continue
        ds.reset(ds.paths, self.final_output_abspath, ds.recursive)

    self.dataset = PartitionedDataSet(partitions)
    super().finalize()