def finalize()

in smallpond/execution/task.py [0:0]


    def finalize(self):
        """
        Wrap up a task whose status is already `WorkStatus.SUCCEED`.

        Steps, in order:
          1. call `self.inject_fault()` and assert that the task succeeded;
          2. if the runtime (staging) output path differs from the final output path and
             still exists, rename it to the final path, creating the parent directory
             when needed;
          3. when `self.ctx.enable_diagnostic_metrics` is set, add the number and total
             size of input/output files to `self.perf_metrics`;
          4. add the elapsed wall time and, unless `self.exec_on_scheduler` is set, the
             resource usage of the current process to `self.perf_metrics`;
          5. log the perf profile at debug level when one was recorded and the task
             took more than 3 seconds.

        Raises:
            AssertionError: if the task status is not `WorkStatus.SUCCEED`.
        """
        self.inject_fault()
        assert self.status == WorkStatus.SUCCEED
        logger.info("finished task: {}", self)

        # promote the staged output to its final location; nothing to do when the task
        # wrote to the final path directly or left nothing at the staging path
        if self.runtime_output_abspath != self.final_output_abspath:
            if os.path.exists(self.runtime_output_abspath):
                os.makedirs(os.path.dirname(self.final_output_abspath), exist_ok=True)
                os.rename(self.runtime_output_abspath, self.final_output_abspath)

        if self.ctx.enable_diagnostic_metrics:

            def measure_files(file_paths):
                """Return the size of every path, or [] when there are no paths or any path is missing."""
                if not file_paths:
                    return []
                try:
                    # stat the files concurrently, with at most 32 worker threads
                    with ThreadPoolExecutor(min(32, len(file_paths))) as pool:
                        return list(pool.map(os.path.getsize, file_paths))
                except FileNotFoundError:
                    # best effort: a single missing file discards the sizes of the whole batch
                    logger.warning(f"some of the output files not found: {file_paths[:3]}...")
                    return []

            paths_by_metric = (
                ("input", [path for dataset in self.input_datasets for path in dataset.resolved_paths]),
                ("output", self.output.resolved_paths),
            )
            for metric_name, file_paths in paths_by_metric:
                file_sizes = measure_files(file_paths)
                # skip the metric entirely when there is nothing to count or sizes are unavailable
                if not (file_paths and file_sizes):
                    continue
                self.perf_metrics[f"num {metric_name} files"] += len(file_paths)
                self.perf_metrics[f"total {metric_name} size (MB)"] += sum(file_sizes) / MB

        self.perf_metrics["elapsed wall time (secs)"] += self.elapsed_time
        if not self.exec_on_scheduler:
            usage = resource.getrusage(resource.RUSAGE_SELF)
            # NOTE(review): dividing by 1024 assumes ru_maxrss is reported in kilobytes
            # (as on Linux) -- confirm if other platforms must be supported
            process_metrics = (
                ("max resident set size (MB)", usage.ru_maxrss / 1024),
                ("user mode cpu time (secs)", usage.ru_utime),
                ("system mode cpu time (secs)", usage.ru_stime),
            )
            for metric_key, amount in process_metrics:
                self.perf_metrics[metric_key] += amount
            logger.debug(f"{self.key} perf metrics:{os.linesep}{os.linesep.join(f'{name}: {value}' for name, value in self.perf_metrics.items())}")

        # the profile dump is verbose, so it is only logged for tasks that ran longer than 3 seconds
        if self.perf_profile is not None:
            if self.elapsed_time > 3:
                logger.debug(f"{self.key} perf profile:{os.linesep}{cprofile_to_string(self.perf_profile)}")