in connectors/service_cli.py [0:0]
def get_event_loop(uvloop=False):
    """Return an asyncio event loop, optionally enabling uvloop first.

    Args:
        uvloop: When truthy, try to install the uvloop event loop policy
            before looking up the loop. Any failure is logged as a warning
            and the previously active policy stays in place.

    Returns:
        The running loop if there is one; otherwise the current loop of the
        active event loop policy; otherwise a newly created loop that has
        been installed as the current loop for this thread.
    """
    if uvloop:
        # Best effort: never fail start-up because uvloop cannot be enabled.
        # NOTE(review): presumably `_get_uvloop()` imports uvloop lazily and
        # raises when the library is missing -- confirm against its definition.
        try:
            asyncio.set_event_loop_policy(_get_uvloop().EventLoopPolicy())
        except Exception as e:
            logger.warning(
                f"Unable to enable uvloop: {e}. Running with default event loop"
            )

    # Inside a coroutine or callback: reuse the loop that is already running.
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        pass

    try:
        loop = asyncio.get_event_loop_policy().get_event_loop()
    except RuntimeError:
        # The policy signals "no current loop" by raising rather than by
        # returning None (e.g. when called from a non-main thread), so the
        # fallback below has to be triggered from here.
        loop = None

    if loop is None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop