def export_task_metrics()

in smallpond/execution/scheduler.py


    def export_task_metrics(self):
        """Dump finished/scheduled task state to pickle files and export
        per-task metrics as a flattened CSV table."""
        # Imported lazily so pyarrow is only loaded when metrics are exported.
        import pyarrow as arrow
        import pyarrow.csv as csv

        def pristine_attrs_dict(task: Task):
            # Keep only pristine attributes with simple scalar values
            # (enums stringified) so each task maps onto an Arrow struct.
            return {
                key: str(val) if isinstance(val, Enum) else val
                for key in task._pristine_attrs
                if isinstance(
                    val := getattr(task, key),
                    (bool, str, int, float, Enum, np.integer, np.floating),
                )
            }

        # Persist the raw task objects as pickles for post-mortem inspection.
        dump(
            self.finished_tasks,
            os.path.join(self.ctx.config_root, "finished_tasks.pickle"),
            buffering=32 * MB,
        )
        dump(
            self.scheduled_tasks,
            os.path.join(self.ctx.config_root, "scheduled_tasks.pickle"),
            buffering=32 * MB,
        )

        # Build one row-aligned StructArray per metrics family: each finished
        # task contributes one dict, which pyarrow converts to a struct value.
        task_props = arrow.array(pristine_attrs_dict(task) for task in self.finished_tasks.values())
        partition_infos = arrow.array(task.partition_infos_as_dict for task in self.finished_tasks.values())
        perf_metrics = arrow.array(dict(task.perf_metrics) for task in self.finished_tasks.values())
        task_metrics = arrow.Table.from_arrays(
            [task_props, partition_infos, perf_metrics],
            names=["task_props", "partition_infos", "perf_metrics"],
        )

        # flatten() expands each struct column into one top-level column per
        # struct field before writing the table as CSV.
        task_metrics_csv = os.path.join(self.ctx.log_root, "task_metrics.csv")
        csv.write_csv(task_metrics.flatten(), task_metrics_csv)

        # Mirror the CSV into the shared log directory when one is configured.
        if self.ctx.shared_log_root:
            shutil.copy(task_metrics_csv, self.ctx.shared_log_root)
        logger.debug(f"exported task metrics to {task_metrics_csv}")
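The CSV layout above falls out of pyarrow's struct handling: arrow.array over a sequence of dicts infers a struct type, and Table.flatten() promotes each struct field to its own top-level column. A minimal, self-contained sketch of that pattern with toy data (the row contents and file name here are illustrative, not smallpond's actual task attributes):

    import pyarrow as arrow
    import pyarrow.csv as csv

    # One dict per task; arrow.array infers a struct type from the keys.
    rows = [
        {"task_id": 1, "status": "SUCCEED", "elapsed_sec": 1.5},
        {"task_id": 2, "status": "FAILED", "elapsed_sec": 0.7},
    ]
    props = arrow.array(rows)  # StructArray with fields task_id/status/elapsed_sec
    table = arrow.Table.from_arrays([props], names=["task_props"])

    # flatten() replaces the struct column with one column per field
    # ("task_props.task_id", "task_props.status", "task_props.elapsed_sec").
    flat = table.flatten()
    csv.write_csv(flat, "example_task_metrics.csv")
    print(flat.column_names)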
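On the consuming side, the exported file is an ordinary CSV, so it can be loaded back with pyarrow (or pandas, DuckDB, etc.) for analysis. A small sketch, assuming the task_metrics.csv path written above:

    import pyarrow.csv as csv

    # Read the flattened metrics table back; column types are inferred.
    metrics = csv.read_csv("task_metrics.csv")
    print(metrics.schema)
    print(metrics.num_rows, "tasks")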