def deserialize_dag()

in airflow-core/src/airflow/serialization/serialized_objects.py [0:0]


    def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
        """
        Rebuild a ``SerializedDAG`` from its dictionary (JSON object) form.

        Each entry of ``encoded_dag`` is decoded according to its key and stored on a
        freshly created ``SerializedDAG``. Afterwards the task group is set, the callback
        flags are applied, serialized fields missing from the blob are set to ``None``
        and every task is passed through ``set_task_dag_references``.

        :param encoded_dag: Serialized DAG fields keyed by name; must contain ``dag_id``.
        :return: The populated ``SerializedDAG`` instance.
        :raises RuntimeError: If ``encoded_dag`` has no ``dag_id`` key.
        """
        if "dag_id" not in encoded_dag:
            raise RuntimeError(
                "Encoded dag object has no dag_id key.  You may need to run `airflow dags reserialize`."
            )

        dag = SerializedDAG(dag_id=encoded_dag["dag_id"], schedule=None)

        # NOTE: branch order is significant -- the first matching rule decides how a
        # field is decoded, so keep the sequence intact when editing.
        for key, value in encoded_dag.items():
            if key == "_downstream_task_ids":
                value = set(value)
            elif key == "tasks":
                # The extra-links loader must be installed before any operator is decoded.
                SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links
                # Only entries tagged as operators are decoded; anything else is skipped.
                operators = (
                    SerializedBaseOperator.deserialize_operator(entry[Encoding.VAR])
                    for entry in value
                    if entry.get(Encoding.TYPE) == DAT.OP
                )
                # Stored on the DAG as ``task_dict`` (keyed by task_id), not as ``tasks``.
                key = "task_dict"
                value = {operator.task_id: operator for operator in operators}
            elif key == "timezone":
                value = cls._deserialize_timezone(value)
            elif key == "dagrun_timeout":
                value = cls._deserialize_timedelta(value)
            elif key.endswith("_date"):
                value = cls._deserialize_datetime(value)
            elif key == "edge_info":
                # Already in its final shape; the branch exists so later rules cannot claim it.
                pass
            elif key == "timetable":
                value = decode_timetable(value)
            elif key == "weight_rule":
                value = decode_priority_weight_strategy(value)
            elif key in cls._decorated_fields:
                value = cls.deserialize(value)
            elif key == "params":
                value = cls._deserialize_params_dict(value)
            elif key == "tags":
                value = set(value)
            # Any other key keeps its encoded value unchanged.

            # Assigned via object.__setattr__, which bypasses any __setattr__ override
            # defined on the DAG class.
            object.__setattr__(dag, key, value)

        # Set the task group; whatever the loop above stored under ``task_group`` is replaced.
        if "task_group" not in encoded_dag:
            # Blob carries no task_group (presumably older serialized data): create a
            # root TaskGroup and add every task to it.
            object.__setattr__(dag, "task_group", TaskGroup.create_root(dag))
            for task in dag.tasks:
                dag.task_group.add(task)
        else:
            task_group = TaskGroupSerialization.deserialize_task_group(
                encoded_dag["task_group"],
                None,
                dag.task_dict,
                dag,
            )
            object.__setattr__(dag, "task_group", task_group)

        # The mere presence of a has_on_*_callback key switches the flag to True; the
        # stored value itself is not consulted.
        for callback_flag in ("has_on_success_callback", "has_on_failure_callback"):
            if callback_flag in encoded_dag:
                setattr(dag, callback_flag, True)

        # Serialized fields that are neither in the blob nor constructor parameters
        # are explicitly set to None.
        absent_fields = dag.get_serialized_fields() - encoded_dag.keys() - cls._CONSTRUCTOR_PARAMS.keys()
        for field_name in absent_fields:
            setattr(dag, field_name, None)

        for task in dag.task_dict.values():
            SerializedBaseOperator.set_task_dag_references(task, dag)

        return dag