in smallpond/execution/scheduler.py [0:0]
def export_task_metrics(self):
    """Persist task state and per-task metrics for post-run analysis.

    Pickles the finished and scheduled task maps into the config root,
    then builds an Arrow table (task attributes, partition infos, perf
    metrics) over the finished tasks and writes it as CSV under the log
    root, mirroring the CSV to the shared log root when one is set.
    """
    import pyarrow as arrow
    import pyarrow.csv as csv

    # Scalar types that serialize cleanly into an Arrow struct column;
    # Enum values are stringified before being stored.
    scalar_types = (bool, str, int, float, Enum, np.integer, np.floating)

    def pristine_attrs_dict(task: Task) -> dict:
        # Keep only scalar-valued pristine attributes of the task.
        attrs = {}
        for key in task._pristine_attrs:
            val = getattr(task, key)
            if isinstance(val, scalar_types):
                attrs[key] = str(val) if isinstance(val, Enum) else val
        return attrs

    # Snapshot the raw task maps first so an interrupted export still
    # leaves the pickles behind for inspection.
    dump(
        self.finished_tasks,
        os.path.join(self.ctx.config_root, "finished_tasks.pickle"),
        buffering=32 * MB,
    )
    dump(
        self.scheduled_tasks,
        os.path.join(self.ctx.config_root, "scheduled_tasks.pickle"),
        buffering=32 * MB,
    )

    finished = list(self.finished_tasks.values())
    columns = [
        arrow.array(pristine_attrs_dict(task) for task in finished),
        arrow.array(task.partition_infos_as_dict for task in finished),
        arrow.array(dict(task.perf_metrics) for task in finished),
    ]
    task_metrics = arrow.Table.from_arrays(
        columns,
        names=["task_props", "partition_infos", "perf_metrics"],
    )
    task_metrics_csv = os.path.join(self.ctx.log_root, "task_metrics.csv")
    # flatten() expands the struct columns into one CSV column per field.
    csv.write_csv(task_metrics.flatten(), task_metrics_csv)
    if self.ctx.shared_log_root:
        shutil.copy(task_metrics_csv, self.ctx.shared_log_root)
    logger.debug(f"exported task metrics to {task_metrics_csv}")