odps/models/tables.py (150 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .. import errors, serializers, utils
from ..compat import six
from .core import Iterable
from .table import Table


class Tables(Iterable):
    """Collection of :class:`Table` objects under a parent container.

    Supports membership tests, paged listing and table creation / deletion
    by submitting SQL tasks through the parent's project.
    """

    # Fields parsed from a single page of the table listing response.
    marker = serializers.XMLNodeField("Marker")
    max_items = serializers.XMLNodeField("MaxItems", parse_callback=int)
    tables = serializers.XMLNodesReferencesField(Table, "Table")

    def _get(self, item):
        """Build a :class:`Table` named ``item`` whose parent is this collection."""
        return Table(client=self._client, parent=self, name=item)

    def __contains__(self, item):
        """Return whether ``item`` (a table name or :class:`Table`) can be reloaded.

        Objects that are neither strings nor :class:`Table` instances are never
        contained. Only ``errors.NoSuchObject`` is treated as "absent"; any
        other error raised by ``reload()`` propagates.
        """
        if isinstance(item, six.string_types):
            table = self._get(item)
        elif isinstance(item, Table):
            table = item
        else:
            return False

        try:
            table.reload()
        except errors.NoSuchObject:
            return False
        return True

    def __iter__(self):
        """Iterate over all tables using the default filters of :meth:`iterate`."""
        return self.iterate()

    def iterate(self, name=None, owner=None, type=None, extended=False):
        """
        List tables page by page, yielding one :class:`Table` at a time.

        :param name: the prefix of table name
        :param owner: owner of the table
        :param type: type of the table, either a value accepted by
            ``Table.Type`` or its name as a string (case-insensitive)
        :param extended: load extended information for table
        :return: generator of tables
        """
        actions = []
        params = {"expectmarker": "true"}
        if name is not None:
            params["name"] = name
        if owner is not None:
            params["owner"] = owner
        if type is not None:
            # six.string_types keeps this check consistent with the rest of
            # the class (covers unicode names under Python 2 as well).
            if isinstance(type, six.string_types):
                type = type.upper()
            params["type"] = Table.Type(type).value
        if extended:
            actions.append("extended")

        schema_name = self._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        while True:
            url = self.resource()
            resp = self._client.get(url, actions=actions, params=params)
            page = Tables.parse(self._client, resp, obj=self)

            # Capture both values right after parsing: ``parse`` is handed
            # ``obj=self``, so read them before control leaves this generator.
            next_marker = page.marker
            page_tables = page.tables
            params["marker"] = next_marker

            if page_tables is None:
                break
            for table in page_tables:
                yield table

            # A missing or empty marker means there is no further page.
            if not next_marker:
                break

    def _run_table_sql(self, query, task_name=None, hints=None, wait=True):
        """Submit ``query`` as a SQL task and return the created instance.

        :param query: SQL statement to run
        :param task_name: name given to the SQL task
        :param hints: extra SQL settings; the mapping passed in is not modified
        :param wait: when true, block until the instance succeeds
        :return: the instance created for the task
        """
        from .tasks import SQLTask

        task = SQLTask(name=task_name, query=query)

        # Work on a copy so the caller's dict never picks up the settings
        # injected below.
        hints = dict(hints or {})
        hints["odps.sql.submit.mode"] = ""
        if self._get_schema_name() is not None:
            hints["odps.sql.allow.namespace.schema"] = "true"
            hints["odps.namespace.schema"] = "true"
        quota_name = self._parent.project.odps.quota_name
        if quota_name:
            hints["odps.task.wlm.quota"] = quota_name
        task.update_sql_settings(hints)

        instance = self._parent.project.instances.create(task=task)
        if wait:
            instance.wait_for_success()
        return instance

    @utils.with_wait_argument
    def create(
        self,
        table_name,
        table_schema,
        comment=None,
        if_not_exists=False,
        lifecycle=None,
        shard_num=None,
        hub_lifecycle=None,
        hints=None,
        transactional=False,
        storage_tier=None,
        async_=False,
        **kw
    ):
        """Create a table by running the SQL built by ``Table.gen_create_table_sql``.

        :param table_name: name of the table to create
        :param table_schema: schema passed through to the SQL generator
        :param comment: table comment
        :param if_not_exists: passed through to the SQL generator
        :param lifecycle: passed through to the SQL generator
        :param shard_num: passed through to the SQL generator
        :param hub_lifecycle: passed through to the SQL generator
        :param hints: extra SQL settings; the mapping passed in is not modified
        :param transactional: passed through to the SQL generator
        :param storage_tier: when truthy, the tiered-storage hint is enabled
        :param async_: when true, do not wait and return the running instance
        :param kw: additional keyword arguments for the SQL generator
        :return: ``self[table_name]`` when waiting, otherwise the instance
        """
        sql = Table.gen_create_table_sql(
            table_name,
            table_schema,
            comment=comment,
            if_not_exists=if_not_exists,
            lifecycle=lifecycle,
            shard_num=shard_num,
            hub_lifecycle=hub_lifecycle,
            transactional=transactional,
            project=self._parent.project.name,
            schema=self._get_schema_name(),
            **kw
        )

        # Copy before adding settings so the caller's dict stays untouched.
        hints = dict(hints or {})
        if storage_tier:
            # NOTE(review): storage_tier itself is not forwarded to the SQL
            # generator here; only the enabling hint is set.
            hints["odps.tiered.storage.enable"] = "true"

        instance = self._run_table_sql(
            sql, task_name="SQLCreateTableTask", hints=hints, wait=not async_
        )
        if async_:
            return instance
        return self[table_name]

    def _gen_delete_table_sql(self, table_name, if_exists=False, table_type=None):
        """Build the ``DROP`` statement for ``table_name``.

        :param table_name: name of the object to drop
        :param if_exists: add ``IF EXISTS`` to the statement
        :param table_type: a ``Table.Type`` member or its name as a string;
            selects between ``TABLE``, ``VIEW`` and ``MATERIALIZED VIEW``
        :return: the SQL text
        """
        project_name = self._parent.project.name
        schema_name = self._get_schema_name()

        if isinstance(table_type, six.string_types):
            table_type = Table.Type(table_type.upper())

        # override provided type if the object is already cached
        cached_table_type = self._get(table_name)._getattr("type")
        if cached_table_type is not None and (
            table_type is None or table_type == Table.Type.MANAGED_TABLE
        ):
            table_type = cached_table_type

        if table_type == Table.Type.VIRTUAL_VIEW:
            type_str = "VIEW"
        elif table_type == Table.Type.MATERIALIZED_VIEW:
            type_str = "MATERIALIZED VIEW"
        else:
            type_str = "TABLE"

        parts = ["DROP %s " % type_str]
        if if_exists:
            parts.append("IF EXISTS ")
        if schema_name is not None:
            parts.append("%s.%s.`%s`" % (project_name, schema_name, table_name))
        else:
            parts.append("%s.`%s`" % (project_name, table_name))
        return "".join(parts)

    @utils.with_wait_argument
    def delete(
        self, table_name, if_exists=False, async_=False, hints=None, table_type=None
    ):
        """Drop a table (or view) by running a generated ``DROP`` statement.

        :param table_name: table name or :class:`Table` object
        :param if_exists: add ``IF EXISTS`` to the statement
        :param async_: when true, do not wait for the instance to finish
        :param hints: extra SQL settings
        :param table_type: type of the object to drop, see
            :meth:`_gen_delete_table_sql`
        :return: the instance that runs the statement
        """
        if isinstance(table_name, Table):
            table_name = table_name.name

        # The SQL must be generated before the cache entry is released,
        # because the generator consults the cached table type.
        sql = self._gen_delete_table_sql(
            table_name, if_exists=if_exists, table_type=table_type
        )
        del self[table_name]  # release table in cache
        return self._run_table_sql(
            sql, task_name="SQLDropTableTask", hints=hints, wait=not async_
        )