def collect_output_files()

in smallpond/execution/task.py [0:0]


    def collect_output_files(self, pool: ThreadPoolExecutor) -> bool:
        """Materialize the sink's output files and/or manifest from its inputs.

        Collects every resolved path of ``self.input_datasets``, assigns each one
        a destination under ``self.runtime_output_abspath`` and then acts
        according to ``self.type``:

        * ``"copy"``: copy every source file to its destination.
        * ``"link_or_copy"``: hard-link when the source path starts with the
          destination's mount point, otherwise copy.
        * ``"link_manifest"``: hard-link (same mount-point prefix) or symlink,
          write a manifest, remove ``self.staging_root`` and, for ``.parquet``
          outputs, read ``self.dataset.num_rows``.
        * ``"manifest"``: only write a manifest that lists the source paths.

        Any other sink type performs no file operation.  ``self.dataset`` is set
        to a ``ParquetDataSet`` over the source paths (``"manifest"``) or over
        the destination file names placed under ``self.final_output_abspath``
        (all other types).

        Args:
            pool: Executor used to resolve dataset paths and to copy/link the
                files in parallel.

        Returns:
            Always ``True``; failures surface as exceptions.

        Raises:
            RuntimeError: If the copy/link helper is invoked with a sink type it
                does not support.
            OSError: Propagated from the underlying copy/link/rename calls.
        """
        final_output_dir = PurePath(self.final_output_abspath)
        runtime_output_dir = Path(self.runtime_output_abspath)
        dst_mount_point = find_mount_point(self.runtime_output_abspath)
        sink_type = self.type

        # Resolve the paths of all input datasets in parallel, then flatten.
        src_paths = [
            p
            for paths in pool.map(
                lambda dataset: [Path(path) for path in dataset.resolved_paths],
                self.input_datasets,
            )
            for p in paths
        ]
        logger.info(f"collected {len(src_paths)} files from {len(self.input_datasets)} input datasets")

        # Keep the original file names when they are unique; otherwise make every
        # name unique by inserting the file's index before the suffix.
        if len({p.name for p in src_paths}) == len(src_paths):
            dst_paths = [runtime_output_dir / p.name for p in src_paths]
        else:
            logger.warning("found duplicate filenames, appending index to filename...")
            dst_paths = [runtime_output_dir / f"{p.stem}.{idx}{p.suffix}" for idx, p in enumerate(src_paths)]

        # A pure manifest references the sources in place; every other sink type
        # reports the destination file names under the final output directory.
        output_paths = src_paths if sink_type == "manifest" else [final_output_dir / p.name for p in dst_paths]
        self.dataset = ParquetDataSet([str(p) for p in output_paths])  # FIXME: what if the dataset is not parquet?

        def copy_file(src_path: Path, dst_path: Path):
            # XXX: DO NOT use shutil.{copy, copy2, copyfile}
            #   they use sendfile on Linux. although they set blocksize=8M, the actual io size is fixed to 64k, resulting in low throughput.
            #   shutil.copyfileobj on explicitly opened files is a plain read/write loop, so the chunk size below takes effect.
            with open(src_path, "rb") as src_file, open(dst_path, "wb") as dst_file:
                shutil.copyfileobj(src_file, dst_file, length=16 * MB)

        def create_link_or_copy(src_path: Path, dst_path: Path):
            # `exists()` follows symlinks and reports False for a dangling one,
            # which would make the link calls below fail with FileExistsError,
            # so check for a symlink explicitly as well.
            if dst_path.is_symlink() or dst_path.exists():
                logger.warning(f"destination path already exists, replacing {dst_path} with {src_path}")
                dst_path.unlink(missing_ok=True)
            # NOTE(review): this is a plain string-prefix test against the
            # destination's mount point; it presumably relies on mount points not
            # sharing a name prefix and not being nested -- confirm against
            # find_mount_point().
            same_mount_point = str(src_path).startswith(dst_mount_point)
            if sink_type == "copy":
                copy_file(src_path, dst_path)
            elif sink_type == "link_manifest":
                if same_mount_point:
                    os.link(src_path, dst_path)
                else:
                    dst_path.symlink_to(src_path)
            elif sink_type == "link_or_copy":
                if same_mount_point:
                    os.link(src_path, dst_path)
                else:
                    copy_file(src_path, dst_path)
            else:
                raise RuntimeError(f"invalid sink type: {sink_type}")

        if sink_type in ("copy", "link_or_copy", "link_manifest"):
            if src_paths:
                # Drain the result iterator so that an exception raised by any
                # worker is re-raised here.  This must not live inside an
                # `assert`: assert statements are removed under `python -O`,
                # which would silently skip the whole copy/link step.
                for _ in pool.map(create_link_or_copy, src_paths, dst_paths):
                    pass
            else:
                logger.warning(f"input of data sink is empty: {self}")

        if sink_type in ("manifest", "link_manifest"):
            # write to a temporary file and rename it atomically
            manifest_path = final_output_dir / self.manifest_filename
            manifest_tmp_path = runtime_output_dir / f"{self.manifest_filename}.tmp"
            with open(manifest_tmp_path, "w", buffering=2 * MB) as manifest_file:
                for path in output_paths:
                    print(str(path), file=manifest_file)
            manifest_tmp_path.rename(manifest_path)
            logger.info(f"created a manifest file at {manifest_path}")

        if sink_type == "link_manifest":
            # remove the staging directory
            remove_path(self.staging_root)

            # check the output parquet files
            # if any file is broken, an exception will be raised
            if dst_paths and dst_paths[0].suffix == ".parquet":
                logger.info(f"checked dataset files and found {self.dataset.num_rows} rows")

        return True