docker_images/pythonv2/wrapper/python_glue/wrap_sync_in_async.py (36 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import asyncio
import logging
import inspect
import functools
import concurrent.futures

logger = logging.getLogger(__name__)

# The default executor is not sufficient: its default thread count is
# CPU count x 5, and VMs will default to 1 CPU. Use a dedicated, larger pool.
emulate_async_executor = concurrent.futures.ThreadPoolExecutor(
    max_workers=32, thread_name_prefix="emulate_async"
)


def get_running_loop():
    """
    Gets the currently running event loop

    Uses asyncio.get_running_loop() if available (Python 3.7+) or a backported
    version of the same function in 3.5/3.6.

    :raises RuntimeError: if there is no running event loop.

    :returns: The event loop running in the current thread.
    """
    try:
        loop = asyncio.get_running_loop()
    except AttributeError:
        # asyncio.get_running_loop() does not exist before Python 3.7. The
        # private fallback returns None instead of raising, so convert that
        # into the same RuntimeError the public function raises.
        loop = asyncio._get_running_loop()
        if loop is None:
            raise RuntimeError("no running event loop")
    return loop


def emulate_async(fn):
    """
    Returns a coroutine function that calls a given function with emulated
    asynchronous behavior via use of multithreading.

    Can be applied as a decorator.

    :param fn: The sync function to be run in async.

    :returns: A coroutine function that will call the given sync function.
    """

    @functools.wraps(fn)
    async def async_fn_wrapper(*args, **kwargs):
        loop = get_running_loop()

        # run_in_executor only forwards positional arguments, so bind both
        # args and kwargs into a single zero-argument callable.
        return await loop.run_in_executor(
            emulate_async_executor, functools.partial(fn, *args, **kwargs)
        )

    return async_fn_wrapper


def wrap_object(obj):
    """
    Adds an async counterpart for every public ``*_sync`` method of an object.

    For each bound method on `obj` whose name does not start with an
    underscore and ends in ``_sync``, a coroutine function created by
    emulate_async() is set on `obj` under the same name with the ``_sync``
    suffix removed (e.g. ``connect_sync`` gains ``connect``).

    :param obj: The object to wrap. It is modified in place.
    """
    suffix = "_sync"

    for name in dir(obj):
        # Filter on the name first so that unrelated attributes are never
        # evaluated: a property that raises (or is expensive to compute) must
        # not prevent the *_sync methods from being wrapped.
        if name.startswith("_") or not name.endswith(suffix):
            continue

        member = getattr(obj, name)
        if not inspect.ismethod(member):
            continue

        newname = name[: -len(suffix)]
        logger.info("wrapping %s to become %s", name, newname)
        setattr(obj, newname, emulate_async(member))