in pyignite/utils.py [0:0]
def status_to_exception(exc: Type[Exception], ignore_timeout=False):
    """
    Build a decorator that turns a failed result into a raised exception.

    The decorated callable must return an object exposing `status`,
    `message` and `value` attributes. A zero `status` yields `value`;
    any other `status` raises `exc(message)`. Both plain functions and
    coroutine functions are supported.

    For coroutine functions only, a `timeout` keyword argument is removed
    from the call before the wrapped coroutine function is invoked. When it
    is truthy and `ignore_timeout` is not set, the call is awaited through
    `asyncio.wait_for` with that timeout. Plain functions receive their
    keyword arguments unchanged.

    :param exc: the class of exception to raise,
    :param ignore_timeout: if set, the `timeout` argument is still consumed
     but no time limit is applied.
    :return: decorator to apply to the target function.
    """
    def unwrap(response):
        # Success is signalled by a zero status; anything else is an error.
        if response.status == 0:
            return response.value
        raise exc(response.message)

    def decorate(fn):
        if not inspect.iscoroutinefunction(fn):
            @wraps(fn)
            def sync_wrapper(*args, **kwargs):
                response = fn(*args, **kwargs)
                return unwrap(response)

            return sync_wrapper

        @wraps(fn)
        async def async_wrapper(*args, **kwargs):
            # `timeout` is consumed here and never forwarded to `fn`.
            timeout = kwargs.pop('timeout', 0)

            if not timeout or ignore_timeout:
                return unwrap(await fn(*args, **kwargs))

            limited = asyncio.wait_for(fn(*args, **kwargs), timeout)
            return unwrap(await limited)

        return async_wrapper

    return decorate