in src/asfquart/base.py [0:0]
def run_forever(self, loop, task):
"Run the application until exit, then cleanly shut down."
# Note: this logic is copied from quart/app.py
reload_ = False
try:
loop.run_until_complete(task)
except quart.utils.MustReloadError:
reload_ = True
LOGGER.debug('FOUND: MustReloadError')
except ExceptionGroup as e:
reload_ = (e.subgroup(quart.utils.MustReloadError) is not None)
LOGGER.debug(f'FOUND: ExceptionGroup, reload_={reload_}')
finally:
try:
quart.app._cancel_all_tasks(loop) # pylint: disable=protected-access
loop.run_until_complete(loop.shutdown_asyncgens())
finally:
asyncio.set_event_loop(None)
loop.close()
if reload_:
quart.utils.restart()