in airflow-core/src/airflow/serialization/serialized_objects.py [0:0]
@classmethod
def deserialize(cls, encoded_var: Any) -> Any:
"""
Deserialize an object; helper function of depth first search for deserialization.
:meta private:
"""
if cls._is_primitive(encoded_var):
return encoded_var
elif isinstance(encoded_var, list):
return [cls.deserialize(v) for v in encoded_var]
if not isinstance(encoded_var, dict):
raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
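# Everything else arrives as an envelope: Encoding.VAR carries the payload
# and Encoding.TYPE the DAT tag that drives the dispatch below.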
var = encoded_var[Encoding.VAR]
type_ = encoded_var[Encoding.TYPE]
if type_ == DAT.TASK_CONTEXT:
d = {}
for k, v in var.items():
if k == "task": # todo: add `_encode` of Operator so we don't need this
continue
d[k] = cls.deserialize(v)
d["task"] = d["task_instance"].task # todo: add `_encode` of Operator so we don't need this
d["macros"] = macros
d["var"] = {
"json": VariableAccessor(deserialize_json=True),
"value": VariableAccessor(deserialize_json=False),
}
d["conn"] = ConnectionAccessor()
return Context(**d)
elif type_ == DAT.DICT:
return {k: cls.deserialize(v) for k, v in var.items()}
elif type_ == DAT.ASSET_EVENT_ACCESSORS:
return decode_outlet_event_accessors(var)
elif type_ == DAT.ASSET_UNIQUE_KEY:
return AssetUniqueKey(name=var["name"], uri=var["uri"])
elif type_ == DAT.ASSET_ALIAS_UNIQUE_KEY:
return AssetAliasUniqueKey(name=var["name"])
elif type_ == DAT.DAG:
return SerializedDAG.deserialize_dag(var)
elif type_ == DAT.OP:
return SerializedBaseOperator.deserialize_operator(var)
elif type_ == DAT.DATETIME:
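# `var` is the timestamp presumably written by the matching serialize
# branch; restore an aware datetime from it.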
return from_timestamp(var)
elif type_ == DAT.POD:
if not _has_kubernetes():
raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
return PodGenerator.deserialize_model_dict(var)
elif type_ == DAT.TIMEDELTA:
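# Timedeltas round-trip as their total seconds, e.g. timedelta(minutes=5)
# is stored as 300.0 and rebuilt here.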
return datetime.timedelta(seconds=var)
elif type_ == DAT.TIMEZONE:
return decode_timezone(var)
elif type_ == DAT.RELATIVEDELTA:
return decode_relativedelta(var)
elif type_ in (DAT.AIRFLOW_EXC_SER, DAT.BASE_EXC_SER):
deser = cls.deserialize(var)
exc_cls_name = deser["exc_cls_name"]
args = deser["args"]
kwargs = deser["kwargs"]
del deser
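# AIRFLOW_EXC_SER keeps the full dotted class path; BASE_EXC_SER stores only
# the bare builtin name, so it is namespaced under `builtins.` for import_string.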
if type_ == DAT.AIRFLOW_EXC_SER:
exc_cls = import_string(exc_cls_name)
else:
exc_cls = import_string(f"builtins.{exc_cls_name}")
return exc_cls(*args, **kwargs)
elif type_ == DAT.BASE_TRIGGER:
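# Triggers round-trip as a (classpath, kwargs) pair: re-import the class
# and re-instantiate it with the recursively deserialized kwargs.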
tr_cls_name, kwargs = cls.deserialize(var)
tr_cls = import_string(tr_cls_name)
return tr_cls(**kwargs)
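# Sets and tuples are encoded as lists; rebuild the original container type.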
elif type_ == DAT.SET:
return {cls.deserialize(v) for v in var}
elif type_ == DAT.TUPLE:
return tuple(cls.deserialize(v) for v in var)
elif type_ == DAT.PARAM:
return cls._deserialize_param(var)
elif type_ == DAT.XCOM_REF:
return _XComRef(var) # Delay deserializing XComArg objects until we have the entire DAG.
elif type_ == DAT.ASSET:
return decode_asset(var)
elif type_ == DAT.ASSET_ALIAS:
return AssetAlias(**var)
elif type_ == DAT.ASSET_ANY:
return AssetAny(*(decode_asset_condition(x) for x in var["objects"]))
elif type_ == DAT.ASSET_ALL:
return AssetAll(*(decode_asset_condition(x) for x in var["objects"]))
elif type_ == DAT.ASSET_REF:
return Asset.ref(**var)
elif type_ == DAT.SIMPLE_TASK_INSTANCE:
return SimpleTaskInstance(**cls.deserialize(var))
elif type_ == DAT.CONNECTION:
return Connection(**var)
elif type_ == DAT.TASK_CALLBACK_REQUEST:
return TaskCallbackRequest.from_json(var)
elif type_ == DAT.DAG_CALLBACK_REQUEST:
return DagCallbackRequest.from_json(var)
elif type_ == DAT.TASK_INSTANCE_KEY:
return TaskInstanceKey(**var)
elif type_ == DAT.ARG_NOT_SET:
return NOTSET
else:
raise TypeError(f"Invalid type {type_!s} in deserialization.")
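
# A minimal round-trip sketch (assuming this classmethod lives on Airflow's
# BaseSerialization, whose `serialize` writes the envelopes decoded above;
# the usage below is illustrative, not verbatim from this module):
#
#     import datetime
#     from airflow.serialization.serialized_objects import BaseSerialization
#
#     encoded = BaseSerialization.serialize({"retry_delay": datetime.timedelta(seconds=300)})
#     decoded = BaseSerialization.deserialize(encoded)
#     assert decoded == {"retry_delay": datetime.timedelta(seconds=300)}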