def deserialize()

in airflow-core/src/airflow/serialization/serialized_objects.py [0:0]


    def deserialize(cls, encoded_var: Any) -> Any:
        """
        Deserialize an object; helper function of depth first search for deserialization.

        :meta private:
        """
        if cls._is_primitive(encoded_var):
            return encoded_var
        elif isinstance(encoded_var, list):
            return [cls.deserialize(v) for v in encoded_var]

        if not isinstance(encoded_var, dict):
            raise ValueError(f"The encoded_var should be dict and is {type(encoded_var)}")
        var = encoded_var[Encoding.VAR]
        type_ = encoded_var[Encoding.TYPE]
        if type_ == DAT.TASK_CONTEXT:
            d = {}
            for k, v in var.items():
                if k == "task":  # todo: add `_encode` of Operator so we don't need this
                    continue
                d[k] = cls.deserialize(v)
            d["task"] = d["task_instance"].task  # todo: add `_encode` of Operator so we don't need this
            d["macros"] = macros
            d["var"] = {
                "json": VariableAccessor(deserialize_json=True),
                "value": VariableAccessor(deserialize_json=False),
            }
            d["conn"] = ConnectionAccessor()
            return Context(**d)
        elif type_ == DAT.DICT:
            return {k: cls.deserialize(v) for k, v in var.items()}
        elif type_ == DAT.ASSET_EVENT_ACCESSORS:
            return decode_outlet_event_accessors(var)
        elif type_ == DAT.ASSET_UNIQUE_KEY:
            return AssetUniqueKey(name=var["name"], uri=var["uri"])
        elif type_ == DAT.ASSET_ALIAS_UNIQUE_KEY:
            return AssetAliasUniqueKey(name=var["name"])
        elif type_ == DAT.DAG:
            return SerializedDAG.deserialize_dag(var)
        elif type_ == DAT.OP:
            return SerializedBaseOperator.deserialize_operator(var)
        elif type_ == DAT.DATETIME:
            return from_timestamp(var)
        elif type_ == DAT.POD:
            if not _has_kubernetes():
                raise RuntimeError("Cannot deserialize POD objects without kubernetes libraries installed!")
            pod = PodGenerator.deserialize_model_dict(var)
            return pod
        elif type_ == DAT.TIMEDELTA:
            return datetime.timedelta(seconds=var)
        elif type_ == DAT.TIMEZONE:
            return decode_timezone(var)
        elif type_ == DAT.RELATIVEDELTA:
            return decode_relativedelta(var)
        elif type_ == DAT.AIRFLOW_EXC_SER or type_ == DAT.BASE_EXC_SER:
            deser = cls.deserialize(var)
            exc_cls_name = deser["exc_cls_name"]
            args = deser["args"]
            kwargs = deser["kwargs"]
            del deser
            if type_ == DAT.AIRFLOW_EXC_SER:
                exc_cls = import_string(exc_cls_name)
            else:
                exc_cls = import_string(f"builtins.{exc_cls_name}")
            return exc_cls(*args, **kwargs)
        elif type_ == DAT.BASE_TRIGGER:
            tr_cls_name, kwargs = cls.deserialize(var)
            tr_cls = import_string(tr_cls_name)
            return tr_cls(**kwargs)
        elif type_ == DAT.SET:
            return {cls.deserialize(v) for v in var}
        elif type_ == DAT.TUPLE:
            return tuple(cls.deserialize(v) for v in var)
        elif type_ == DAT.PARAM:
            return cls._deserialize_param(var)
        elif type_ == DAT.XCOM_REF:
            return _XComRef(var)  # Delay deserializing XComArg objects until we have the entire DAG.
        elif type_ == DAT.ASSET:
            return decode_asset(var)
        elif type_ == DAT.ASSET_ALIAS:
            return AssetAlias(**var)
        elif type_ == DAT.ASSET_ANY:
            return AssetAny(*(decode_asset_condition(x) for x in var["objects"]))
        elif type_ == DAT.ASSET_ALL:
            return AssetAll(*(decode_asset_condition(x) for x in var["objects"]))
        elif type_ == DAT.ASSET_REF:
            return Asset.ref(**var)
        elif type_ == DAT.SIMPLE_TASK_INSTANCE:
            return SimpleTaskInstance(**cls.deserialize(var))
        elif type_ == DAT.CONNECTION:
            return Connection(**var)
        elif type_ == DAT.TASK_CALLBACK_REQUEST:
            return TaskCallbackRequest.from_json(var)
        elif type_ == DAT.DAG_CALLBACK_REQUEST:
            return DagCallbackRequest.from_json(var)
        elif type_ == DAT.TASK_INSTANCE_KEY:
            return TaskInstanceKey(**var)
        elif type_ == DAT.ARG_NOT_SET:
            return NOTSET
        else:
            raise TypeError(f"Invalid type {type_!s} in deserialization.")