in libmozevent/utils.py [0:0]
def run_tasks(awaitables: Iterable, bus_to_restore=None):
    """
    Helper to run tasks concurrently on the current asyncio event loop.

    When a task raises an exception, the failure is logged, every task that is
    still pending is cancelled, and this function returns normally (the
    exception is NOT propagated to the caller).
    When a SIGTERM is received, the whole stack stops and an
    asyncio.CancelledError is raised.
    In case the tasks are cancelled and bus_to_restore is defined,
    bus_to_restore.restore_redis_messages() is awaited before the
    asyncio.CancelledError is re-raised, so that Redis messages currently
    processed by sequential tasks can be put back in their original queue.

    :param awaitables: iterable of coroutines, tasks or futures to run.
    :param bus_to_restore: optional object exposing an awaitable
        restore_redis_messages() method.
    :raises TypeError: when awaitables is not an iterable of awaitable objects.
    :raises asyncio.CancelledError: when the tasks have been cancelled.
    """
    try:
        # Keep an explicit reference to every child task so they can be
        # cancelled one by one later. dict.fromkeys() drops duplicated
        # awaitables while keeping their order, as asyncio.gather() does.
        children = [asyncio.ensure_future(aw) for aw in dict.fromkeys(awaitables)]
        # Create a task grouping all awaitables and running them concurrently
        tasks_group = asyncio.gather(*children)
    except TypeError as err:
        raise TypeError(
            "Could not run tasks: awaitable parameter must only contain awaitable functions"
        ) from err

    def handle_sigterm(*args, **kwargs):
        """
        Stop all tasks when receiving a SIGTERM.
        This may particularly happen on Heroku dynos, which are stopped at least once a day.
        """
        log.warning("SIGTERM signal has been received. Stopping all running tasks…")
        # Cancelling the pending group cancels each of its children
        tasks_group.cancel()

    event_loop = asyncio.get_event_loop()
    event_loop.add_signal_handler(signal.SIGTERM, handle_sigterm)

    async def _run():
        try:
            await tasks_group
        except asyncio.CancelledError:
            # Never treat a cancellation as a task failure: let the caller
            # code below restore the Redis messages.
            raise
        except Exception as e:
            log.error("Failure while running async tasks", error=str(e), exc_info=True)
            # When ANY exception is raised by one of the awaitables, make sure
            # the other awaitables are cancelled. The group future is already
            # done at this point, so calling tasks_group.cancel() would be a
            # no-op: the children must be cancelled individually.
            pending = [child for child in children if not child.done()]
            for child in pending:
                child.cancel()
            # Give the cancelled tasks a chance to unwind before returning
            await asyncio.gather(*pending, return_exceptions=True)

    try:
        event_loop.run_until_complete(_run())
    except asyncio.CancelledError:
        # Tasks have been cancelled intentionally, restore messages in Redis queues
        if bus_to_restore is not None:
            event_loop.run_until_complete(bus_to_restore.restore_redis_messages())
        raise