def _tearDownAsyncioLoop()

in later/unittest/backport/async_case.py [0:0]


    def _tearDownAsyncioLoop(self):
        """Drain the calls queue, cancel leftover tasks and close the test loop.

        Steps, in order:

        1. Detach the loop from ``self._asyncioTestLoop`` (set to ``None``).
        2. Put ``None`` on ``self._asyncioCallsQueue`` and run the loop until
           the queue reports every item as done.
        3. Cancel every task still registered on the loop, wait for them to
           settle, and forward any non-cancellation exception to the loop's
           exception handler.
        4. Finalize outstanding async generators.
        5. Always (even if a step above raises) clear the current event loop
           and close this one.

        Raises:
            AssertionError: if no test loop is currently set up.
        """
        assert self._asyncioTestLoop is not None
        loop = self._asyncioTestLoop
        self._asyncioTestLoop = None
        # NOTE(review): ``None`` is presumably the stop sentinel for the
        # queue's consumer task -- confirm against the loop-runner coroutine.
        self._asyncioCallsQueue.put_nowait(None)
        loop.run_until_complete(self._asyncioCallsQueue.join())

        try:
            # cancel all tasks
            to_cancel = asyncio.all_tasks(loop)
            if to_cancel:
                for task in to_cancel:
                    task.cancel()

                # No ``loop=`` keyword: it was deprecated in Python 3.8 and
                # removed in 3.10; gather() takes the loop from the tasks.
                loop.run_until_complete(
                    asyncio.gather(*to_cancel, return_exceptions=True)
                )

                for task in to_cancel:
                    if task.cancelled():
                        continue
                    exc = task.exception()
                    if exc is not None:  # pragma: nocover
                        loop.call_exception_handler(
                            {
                                "message": "unhandled exception during test shutdown",
                                "exception": exc,
                                "task": task,
                            }
                        )
            # shutdown asyncgens -- done even when there was nothing to
            # cancel, so suspended async generators are finalized before the
            # loop is closed.
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            asyncio.set_event_loop(None)
            loop.close()