def dag()

in task-sdk/src/airflow/sdk/definitions/dag.py [0:0]


def dag(dag_id_or_func=None, __DAG_class=DAG, __warnings_stacklevel_delta=2, **decorator_kwargs):
    # TODO: Task-SDK: remove __DAG_class
    # __DAG_class is a temporary hack to allow the dag decorator in airflow.models.dag to continue to
    # return SchedulerDag objects
    DAG = __DAG_class

    def wrapper(f: Callable) -> Callable[..., DAG]:
        # Determine dag_id: prioritize keyword arg, then positional string, fallback to function name
        if "dag_id" in decorator_kwargs:
            dag_id = decorator_kwargs.pop("dag_id", "")
        elif isinstance(dag_id_or_func, str) and dag_id_or_func.strip():
            dag_id = dag_id_or_func
        else:
            dag_id = f.__name__

        @functools.wraps(f)
        def factory(*args, **kwargs):
            # Generate signature for decorated function and bind the arguments when called
            # we do this to extract parameters, so we can annotate them on the DAG object.
            # In addition, this fails if we are missing any args/kwargs with TypeError as expected.
            f_sig = signature(f).bind(*args, **kwargs)
            # Apply defaults to capture default values if set.
            f_sig.apply_defaults()

            # Initialize DAG with bound arguments
            with DAG(dag_id, **decorator_kwargs) as dag_obj:
                # Set DAG documentation from function documentation if it exists and doc_md is not set.
                if f.__doc__ and not dag_obj.doc_md:
                    dag_obj.doc_md = f.__doc__

                # Generate DAGParam for each function arg/kwarg and replace it for calling the function.
                # All args/kwargs for function will be DAGParam object and replaced on execution time.
                f_kwargs = {}
                for name, value in f_sig.arguments.items():
                    f_kwargs[name] = dag_obj.param(name, value)

                # set file location to caller source path
                back = sys._getframe().f_back
                dag_obj.fileloc = back.f_code.co_filename if back else ""

                # Invoke function to create operators in the DAG scope.
                f(**f_kwargs)

            # Return dag object such that it's accessible in Globals.
            return dag_obj

        # Ensure that warnings from inside DAG() are emitted from the caller, not here
        fixup_decorator_warning_stack(factory)
        return factory

    if callable(dag_id_or_func) and not isinstance(dag_id_or_func, str):
        return wrapper(dag_id_or_func)

    return wrapper