def task()

in src/pydolphinscheduler/tasks/func_wrap.py [0:0]


def task(func: types.FunctionType):
    """Decorator which converts a Python function into a pydolphinscheduler task.

    Calling the decorated function does not execute ``func``. Instead, the
    wrapper reads the source file in which ``func`` is defined, asks
    ``Extractor.get_code`` for the code belonging to ``func.__name__`` and
    returns a ``Python`` task whose definition is that code followed by a
    call to the function.

    :param func: The function which is wrapped by decorator ``@task``.
    :return: A wrapper building a ``Python`` task. Its positional and keyword
        arguments are forwarded to ``Python``; the keyword ``name`` overrides
        the task name, which otherwise defaults to ``func.__name__``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # NOTE(review): presumably rejects functions carrying decorators other
        # than ``@task``; the helper is defined outside this block -- confirm.
        _exists_other_decorator(func)

        # ``read_text`` closes the file after reading; the previous
        # ``open("r").read()`` left the handle open until garbage collection.
        source = Path(func.__code__.co_filename).read_text()
        stm = Extractor(source).get_code(func.__name__)

        # Pop ``name`` so it is not also forwarded through ``**kwargs``, which
        # would raise "got multiple values for keyword argument 'name'".
        name = kwargs.pop("name", func.__name__)
        return Python(
            *args,
            name=name,
            definition=f"{stm}{func.__name__}()",
            **kwargs,
        )

    return wrapper