in smallpond/execution/task.py [0:0]
def collect_output_files(self, pool: ThreadPoolExecutor) -> bool:
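    """Collect files from all input datasets into the sink output directory.

    Depending on the sink type, files are copied, hard-linked/symlinked, or only
    listed in a manifest. Also builds `self.dataset` from the output paths.
    """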
    final_output_dir = PurePath(self.final_output_abspath)
    runtime_output_dir = Path(self.runtime_output_abspath)
    dst_mount_point = find_mount_point(self.runtime_output_abspath)
    sink_type = self.type
    src_paths = [
        p
        for paths in pool.map(
            lambda dataset: [Path(path) for path in dataset.resolved_paths],
            self.input_datasets,
        )
        for p in paths
    ]
    logger.info(f"collected {len(src_paths)} files from {len(self.input_datasets)} input datasets")
    if len(set(p.name for p in src_paths)) == len(src_paths):
        dst_paths = [runtime_output_dir / p.name for p in src_paths]
    else:
        logger.warning("found duplicate filenames, appending index to filename...")
        dst_paths = [runtime_output_dir / f"{p.stem}.{idx}{p.suffix}" for idx, p in enumerate(src_paths)]
    output_paths = src_paths if sink_type == "manifest" else [final_output_dir / p.name for p in dst_paths]
    self.dataset = ParquetDataSet([str(p) for p in output_paths])  # FIXME: what if the dataset is not parquet?
    def copy_file(src_path: Path, dst_path: Path):
        # XXX: DO NOT use shutil.{copy, copy2, copyfile}:
        # they use sendfile on Linux, and although the blocksize is set to 8MB,
        # the actual io size is fixed at 64KB, resulting in low throughput.
        with open(src_path, "rb") as src_file, open(dst_path, "wb") as dst_file:
            shutil.copyfileobj(src_file, dst_file, length=16 * MB)
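    # hard-link when source and destination share a mount point; otherwise fall back
    # to a symlink ("link_manifest") or a full copy ("link_or_copy")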
    def create_link_or_copy(src_path: Path, dst_path: Path):
        if dst_path.exists():
            logger.warning(f"destination path already exists, replacing {dst_path} with {src_path}")
            dst_path.unlink(missing_ok=True)
        same_mount_point = str(src_path).startswith(dst_mount_point)
        if sink_type == "copy":
            copy_file(src_path, dst_path)
        elif sink_type == "link_manifest":
            if same_mount_point:
                os.link(src_path, dst_path)
            else:
                dst_path.symlink_to(src_path)
        elif sink_type == "link_or_copy":
            if same_mount_point:
                os.link(src_path, dst_path)
            else:
                copy_file(src_path, dst_path)
        else:
            raise RuntimeError(f"invalid sink type: {sink_type}")
        return True
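    # materialize the output files in parallel; the "manifest" sink only records paths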
if sink_type in ("copy", "link_or_copy", "link_manifest"):
if src_paths:
assert all(pool.map(create_link_or_copy, src_paths, dst_paths))
else:
logger.warning(f"input of data sink is empty: {self}")
if sink_type == "manifest" or sink_type == "link_manifest":
# write to a temporary file and rename it atomically
manifest_path = final_output_dir / self.manifest_filename
manifest_tmp_path = runtime_output_dir / f"{self.manifest_filename}.tmp"
with open(manifest_tmp_path, "w", buffering=2 * MB) as manifest_file:
for path in output_paths:
print(str(path), file=manifest_file)
manifest_tmp_path.rename(manifest_path)
logger.info(f"created a manifest file at {manifest_path}")
if sink_type == "link_manifest":
# remove the staging directory
remove_path(self.staging_root)
# check the output parquet files
# if any file is broken, an exception will be raised
if len(dst_paths) > 0 and dst_paths[0].suffix == ".parquet":
logger.info(f"checked dataset files and found {self.dataset.num_rows} rows")
return True