def user()

in o2a/o2a_libs/src/o2a_lib/el_wf_functions.py [0:0]


def user(context=None):
    """
    Returns gloabl user name, DAG owner or raises error
    if there is more than one DAG's owner.
    """
    owner = context.get("user.name", None)
    if owner:
        return owner

    dag: Optional[DAG] = context.get("dag", None)
    if dag is None:
        raise AirflowException("No dag reference in context.")

    owners: Set[str] = {t.owner for t in dag.tasks}
    if len(owners) > 1:
        raise AirflowException("DAG owner is ambiguous.")

    if not owners:
        raise AirflowException("DAG has no owner.")

    owner = owners.pop()
    return owner