def transition()

in jetstream/diagnostics/task_monitoring_plugin.py [0:0]


    def transition(self, key, start, finish, *args, **kwargs):
        """Called by the scheduler every time a task changes status."""
        tasks = [task for _, task in self.scheduler.tasks.items()]
        changed_tasks = [task for task in tasks if (task.key, task.state) not in self.cache]
        worker_address = kwargs["worker"]

        results = [
            {
                "experiment": self.experiment,
                "key": task.key,
                "timestamp": datetime.fromtimestamp(kwargs["startstops"][0]["stop"]).isoformat(),
                "worker_address": worker_address,
                "state": task.state,
            }
            for task in changed_tasks
        ]

        self.cache = [(task.key, task.state) for task in tasks]
        self._write_to_bigquery(results)