in smallpond/execution/task.py [0:0]
def finalize(self):
    """Finish a succeeded task: publish its output and record perf metrics.

    Steps, in order:

    1. Call ``self.inject_fault()`` and assert that the task status is
       ``WorkStatus.SUCCEED``.
    2. If the runtime (staging) output path differs from the final output
       path and exists, create the parent of the final path and rename the
       staging path onto it.
    3. When ``self.ctx.enable_diagnostic_metrics`` is set, add the number
       and total size (MB) of the input and output files to
       ``self.perf_metrics``.
    4. Add the elapsed wall time and, unless the task executed on the
       scheduler, the resource usage of the current process.
    5. Log the collected metrics (and the profile, for tasks that ran for
       more than 3 seconds) at debug level.

    Raises:
        AssertionError: if the task status is not ``WorkStatus.SUCCEED``.
        OSError: propagated from ``os.makedirs`` / ``os.rename`` when the
            output cannot be moved.
    """
    self.inject_fault()
    # NOTE(review): `assert` is stripped under `python -O`; kept as-is so the
    # exception type seen by callers does not change.
    assert self.status == WorkStatus.SUCCEED
    logger.info("finished task: {}", self)

    # move the task output from staging dir to output dir
    if self.runtime_output_abspath != self.final_output_abspath and os.path.exists(self.runtime_output_abspath):
        os.makedirs(os.path.dirname(self.final_output_abspath), exist_ok=True)
        os.rename(self.runtime_output_abspath, self.final_output_abspath)

    def collect_file_sizes(file_paths, kind="output"):
        """Return the size in bytes of every path, or [] if any is missing.

        `kind` only labels the warning so that a missing *input* file is
        not reported as a missing output file.
        """
        if not file_paths:
            return []
        try:
            # stat the files concurrently; capped at 32 worker threads
            with ThreadPoolExecutor(min(32, len(file_paths))) as pool:
                return list(pool.map(os.path.getsize, file_paths))
        except FileNotFoundError:
            # best effort: metrics are diagnostic only, so skip them for
            # this group of files instead of failing the task
            logger.warning(f"some of the {kind} files not found: {file_paths[:3]}...")
            return []

    if self.ctx.enable_diagnostic_metrics:
        input_file_paths = [path for dataset in self.input_datasets for path in dataset.resolved_paths]
        output_file_paths = self.output.resolved_paths
        for metric_name, file_paths in [
            ("input", input_file_paths),
            ("output", output_file_paths),
        ]:
            file_sizes = collect_file_sizes(file_paths, metric_name)
            # sizes are non-empty only when paths are non-empty and all
            # of them could be stat'ed
            if file_sizes:
                self.perf_metrics[f"num {metric_name} files"] += len(file_paths)
                self.perf_metrics[f"total {metric_name} size (MB)"] += sum(file_sizes) / MB

    self.perf_metrics["elapsed wall time (secs)"] += self.elapsed_time
    if not self.exec_on_scheduler:
        resource_usage = resource.getrusage(resource.RUSAGE_SELF)
        # NOTE(review): ru_maxrss is in kilobytes on Linux, so / 1024 gives
        # MB; other platforms (e.g. macOS) use different units - confirm.
        self.perf_metrics["max resident set size (MB)"] += resource_usage.ru_maxrss / 1024
        self.perf_metrics["user mode cpu time (secs)"] += resource_usage.ru_utime
        self.perf_metrics["system mode cpu time (secs)"] += resource_usage.ru_stime

    metric_lines = os.linesep.join(f"{name}: {value}" for name, value in self.perf_metrics.items())
    logger.debug(f"{self.key} perf metrics:{os.linesep}{metric_lines}")

    # only dump the profile of tasks that ran for more than 3 seconds
    if self.perf_profile is not None and self.elapsed_time > 3:
        logger.debug(f"{self.key} perf profile:{os.linesep}{cprofile_to_string(self.perf_profile)}")