odps/models/core.py (204 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings

from .. import options, serializers, utils
from ..compat import quote_plus, six
from .cache import cache, del_cache


class XMLRemoteModel(serializers.XMLSerializableModel):
    """XML-serializable model that keeps a parent object and a client."""

    __slots__ = "_parent", "_client", "_schema_name"

    def __init__(self, **kwargs):
        """
        Initialize the model from keyword arguments.

        ``parent`` and ``client`` are accepted as aliases of ``_parent`` and
        ``_client``. Every remaining key must be listed in ``__slots__``.

        :raises TypeError: if a keyword is not one of the declared slots
        """
        if "parent" in kwargs:
            kwargs["_parent"] = kwargs.pop("parent")
        if "client" in kwargs:
            kwargs["_client"] = kwargs.pop("client")
        # Mark the schema name as unresolved; RestModel._get_schema_name()
        # replaces the sentinel on first use.
        self._schema_name = utils.notset
        if not frozenset(kwargs).issubset(self.__slots__):
            unexpected = sorted(set(kwargs) - set(self.__slots__))
            raise TypeError(
                "%s() meet illegal arguments (%s)"
                % (type(self).__name__, ", ".join(unexpected))
            )
        super(XMLRemoteModel, self).__init__(**kwargs)

    @classmethod
    def parse(cls, client, response, obj=None, **kw):
        """
        Parse ``response`` via the base serializer, passing ``client`` along
        as the ``_client`` keyword.
        """
        kw["_client"] = client
        return super(XMLRemoteModel, cls).parse(response, obj=obj, **kw)


class AbstractXMLRemoteModel(XMLRemoteModel):
    """XML remote model declaring an additional ``_type_indicator`` slot."""

    __slots__ = ("_type_indicator",)


class JSONRemoteModel(serializers.JSONSerializableModel):
    """JSON-serializable model that keeps a parent object and a client."""

    __slots__ = "_parent", "_client"

    def __init__(self, **kwargs):
        """
        Initialize the model from keyword arguments.

        ``parent`` and ``client`` are accepted as aliases of ``_parent`` and
        ``_client``. Every remaining key must be listed in ``__slots__``.

        :raises TypeError: if a keyword is not one of the declared slots
        """
        if "parent" in kwargs:
            kwargs["_parent"] = kwargs.pop("parent")
        if "client" in kwargs:
            kwargs["_client"] = kwargs.pop("client")
        if not frozenset(kwargs).issubset(self.__slots__):
            unexpected = sorted(set(kwargs) - set(self.__slots__))
            raise TypeError(
                "%s() meet illegal arguments (%s)"
                % (type(self).__name__, ", ".join(unexpected))
            )
        super(JSONRemoteModel, self).__init__(**kwargs)

    @classmethod
    def parse(cls, client, response, obj=None, **kw):
        """
        Parse ``response`` via the base serializer, passing ``client`` along
        as the ``_client`` keyword.
        """
        kw["_client"] = client
        return super(JSONRemoteModel, cls).parse(response, obj=obj, **kw)


class RestModel(XMLRemoteModel):
    """Remote model that can compute its own resource path."""

    def _name(self):
        """Return the path segment of this object: the lower-cased class name."""
        return type(self).__name__.lower()

    def _getattr(self, attr):
        """Read ``attr`` through ``object.__getattribute__``, skipping overrides."""
        return object.__getattribute__(self, attr)

    @classmethod
    def _encode(cls, name):
        """Quote ``name`` for use in a path, encoding spaces as ``%20``."""
        # quote_plus() turns spaces into "+"; rewrite those as "%20".
        name = quote_plus(name).replace("+", "%20")
        return name

    def resource(self, client=None, endpoint=None):
        """
        Build the resource path of this object.

        The path is the parent's resource (or, without a parent, the given
        ``endpoint`` / the endpoint of ``client`` or ``self._client``) joined
        with the encoded result of ``_name()``. When ``_name()`` returns
        ``None`` the parent's resource is returned unchanged.
        """
        parent = self._parent
        if parent is None:
            if endpoint is None:
                endpoint = (client or self._client).endpoint
            parent_res = endpoint
        else:
            parent_res = parent.resource(client=client, endpoint=endpoint)
        name = self._name()
        if name is None:
            return parent_res
        return "/".join([parent_res, self._encode(name)])

    def __eq__(self, other):
        """Equal when ``other`` is an instance of this type with the same name and parent."""
        # isinstance() is already False for None, so no separate check is needed.
        if not isinstance(other, type(self)):
            return False
        return self._name() == other._name() and self.parent == other.parent

    def __hash__(self):
        # Hash a tuple instead of multiplying component hashes: a product
        # collapses to 0 whenever any component hashes to 0.
        return hash((type(self), self._parent, self._name()))

    def _get_schema_name(self):
        """
        Return the name of the schema this object belongs to, or ``None``.

        The value is looked up once through the parent chain and then stored
        in ``_schema_name``.
        """
        if self._schema_name is not utils.notset:
            return self._schema_name

        if isinstance(self._parent, LazyLoad):
            schema = self._parent.get_schema()
        elif isinstance(self._parent, Container) and self._parent._parent is not None:
            schema = self._parent._parent.get_schema()
        else:
            schema = None
        self._schema_name = schema.name if schema is not None else None
        return self._schema_name


class LazyLoad(RestModel):
    """
    Model whose fields are fetched on demand.

    Reading a declared field that is still ``None`` on an object that has not
    been loaded triggers ``reload()``, which subclasses must implement.
    """

    __slots__ = ("_loaded",)

    # NOTE(review): ``cache`` comes from .cache; presumably it reuses existing
    # instances -- confirm against that module.
    @cache
    def __new__(cls, *args, **kwargs):
        return object.__new__(cls)

    def __init__(self, **kwargs):
        self._loaded = False
        # ``no_cache`` is dropped here and not forwarded to the base class.
        kwargs.pop("no_cache", None)
        super(LazyLoad, self).__init__(**kwargs)

    def _name(self):
        """Return the ``name`` attribute without triggering lazy loading."""
        return self._getattr("name")

    def __getattribute__(self, attr):
        """
        Return ``attr``, reloading the object first when needed.

        Also emits a ``DeprecationWarning`` when a ``*_time`` attribute is read
        while ``options.use_legacy_parsedate`` is enabled (objects of classes
        named ``Table`` / ``Partition`` and ``_logview_address_time`` are
        exempt).
        """
        if (
            attr.endswith("_time")
            and attr != "_logview_address_time"
            and type(self).__name__ not in ("Table", "Partition")
            and options.use_legacy_parsedate
        ):
            warnings.warn(
                "We are returning local time instead of UTC time for objects "
                "while the latter is deprecated since PyODPS 0.11.3. Try setting "
                "options.use_legacy_parsedate = False and update your logic.",
                category=DeprecationWarning,
            )
            typ = type(self)
            utils.add_survey_call(
                ".".join([typ.__module__, typ.__name__, attr]) + ":legacy_parsedate"
            )

        val = object.__getattribute__(self, attr)
        if val is None and not self._loaded:
            # ``__fields`` is read from the class; presumably populated by the
            # serializer machinery -- verify in ..serializers.
            fields = getattr(type(self), "__fields")
            if attr in fields:
                self.reload()
        # Read again: reload() may have filled the attribute in.
        return object.__getattribute__(self, attr)

    def reload(self):
        """Fetch the object's fields. Must be implemented by subclasses."""
        raise NotImplementedError

    def reset(self):
        """Mark the object as not loaded."""
        self._loaded = False

    @property
    def is_loaded(self):
        """Whether the object is currently marked as loaded."""
        return self._loaded

    def __repr__(self):
        # Best effort: fall back to the inherited repr when ``_repr()`` fails.
        # Only ``Exception`` is caught so that KeyboardInterrupt / SystemExit
        # still propagate.
        try:
            r = self._repr()
        except Exception:
            r = None
        if r:
            return r
        else:
            return super(LazyLoad, self).__repr__()

    def _repr(self):
        """
        Return ``<ClassName name>``.

        :raises ValueError: if the object has no (truthy) name
        """
        name = self._name()
        if name:
            return "<%s %s>" % (type(self).__name__, name)
        else:
            raise ValueError

    def __hash__(self):
        return hash((self._name(), self.parent))

    def __eq__(self, other):
        """Equal when ``other`` is an instance of this type with the same name and parent."""
        if not isinstance(other, type(self)):
            return False
        return self._name() == other._name() and self.parent == other.parent

    def __getstate__(self):
        # Only name, parent and client are pickled.
        return self._name(), self._parent, self._client

    def __setstate__(self, state):
        name, parent, client = state
        self._set_state(name, parent, client)

    def _set_state(self, name, parent, client):
        """Re-initialize the object from its pickled name, parent and client."""
        self.__init__(name=name, _parent=parent, _client=client)

    @property
    def project(self):
        """Nearest object in the parent chain (including self) that is a Project, or ``None``."""
        # Imported locally; NOTE(review): presumably to avoid a circular import.
        from .project import Project

        cur = self
        while cur is not None and not isinstance(cur, Project):
            cur = cur.parent
        return cur

    def get_schema(self):
        """
        Return the nearest object in the parent chain (including self) that
        is a Schema, or ``None``.

        As ``Table.table_schema`` is already occupied by the table schema
        result, an auxiliary method is needed to serve all callers.
        """
        from .schema import Schema

        cur = self
        while cur is not None and not isinstance(cur, Schema):
            cur = cur.parent
        return cur

    @property
    def schema(self):
        """Property form of ``get_schema()``."""
        return self.get_schema()


class Container(RestModel):
    """Base class for collections whose items are looked up by name."""

    skip_null = False

    # NOTE(review): ``cache`` comes from .cache; presumably it reuses existing
    # instances -- confirm against that module.
    @cache
    def __new__(cls, *args, **kwargs):
        return object.__new__(cls)

    def _get(self, item):
        """Return the child called ``item``. Must be implemented by subclasses."""
        raise NotImplementedError

    def _get_parent_typed(self, item):
        """
        If an object has subtypes and needs an RPC call to get its type,
        this method returns a parent-typed instance for cases without
        RPC call in scenarios such as delete request.
        """
        raise NotImplementedError

    def __getitem__(self, item):
        """
        Return the child whose name is the stripped string ``item``.

        :raises ValueError: if ``item`` is not a string, or is empty after
            stripping surrounding whitespace
        """
        if isinstance(item, six.string_types):
            item = item.strip()
            if not item:
                raise ValueError("Empty string not supported")
            return self._get(item)
        # Wrap in a 1-tuple so that a tuple key is formatted as a whole
        # rather than raising TypeError from the % operator.
        raise ValueError("Unsupported getitem value: %s" % (item,))

    # NOTE(review): the body is empty; any effect comes from ``del_cache``
    # (defined in .cache) -- confirm its behaviour there.
    @del_cache
    def __delitem__(self, key):
        pass

    def __contains__(self, item):
        raise NotImplementedError

    def __getstate__(self):
        # Only parent and client are pickled.
        return self._parent, self._client

    def __setstate__(self, state):
        parent, client = state
        self.__init__(_parent=parent, _client=client)


class Iterable(Container):
    """Container that also acts as an iterator over ``iter(self)``."""

    __slots__ = ("_iter",)

    def __init__(self, **kwargs):
        super(Iterable, self).__init__(**kwargs)
        # Calls __iter__(), which subclasses must override.
        self._iter = iter(self)

    def __iter__(self):
        raise NotImplementedError

    def __next__(self):
        return next(self._iter)

    # Python 2 spelling of the iterator protocol.
    next = __next__