test-runner/adapters/decorators.py (33 lines of code) (raw):
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
import concurrent.futures
import functools
import asyncio
# default executor is not sufficient since default threads == CPUx5 and
# VMs will default to 1 CPU.
emulate_async_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=32, thread_name_prefix="emulate_async"
)
control_api_emulate_async_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=8, thread_name_prefix="control_api_emulate_async"
)
def get_running_loop():
"""
Gets the currently running event loop
Uses asyncio.get_running_loop() if available (Python 3.7+) or a backported
version of the same function in 3.5/3.6.
"""
try:
loop = asyncio.get_running_loop()
except AttributeError:
loop = asyncio._get_running_loop()
if loop is None:
raise RuntimeError("no running event loop")
return loop
def emulate_async(fn):
"""
Returns a coroutine function that calls a given function with emulated asynchronous
behavior via use of mulithreading.
Can be applied as a decorator.
:param fn: The sync function to be run in async.
:returns: A coroutine function that will call the given sync function.
"""
@functools.wraps(fn)
async def async_fn_wrapper(*args, **kwargs):
loop = get_running_loop()
return await loop.run_in_executor(
emulate_async_executor, functools.partial(fn, *args, **kwargs)
)
return async_fn_wrapper
def control_api_emulate_async(fn):
"""
Returns a coroutine function that calls a given function with emulated asynchronous
behavior via use of mulithreading.
Control APIs have their own threadpool. This is necessary because the emualte_async
threadpool can become full, especially if the network is disconencted. We need
control APIs to run so we can re-connect the network in this scenario.
Can be applied as a decorator.
:param fn: The sync function to be run in async.
:returns: A coroutine function that will call the given sync function.
"""
@functools.wraps(fn)
async def async_fn_wrapper(*args, **kwargs):
loop = get_running_loop()
return await loop.run_in_executor(
control_api_emulate_async_executor, functools.partial(fn, *args, **kwargs)
)
return async_fn_wrapper