odps/models/tables.py (150 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .. import errors, serializers, utils
from ..compat import six
from .core import Iterable
from .table import Table
class Tables(Iterable):
    """
    Collection of tables belonging to a project, optionally scoped to the
    current schema.

    Supports membership tests (``name in tables``), paged iteration over the
    table listing, and creating / dropping tables by submitting SQL tasks.
    """

    # Pagination marker parsed from the listing response. Once a request has
    # been made, an empty or missing marker means there are no more pages.
    marker = serializers.XMLNodeField("Marker")
    # ``MaxItems`` value from the listing response, parsed as an int.
    max_items = serializers.XMLNodeField("MaxItems", parse_callback=int)
    # Tables parsed from the ``Table`` nodes of the current response page.
    tables = serializers.XMLNodesReferencesField(Table, "Table")

    def _get(self, item):
        """Return a ``Table`` object named ``item`` whose parent is this collection."""
        return Table(client=self._client, parent=self, name=item)

    def __contains__(self, item):
        """
        Check whether a table exists.

        :param item: table name or ``Table`` object; any other type yields False
        :return: True if reloading the table succeeds, False if the server
            reports ``NoSuchObject``
        """
        if isinstance(item, six.string_types):
            table = self._get(item)
        elif isinstance(item, Table):
            table = item
        else:
            return False

        try:
            table.reload()
            return True
        except errors.NoSuchObject:
            return False

    def __iter__(self):
        """Iterate over all tables using the default listing filters."""
        return self.iterate()

    def iterate(self, name=None, owner=None, type=None, extended=False):
        """
        Iterate over tables, requesting the listing page by page.

        :param name: the prefix of table name
        :param owner: owner of the table
        :param type: type of the table, either a ``Table.Type`` member or a
            string that is upper-cased and converted with ``Table.Type``
        :param extended: load extended information for table
        :return: generator yielding ``Table`` objects
        """
        actions = []
        params = {"expectmarker": "true"}
        if name is not None:
            params["name"] = name
        if owner is not None:
            params["owner"] = owner
        if type is not None:
            # six.string_types so unicode names are also upper-cased on Python 2,
            # consistent with the other string checks in this class
            if isinstance(type, six.string_types):
                table_type = type.upper()
            else:
                table_type = type
            params["type"] = Table.Type(table_type).value
        if extended:
            actions.append("extended")

        schema_name = self._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        while True:
            # "marker" is only present after the first request; an empty value
            # then means the previous page was the last one
            if "marker" in params and not params["marker"]:
                break
            url = self.resource()
            resp = self._client.get(url, actions=actions, params=params)
            page = Tables.parse(self._client, resp, obj=self)
            params["marker"] = page.marker

            tables = page.tables
            if tables is None:
                break
            for table in tables:
                yield table

    def _run_table_sql(self, query, task_name=None, hints=None, wait=True):
        """
        Submit ``query`` as an SQL task in the parent project.

        :param query: SQL statement to execute
        :param task_name: name given to the SQL task
        :param hints: extra SQL settings; the mapping passed in is not modified
        :param wait: if True, call ``wait_for_success`` on the created instance
        :return: the created instance
        """
        from .tasks import SQLTask

        task = SQLTask(name=task_name, query=query)

        # work on a copy so the caller's dict is not polluted with the
        # internal settings added below
        hints = dict(hints or {})
        hints["odps.sql.submit.mode"] = ""
        schema_name = self._get_schema_name()
        if schema_name is not None:
            hints["odps.sql.allow.namespace.schema"] = "true"
            hints["odps.namespace.schema"] = "true"
        if self._parent.project.odps.quota_name:
            hints["odps.task.wlm.quota"] = self._parent.project.odps.quota_name
        task.update_sql_settings(hints)

        instance = self._parent.project.instances.create(task=task)
        if wait:
            instance.wait_for_success()
        return instance

    @utils.with_wait_argument
    def create(
        self,
        table_name,
        table_schema,
        comment=None,
        if_not_exists=False,
        lifecycle=None,
        shard_num=None,
        hub_lifecycle=None,
        hints=None,
        transactional=False,
        storage_tier=None,
        async_=False,
        **kw
    ):
        """
        Create a table by submitting the SQL built by
        ``Table.gen_create_table_sql``.

        :param table_name: name of the table to create
        :param table_schema: schema of the table
        :param comment: table comment
        :param if_not_exists: skip creation if the table already exists
        :param lifecycle: table lifecycle
        :param shard_num: shard number
        :param hub_lifecycle: hub lifecycle
        :param hints: extra SQL settings; the mapping passed in is not modified
        :param transactional: create a transactional table
        :param storage_tier: storage tier of the table; when set, the
            ``odps.tiered.storage.enable`` hint is enabled and the value is
            passed on to ``Table.gen_create_table_sql``
        :param async_: if True, return the submitted instance without waiting
        :return: the created ``Table`` when waiting, otherwise the instance
        """
        project_name = self._parent.project.name
        schema_name = self._get_schema_name()
        sql = Table.gen_create_table_sql(
            table_name,
            table_schema,
            comment=comment,
            if_not_exists=if_not_exists,
            lifecycle=lifecycle,
            shard_num=shard_num,
            hub_lifecycle=hub_lifecycle,
            transactional=transactional,
            # previously dropped: only the hint below was set, so the
            # requested tier never reached the generated statement
            storage_tier=storage_tier,
            project=project_name,
            schema=schema_name,
            **kw
        )

        # copy so that adding the tiered-storage hint does not mutate the
        # caller's dict
        hints = dict(hints or {})
        if storage_tier:
            hints["odps.tiered.storage.enable"] = "true"
        instance = self._run_table_sql(
            sql, task_name="SQLCreateTableTask", hints=hints, wait=not async_
        )
        if async_:
            return instance
        return self[table_name]

    def _gen_delete_table_sql(self, table_name, if_exists=False, table_type=None):
        """
        Build the ``DROP`` statement for a table, view or materialized view.

        :param table_name: name of the object to drop
        :param if_exists: add ``IF EXISTS`` to the statement
        :param table_type: optional ``Table.Type`` member or type name string
        :return: the SQL statement as a string
        """
        project_name = self._parent.project.name
        schema_name = self._get_schema_name()

        if isinstance(table_type, six.string_types):
            table_type = Table.Type(table_type.upper())
        # override provided type if the object is already cached: a type
        # already known on the Table object wins over an unspecified or
        # MANAGED_TABLE argument
        cached_table_type = self._get(table_name)._getattr("type")
        if cached_table_type is not None and (
            table_type is None or table_type == Table.Type.MANAGED_TABLE
        ):
            table_type = cached_table_type

        if table_type == Table.Type.VIRTUAL_VIEW:
            type_str = "VIEW"
        elif table_type == Table.Type.MATERIALIZED_VIEW:
            type_str = "MATERIALIZED VIEW"
        else:
            type_str = "TABLE"

        buf = six.StringIO()
        buf.write("DROP %s " % type_str)
        if if_exists:
            buf.write("IF EXISTS ")
        if schema_name is not None:
            buf.write("%s.%s.`%s`" % (project_name, schema_name, table_name))
        else:
            buf.write("%s.`%s`" % (project_name, table_name))
        return buf.getvalue()

    @utils.with_wait_argument
    def delete(
        self, table_name, if_exists=False, async_=False, hints=None, table_type=None
    ):
        """
        Drop a table, view or materialized view.

        :param table_name: table name or ``Table`` object
        :param if_exists: add ``IF EXISTS`` to the statement
        :param async_: if True, do not wait for the drop to finish
        :param hints: extra SQL settings for the task
        :param table_type: optional ``Table.Type`` member or type name string
        :return: the submitted instance
        """
        if isinstance(table_name, Table):
            table_name = table_name.name
        # the SQL must be generated before the cache entry is released, since
        # generation consults the type known on the Table object
        sql = self._gen_delete_table_sql(
            table_name, if_exists=if_exists, table_type=table_type
        )
        del self[table_name]  # release table in cache
        return self._run_table_sql(
            sql, task_name="SQLDropTableTask", hints=hints, wait=not async_
        )