in jetstream/diagnostics/resource_profiling_plugin.py [0:0]
def transition(self, key, start, finish, *args, **kwargs):
    """Called by the scheduler every time a task changes state."""
    # a task is done once it moves from "processing" to "memory" (success)
    # or "erred" (failure); record its resource usage at that point
    if start == "processing" and finish in ("memory", "erred"):
        worker_address = kwargs["worker"]
        resource_usage = self._worker_resources.resources_for_task(worker_address)
        # startstops can also contain "transfer" entries, so pick the span
        # whose action is "compute" rather than assuming it comes first
        compute_span = next(
            s for s in kwargs["startstops"] if s["action"] == "compute"
        )
        self.results.append(
            {
                "experiment": self.experiment,
                "key": key,
                "function": str(pickle.loads(self.scheduler.tasks[key].run_spec["function"])),
                "args": str(pickle.loads(self.scheduler.tasks[key].run_spec["args"])),
                "start": datetime.fromtimestamp(compute_span["start"]).isoformat(),
                "end": datetime.fromtimestamp(compute_span["stop"]).isoformat(),
                "worker_address": worker_address,
                "max_memory": float(max(resource_usage.memory_usage)),
                "min_memory": float(min(resource_usage.memory_usage)),
                "max_cpu": float(max(resource_usage.cpu_usage)),
                "min_cpu": float(min(resource_usage.cpu_usage)),
            }
        )

    # once every task has reached a terminal state, write all usage stats to
    # BigQuery in one batch; this performs better than writing results on
    # every transition
    # https://distributed.dask.org/en/latest/scheduling-state.html#task-state
    all_finished = all(
        task.state in ("released", "erred", "forgotten")
        for task in self.scheduler.tasks.values()
    )
    if all_finished:
        self._write_to_bigquery()
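
# A minimal sketch of the _write_to_bigquery helper referenced above, assuming
# the google-cloud-bigquery client library is available; the table id below is
# illustrative, not taken from the original file.
def _write_to_bigquery(self):
    from google.cloud import bigquery

    client = bigquery.Client()
    # stream all accumulated rows in a single request; batching here is what
    # makes the "write once at the end" strategy above cheap
    errors = client.insert_rows_json(
        "my-project.diagnostics.task_profiles", self.results
    )
    if errors:
        raise RuntimeError(f"BigQuery streaming insert failed: {errors}")
    self.results.clear()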
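
# How a plugin like this is typically attached to a cluster, sketched with
# dask.distributed's public API; the class name ResourceProfilingPlugin is
# inferred from the module name, and the constructor arguments shown are
# assumptions rather than the file's actual signature.
def _example_register_plugin():
    from distributed import Client

    client = Client("tcp://scheduler-address:8786")
    # the scheduler will now invoke transition() for every task state change
    client.register_scheduler_plugin(ResourceProfilingPlugin(experiment="demo"))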