in task-sdk/src/airflow/sdk/definitions/dag.py [0:0]
def dag(dag_id_or_func=None, __DAG_class=DAG, __warnings_stacklevel_delta=2, **decorator_kwargs):
    """
    Wrap a function into a factory that builds a DAG each time it is called.

    Works both bare (``@dag``) and parameterised (``@dag("my_id", ...)`` or
    ``@dag(dag_id="my_id", ...)``).

    The DAG id is resolved in this order: the ``dag_id`` keyword argument, then a
    non-blank string passed positionally, then the decorated function's ``__name__``.

    :param dag_id_or_func: The function being decorated (bare form), a string to use
        as the DAG id, or ``None``.
    :param __DAG_class: Class instantiated to build the DAG object.
    :param __warnings_stacklevel_delta: Not referenced in this body; kept so the
        signature stays unchanged for existing callers.
    :param decorator_kwargs: Keyword arguments forwarded to the DAG class. A ``dag_id``
        entry is taken out and passed positionally instead.
    :return: The DAG factory when used bare, otherwise a decorator producing one.
    """
    # TODO: Task-SDK: remove __DAG_class
    # __DAG_class is a temporary hack to allow the dag decorator in airflow.models.dag to continue to
    # return SchedulerDag objects
    DAG = __DAG_class

    def wrapper(f: Callable) -> Callable[..., DAG]:
        # Work on a private copy: popping from the shared closure dict would make a second
        # application of the same decorator object silently lose its explicit ``dag_id``.
        dag_kwargs = dict(decorator_kwargs)

        # Determine dag_id: prioritize keyword arg, then positional string, fallback to function name
        if "dag_id" in dag_kwargs:
            dag_id = dag_kwargs.pop("dag_id")
        elif isinstance(dag_id_or_func, str) and dag_id_or_func.strip():
            dag_id = dag_id_or_func
        else:
            dag_id = f.__name__

        @functools.wraps(f)
        def factory(*args, **kwargs):
            # Bind the call arguments to the decorated function's signature so they can be
            # registered as params on the DAG object. Binding also raises TypeError when
            # required args/kwargs are missing, exactly as a direct call would.
            f_sig = signature(f).bind(*args, **kwargs)
            # Apply defaults to capture default values if set.
            f_sig.apply_defaults()

            # Initialize DAG with the resolved id and the remaining decorator arguments.
            with DAG(dag_id, **dag_kwargs) as dag_obj:
                # Set DAG documentation from function documentation if it exists and doc_md is not set.
                if f.__doc__ and not dag_obj.doc_md:
                    dag_obj.doc_md = f.__doc__

                # Replace every bound argument with the object returned by ``dag_obj.param``
                # before handing them to the decorated function.
                f_kwargs = {name: dag_obj.param(name, value) for name, value in f_sig.arguments.items()}

                # Set file location to the caller's source path. This must stay directly inside
                # ``factory``: ``f_back`` is relative to the current frame.
                back = sys._getframe().f_back
                dag_obj.fileloc = back.f_code.co_filename if back else ""

                # Invoke function to create operators in the DAG scope.
                f(**f_kwargs)

            # Return dag object such that it's accessible in Globals.
            return dag_obj

        # Ensure that warnings from inside DAG() are emitted from the caller, not here
        fixup_decorator_warning_stack(factory)
        return factory

    # Bare ``@dag`` usage: the decorated function arrives as the first positional argument.
    if callable(dag_id_or_func) and not isinstance(dag_id_or_func, str):
        return wrapper(dag_id_or_func)

    return wrapper