in airflow-core/src/airflow/serialization/serialized_objects.py [0:0]
def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
    """
    Rebuild a ``SerializedDAG`` from its encoded dictionary form.

    Every entry of ``encoded_dag`` is decoded according to its key and stored on a
    freshly created ``SerializedDAG``. Afterwards the task group is restored, the
    callback flags are set, serialized fields absent from the payload are set to
    ``None`` and each task is wired back to the DAG.

    :param encoded_dag: Mapping of serialized DAG fields; must contain ``dag_id``.
    :return: The reconstructed DAG.
    :raises RuntimeError: If ``encoded_dag`` has no ``dag_id`` key.
    """
    if "dag_id" not in encoded_dag:
        raise RuntimeError(
            "Encoded dag object has no dag_id key. You may need to run `airflow dags reserialize`."
        )

    dag = SerializedDAG(dag_id=encoded_dag["dag_id"], schedule=None)

    # NOTE: the order of the checks below matters (e.g. the ``_date`` suffix test and
    # the ``_decorated_fields`` lookup take precedence over the branches after them).
    for key, raw in encoded_dag.items():
        attr_name, value = key, raw
        if key == "_downstream_task_ids":
            value = set(raw)
        elif key == "tasks":
            SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links
            # Only entries tagged as operators are decoded; the generator keeps the
            # type check and the decode interleaved per entry.
            operators = (
                SerializedBaseOperator.deserialize_operator(item[Encoding.VAR])
                for item in raw
                if item.get(Encoding.TYPE) == DAT.OP
            )
            # Stored under ``task_dict`` rather than ``tasks``, keyed by task_id.
            attr_name, value = "task_dict", {op.task_id: op for op in operators}
        elif key == "timezone":
            value = cls._deserialize_timezone(raw)
        elif key == "dagrun_timeout":
            value = cls._deserialize_timedelta(raw)
        elif key.endswith("_date"):
            value = cls._deserialize_datetime(raw)
        elif key == "edge_info":
            # Stored value already has the structure the DAG expects.
            pass
        elif key == "timetable":
            value = decode_timetable(raw)
        elif key == "weight_rule":
            value = decode_priority_weight_strategy(raw)
        elif key in cls._decorated_fields:
            value = cls.deserialize(raw)
        elif key == "params":
            value = cls._deserialize_params_dict(raw)
        elif key == "tags":
            value = set(raw)
        # Any other key keeps its raw value.

        object.__setattr__(dag, attr_name, value)

    # Restore the task group.
    if "task_group" not in encoded_dag:
        # Old payloads carry no task_group: create a root group holding every task.
        object.__setattr__(dag, "task_group", TaskGroup.create_root(dag))
        for member in dag.tasks:
            dag.task_group.add(member)
    else:
        task_group = TaskGroupSerialization.deserialize_task_group(
            encoded_dag["task_group"],
            None,
            dag.task_dict,
            dag,
        )
        object.__setattr__(dag, "task_group", task_group)

    # The flags default to False, so the mere presence of the key in the blob
    # switches the corresponding flag on.
    for flag in ("has_on_success_callback", "has_on_failure_callback"):
        if flag in encoded_dag:
            setattr(dag, flag, True)

    # Serialized fields that are neither in the payload nor constructor params
    # are explicitly set to None.
    absent_fields = dag.get_serialized_fields() - encoded_dag.keys()
    absent_fields = absent_fields - cls._CONSTRUCTOR_PARAMS.keys()
    for field_name in absent_fields:
        setattr(dag, field_name, None)

    for operator in dag.task_dict.values():
        SerializedBaseOperator.set_task_dag_references(operator, dag)

    return dag