in jbi/queue.py [0:0]
def check_writable(self) -> list[dockerflow.checks.CheckMessage]:
"""Heartbeat check to assert we can write items to queue"""
results = []
ping_result = self.backend.ping()
if ping_result is False:
results.append(
dockerflow.checks.Error(
f"queue with {str(self.backend)} unavailable",
hint="with FileBackend, check that folder is writable",
id="queue.backend.ping",
)
)
return results