in jetstream/diagnostics/task_monitoring_plugin.py [0:0]
def transition(self, key, start, finish, *args, **kwargs):
    """Called by the scheduler every time a task changes status.

    Compares the ``(key, state)`` pair of every task known to
    ``self.scheduler`` against ``self.cache`` (the snapshot taken on the
    previous successful call), writes one row per task whose pair is not in
    that snapshot via ``self._write_to_bigquery``, then refreshes the snapshot.

    Args:
        key: Key of the task that triggered the callback (not used here).
        start: State the task is leaving (not used here).
        finish: State the task is entering (not used here).
        *args: Ignored.
        **kwargs: Transition metadata. ``worker`` is recorded as the row's
            ``worker_address`` and ``startstops[0]["stop"]`` (a POSIX
            timestamp) becomes the row's ``timestamp``.

    Transitions that lack the metadata needed to build rows are skipped
    without touching the cache, so the pending changes are reported by the
    next transition that does carry it.
    """
    # NOTE(review): presumably only some scheduler transitions carry
    # ``worker``/``startstops`` -- confirm against the scheduler plugin API.
    if "worker" not in kwargs:
        return
    worker_address = kwargs["worker"]

    tasks = list(self.scheduler.tasks.values())

    # Local set for O(1) membership tests; ``self.cache`` itself stays a list.
    previously_seen = set(self.cache)
    changed_tasks = [
        task for task in tasks if (task.key, task.state) not in previously_seen
    ]

    results = []
    if changed_tasks:
        startstops = kwargs.get("startstops")
        if not startstops:
            # No timing information to stamp the rows with: defer reporting.
            return
        # Every row of one call shares the same timestamp, so compute it once.
        timestamp = datetime.fromtimestamp(startstops[0]["stop"]).isoformat()
        results = [
            {
                "experiment": self.experiment,
                "key": task.key,
                "timestamp": timestamp,
                "worker_address": worker_address,
                "state": task.state,
            }
            for task in changed_tasks
        ]

    self.cache = [(task.key, task.state) for task in tasks]
    self._write_to_bigquery(results)