# def transition()
#
# in jetstream/diagnostics/resource_profiling_plugin.py [0:0]



    def transition(self, key, start, finish, *args, **kwargs):
        """Record per-task resource usage and flush results when all tasks end.

        Called by the Dask scheduler every time a task changes state.  When a
        task leaves ``processing`` — either completing into ``memory`` or
        failing into ``erred`` — the CPU/memory samples collected for the
        worker that ran it are appended to ``self.results``.  Once every task
        known to the scheduler has reached a terminal state, all accumulated
        rows are written to BigQuery in a single batch (cheaper than a write
        per transition).

        Task states: https://distributed.dask.org/en/latest/scheduling-state.html#task-state

        Args:
            key: Task key as tracked by the scheduler.
            start: State the task is transitioning from.
            finish: State the task is transitioning to.
            **kwargs: Transition metadata from the scheduler; for completing
                tasks this method reads ``worker`` (address of the executing
                worker) and ``startstops`` (list of timing records).
        """
        if start == "processing" and finish in ("memory", "erred"):
            worker_address = kwargs["worker"]
            resource_usage = self._worker_resources.resources_for_task(worker_address)
            # NOTE(review): assumes the first startstops entry is the compute
            # span for this task — confirm against the scheduler version in use.
            timing = kwargs["startstops"][0]
            run_spec = self.scheduler.tasks[key].run_spec
            self.results.append(
                {
                    "experiment": self.experiment,
                    "key": key,
                    "function": str(pickle.loads(run_spec["function"])),
                    "args": str(pickle.loads(run_spec["args"])),
                    "start": datetime.fromtimestamp(timing["start"]).isoformat(),
                    "end": datetime.fromtimestamp(timing["stop"]).isoformat(),
                    "worker_address": worker_address,
                    "max_memory": float(max(resource_usage.memory_usage)),
                    "min_memory": float(min(resource_usage.memory_usage)),
                    "max_cpu": float(max(resource_usage.cpu_usage)),
                    "min_cpu": float(min(resource_usage.cpu_usage)),
                }
            )

        # Make sure that all tasks have finished and write all usage stats to
        # BigQuery at once; this improves performance vs. writing results on
        # every transition.  Iterate values directly (keys are unused) with a
        # generator so we stop at the first unfinished task instead of
        # materializing a full list of booleans.
        if all(
            task.state in ("released", "erred", "forgotten")
            for task in self.scheduler.tasks.values()
        ):
            self._write_to_bigquery()