in src/pydolphinscheduler/tasks/func_wrap.py [0:0]
def task(func: types.FunctionType):
"""Decorate which covert Python functions into pydolphinscheduler task.
:param func: The function which wraps by decorator ``@task``.
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
_exists_other_decorator(func)
loc = func.__code__.co_filename
extractor = Extractor(Path(loc).open("r").read())
stm = extractor.get_code(func.__name__)
return Python(
name=kwargs.get("name", func.__name__),
definition=f"{stm}{func.__name__}()",
*args,
**kwargs,
)
return wrapper