#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
import warnings
from datetime import datetime
try:
import pyarrow as pa
except (AttributeError, ImportError):
pa = None
from .. import readers, serializers
from .. import types as odps_types
from .. import utils
from ..compat import Enum, dir2, six
from ..config import options
from ..expressions import parse as parse_expression
from .cluster_info import ClusterInfo
from .core import JSONRemoteModel, LazyLoad
from .partitions import Partitions
from .record import Record
from .storage_tier import StorageTier, StorageTierInfo
from .tableio import (
TableArrowReader,
TableArrowWriter,
TableRecordReader,
TableRecordWriter,
TableUpsertWriter,
)
_auto_part_regex = re.compile(r"^\S+\([^\)]+\)\s*AS\s*", re.I)
class TableSchema(odps_types.OdpsSchema, JSONRemoteModel):
"""
    A TableSchema holds the column and partition information of a :class:`odps.models.Table`.
    There are two ways to initialize a TableSchema object: either provide the columns and
    partitions directly, or call the class method ``from_lists``. See the examples below:
:Example:
>>> columns = [Column(name='num', type='bigint', comment='the column')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = TableSchema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
>>>
>>> schema = TableSchema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
"""
class Shard(JSONRemoteModel):
hub_lifecycle = serializers.JSONNodeField("HubLifecycle")
shard_num = serializers.JSONNodeField("ShardNum")
distribute_cols = serializers.JSONNodeField("DistributeCols")
sort_cols = serializers.JSONNodeField("SortCols")
class TableColumn(odps_types.Column, JSONRemoteModel):
name = serializers.JSONNodeField("name")
type = serializers.JSONNodeField(
"type", parse_callback=odps_types.validate_data_type
)
comment = serializers.JSONNodeField("comment")
label = serializers.JSONNodeField("label")
nullable = serializers.JSONNodeField("isNullable")
_generate_expression = serializers.JSONNodeField("generateExpression")
def __init__(self, **kwargs):
kwargs.setdefault("nullable", True)
self._parsed_generate_expression = None
if "generate_expression" in kwargs:
self._generate_expression = kwargs.pop("generate_expression")
JSONRemoteModel.__init__(self, **kwargs)
if self.type is not None:
self.type = odps_types.validate_data_type(self.type)
class TablePartition(odps_types.Partition, TableColumn):
def __init__(self, **kwargs):
TableSchema.TableColumn.__init__(self, **kwargs)
def __init__(self, **kwargs):
kwargs["_columns"] = columns = kwargs.pop("columns", None)
kwargs["_partitions"] = partitions = kwargs.pop("partitions", None)
JSONRemoteModel.__init__(self, **kwargs)
odps_types.OdpsSchema.__init__(self, columns=columns, partitions=partitions)
def load(self):
self.update(self._columns, self._partitions)
self.build_snapshot()
comment = serializers.JSONNodeField("comment", set_to_parent=True)
owner = serializers.JSONNodeField("owner", set_to_parent=True)
creation_time = serializers.JSONNodeField(
"createTime", parse_callback=datetime.fromtimestamp, set_to_parent=True
)
last_data_modified_time = serializers.JSONNodeField(
"lastModifiedTime", parse_callback=datetime.fromtimestamp, set_to_parent=True
)
last_meta_modified_time = serializers.JSONNodeField(
"lastDDLTime", parse_callback=datetime.fromtimestamp, set_to_parent=True
)
is_virtual_view = serializers.JSONNodeField(
"isVirtualView", parse_callback=bool, set_to_parent=True
)
is_materialized_view = serializers.JSONNodeField(
"isMaterializedView", parse_callback=bool, set_to_parent=True
)
is_materialized_view_rewrite_enabled = serializers.JSONNodeField(
"isMaterializedViewRewriteEnabled",
parse_callback=lambda x: x is not None and str(x).lower() == "true",
set_to_parent=True,
)
is_materialized_view_outdated = serializers.JSONNodeField(
"isMaterializedViewOutdated",
parse_callback=lambda x: x is not None and str(x).lower() == "true",
set_to_parent=True,
)
lifecycle = serializers.JSONNodeField(
"lifecycle", parse_callback=int, set_to_parent=True
)
view_text = serializers.JSONNodeField("viewText", set_to_parent=True)
view_expanded_text = serializers.JSONNodeField(
"viewExpandedText", set_to_parent=True
)
size = serializers.JSONNodeField("size", parse_callback=int, set_to_parent=True)
is_archived = serializers.JSONNodeField(
"IsArchived", parse_callback=bool, set_to_parent=True
)
physical_size = serializers.JSONNodeField(
"PhysicalSize", parse_callback=int, set_to_parent=True
)
file_num = serializers.JSONNodeField(
"FileNum", parse_callback=int, set_to_parent=True
)
record_num = serializers.JSONNodeField(
"recordNum", parse_callback=int, set_to_parent=True
)
location = serializers.JSONNodeField("location", set_to_parent=True)
storage_handler = serializers.JSONNodeField("storageHandler", set_to_parent=True)
resources = serializers.JSONNodeField("resources", set_to_parent=True)
serde_properties = serializers.JSONNodeField(
"serDeProperties", type="json", set_to_parent=True
)
table_properties = serializers.JSONNodeField(
"tableProperties", type="json", set_to_parent=True
)
reserved = serializers.JSONNodeField("Reserved", type="json", set_to_parent=True)
shard = serializers.JSONNodeReferenceField(
Shard, "shardInfo", check_before=["shardExist"], set_to_parent=True
)
table_label = serializers.JSONNodeField(
"tableLabel", callback=lambda t: t if t != "0" else "", set_to_parent=True
)
_columns = serializers.JSONNodesReferencesField(TableColumn, "columns")
_partitions = serializers.JSONNodesReferencesField(TablePartition, "partitionKeys")
def __getstate__(self):
return self._columns, self._partitions
def __setstate__(self, state):
columns, partitions = state
self.__init__(columns=columns, partitions=partitions)
def __dir__(self):
return sorted(set(dir2(self)) - set(type(self)._parent_attrs))
class Table(LazyLoad):
"""
    A Table is the counterpart of a table in an RDBMS; in addition, a table may consist of partitions.
    Table properties behave like those of :class:`odps.models.Project`: they are not loaded from
    the remote ODPS service until they are accessed.
    To write data into a table, call the ``open_writer`` method within a **with statement**.
    Likewise, the ``open_reader`` method provides the ability to read records from a table
    or one of its partitions.
:Example:
>>> table = odps.get_table('my_table')
>>> table.owner # first will load from remote
>>> table.reload() # reload to update the properties
>>>
>>> for record in table.head(5):
>>> # check the first 5 records
    >>> for record in table.head(5, partition='pt=test', columns=['my_column']):
    >>> # only check the `my_column` column from a certain partition of this table
>>>
>>> with table.open_reader() as reader:
>>> count = reader.count # How many records of a table or its partition
>>> for record in reader[0: count]:
    >>> # read all data; in practice it is better to read in several smaller batches
>>>
>>> with table.open_writer() as writer:
>>> writer.write(records)
>>> with table.open_writer(partition='pt=test', blocks=[0, 1]):
>>> writer.write(0, gen_records(block=0))
    >>> writer.write(1, gen_records(block=1)) # blocks can be written in parallel
"""
_extended_args = (
"is_archived",
"physical_size",
"file_num",
"location",
"schema_version",
"storage_handler",
"resources",
"serde_properties",
"serde_info",
"input_format",
"output_format",
"stored_as",
"reserved",
"is_transactional",
"primary_key",
"storage_tier_info",
"cluster_info",
"acid_data_retain_hours",
"cdc_size",
"cdc_record_num",
"cdc_latest_version",
"cdc_latest_timestamp",
)
__slots__ = (
"_is_extend_info_loaded",
"last_meta_modified_time",
"is_virtual_view",
"is_materialized_view",
"is_materialized_view_rewrite_enabled",
"is_materialized_view_outdated",
"lifecycle",
"view_text",
"view_expanded_text",
"size",
"shard",
"record_num",
"table_properties",
"_table_tunnel",
"_id_thread_local",
)
__slots__ += _extended_args
_extended_args = set(_extended_args)
class Type(Enum):
MANAGED_TABLE = "MANAGED_TABLE"
EXTERNAL_TABLE = "EXTERNAL_TABLE"
OBJECT_TABLE = "OBJECT_TABLE"
VIRTUAL_VIEW = "VIRTUAL_VIEW"
MATERIALIZED_VIEW = "MATERIALIZED_VIEW"
name = serializers.XMLNodeField("Name")
table_id = serializers.XMLNodeField("TableId")
format = serializers.XMLNodeAttributeField(attr="format")
table_schema = serializers.XMLNodeReferenceField(TableSchema, "Schema")
comment = serializers.XMLNodeField("Comment")
owner = serializers.XMLNodeField("Owner")
table_label = serializers.XMLNodeField("TableLabel")
creation_time = serializers.XMLNodeField(
"CreationTime", parse_callback=utils.parse_rfc822
)
last_data_modified_time = serializers.XMLNodeField(
"LastModifiedTime", parse_callback=utils.parse_rfc822
)
last_access_time = serializers.XMLNodeField(
"LastAccessTime", parse_callback=utils.parse_rfc822
)
type = serializers.XMLNodeField(
"Type",
parse_callback=lambda s: Table.Type(s.upper()) if s is not None else None,
)
_download_ids = utils.thread_local_attribute("_id_thread_local", dict)
_upload_ids = utils.thread_local_attribute("_id_thread_local", dict)
def __init__(self, **kwargs):
self._is_extend_info_loaded = False
if "schema" in kwargs:
warnings.warn(
"Argument schema is deprecated and will be replaced by table_schema.",
DeprecationWarning,
stacklevel=2,
)
kwargs["table_schema"] = kwargs.pop("schema")
super(Table, self).__init__(**kwargs)
try:
del self._id_thread_local
except AttributeError:
pass
def table_resource(self, client=None, endpoint=None, force_schema=False):
schema_name = self._get_schema_name()
if force_schema:
schema_name = schema_name or "default"
if schema_name is None:
return self.resource(client=client, endpoint=endpoint)
return "/".join(
[
self.project.resource(client, endpoint=endpoint),
"schemas",
schema_name,
"tables",
self.name,
]
)
@property
def full_table_name(self):
schema_name = self._get_schema_name()
if schema_name is None:
return "{0}.`{1}`".format(self.project.name, self.name)
else:
return "{0}.{1}.`{2}`".format(self.project.name, schema_name, self.name)
def reload(self):
url = self.resource()
resp = self._client.get(url, curr_schema=self._get_schema_name())
self.parse(self._client, resp, obj=self)
self.table_schema.load()
self._loaded = True
def reset(self):
super(Table, self).reset()
self._is_extend_info_loaded = False
self.table_schema = None
@property
def schema(self):
warnings.warn(
"Table.schema is deprecated and will be replaced by Table.table_schema.",
DeprecationWarning,
stacklevel=3,
)
utils.add_survey_call(
".".join([type(self).__module__, type(self).__name__, "schema"])
)
return self.table_schema
@property
def last_modified_time(self):
warnings.warn(
"Table.last_modified_time is deprecated and will be replaced by "
"Table.last_data_modified_time.",
DeprecationWarning,
stacklevel=3,
)
utils.add_survey_call(
".".join([type(self).__module__, type(self).__name__, "last_modified_time"])
)
return self.last_data_modified_time
def _parse_reserved(self):
if not self.reserved:
self.schema_version = None
self.is_transactional = None
self.primary_key = None
self.storage_tier_info = None
self.cluster_info = None
self.acid_data_retain_hours = -1
self.cdc_size = -1
self.cdc_record_num = -1
self.cdc_latest_version = -1
self.cdc_latest_timestamp = None
self.serde_info = None
self.input_format = None
self.output_format = None
self.stored_as = None
return
self.schema_version = self.reserved.get("schema_version")
is_transactional = self.reserved.get("Transactional")
self.is_transactional = (
is_transactional is not None and is_transactional.lower() == "true"
)
self.primary_key = self.reserved.get("PrimaryKey")
self.storage_tier_info = StorageTierInfo.deserial(self.reserved)
self.cluster_info = ClusterInfo.deserial(self.reserved)
self.acid_data_retain_hours = int(
self.reserved.get("acid.data.retain.hours", "-1")
)
self.cdc_size = int(self.reserved.get("cdc_size", "-1"))
self.cdc_record_num = int(self.reserved.get("cdc_record_num", "-1"))
self.cdc_latest_version = int(self.reserved.get("cdc_latest_version", "-1"))
self.cdc_latest_timestamp = None
if "cdc_latest_timestamp" in self.reserved:
self.cdc_latest_timestamp = datetime.fromtimestamp(
int(self.reserved["cdc_latest_timestamp"])
)
self.serde_info = self.reserved.get("SerDeInfo")
self.input_format = self.reserved.get("InputFormat")
self.output_format = self.reserved.get("OutputFormat")
self.stored_as = self.reserved.get("StoredAs")
def reload_extend_info(self):
params = {}
schema_name = self._get_schema_name()
if schema_name is not None:
params["curr_schema"] = schema_name
resp = self._client.get(self.resource(), action="extended", params=params)
self.parse(self._client, resp, obj=self)
self._is_extend_info_loaded = True
if not self._loaded:
self.table_schema = None
self._parse_reserved()
def __getattribute__(self, attr):
if attr in type(self)._extended_args:
if not self._is_extend_info_loaded:
self.reload_extend_info()
if attr == "record_num" and self.table_schema.partitions:
warnings.warn(
"record_name nay not be correct when table has partitions. Use "
"aggregated value of required partitions instead.",
RuntimeWarning,
stacklevel=3,
)
return object.__getattribute__(self, attr)
val = object.__getattribute__(self, attr)
if val is None and not self._loaded:
if attr in getattr(TableSchema, "__fields"):
self.reload()
return object.__getattribute__(self, attr)
return super(Table, self).__getattribute__(attr)
def _get_column_generate_expression(self, column_name):
if self.table_schema[column_name].generate_expression is not None:
return self.table_schema[column_name].generate_expression
else:
table_props = self.table_properties or {}
if column_name.lower() == "_partitiontime" and utils.str_to_bool(
table_props.get("ingestion_time_partition")
):
col_expr = (
'[{"functionCall":{"name":"current_timestamp_ntz","type":"%s"}}]'
% str(self.table_schema[column_name].type)
)
return parse_expression(col_expr)
return None
def _repr(self):
buf = six.StringIO()
buf.write("odps.Table\n")
buf.write(" name: {0}\n".format(self.full_table_name))
if self.type:
buf.write(" type: {0}\n".format(self.type.value))
max_name_len = max(len(col.name) for col in self.table_schema.columns)
name_space = max_name_len + min(max_name_len, 16)
max_type_len = max(len(repr(col.type)) for col in self.table_schema.columns)
type_space = max_type_len + min(max_type_len, 16)
not_empty = lambda field: field is not None and len(field.strip()) > 0
buf.write(" schema:\n")
cols_strs = []
for col in self.table_schema._columns:
cols_strs.append(
"{0}: {1}{2}".format(
col.name.ljust(name_space),
repr(col.type).ljust(type_space),
"# {0}".format(utils.to_str(col.comment))
if not_empty(col.comment)
else "",
).strip()
)
buf.write(utils.indent("\n".join(cols_strs), 4))
buf.write("\n")
if self.table_schema.partitions:
buf.write(" partitions:\n")
partition_strs = []
for partition in self.table_schema.partitions:
partition_strs.append(
"{0}: {1}{2}".format(
partition.name.ljust(name_space),
repr(partition.type).ljust(type_space),
"# {0}".format(utils.to_str(partition.comment))
if not_empty(partition.comment)
else "",
).strip()
)
buf.write(utils.indent("\n".join(partition_strs), 4))
if self.view_text:
buf.write(" view text:\n{0}".format(utils.indent(self.view_text, 4)))
return buf.getvalue()
@classmethod
def gen_create_table_sql(
cls,
table_name,
table_schema,
comment=None,
if_not_exists=False,
lifecycle=None,
shard_num=None,
hub_lifecycle=None,
with_column_comments=True,
transactional=False,
primary_key=None,
storage_tier=None,
project=None,
schema=None,
table_type=None,
view_text=None,
**kw
):
buf = six.StringIO()
table_name = utils.to_text(table_name)
project = utils.to_text(project)
schema = utils.to_text(schema)
comment = utils.to_text(comment)
view_text = utils.to_text(view_text)
table_type = cls.Type(table_type or cls.Type.MANAGED_TABLE)
is_view = table_type in (cls.Type.VIRTUAL_VIEW, cls.Type.MATERIALIZED_VIEW)
primary_key = (
[primary_key] if isinstance(primary_key, six.string_types) else primary_key
)
stored_as = kw.get("stored_as") or kw.get("external_stored_as")
storage_handler = kw.get("storage_handler")
location = kw.get("location")
row_format_serde = kw.get("row_format_serde")
input_format = kw.get("input_format")
output_format = kw.get("output_format")
table_properties = kw.get("table_properties") or {}
cluster_info = kw.get("cluster_info")
use_auto_partitioning = kw.get("use_auto_partitioning")
rewrite_enabled = kw.get("rewrite_enabled")
rewrite_enabled = rewrite_enabled if rewrite_enabled is not None else True
if table_type == cls.Type.EXTERNAL_TABLE:
type_str = u"EXTERNAL TABLE"
elif table_type == cls.Type.VIRTUAL_VIEW:
type_str = u"VIEW"
elif table_type == cls.Type.MATERIALIZED_VIEW:
type_str = u"MATERIALIZED VIEW"
else:
type_str = u"EXTERNAL TABLE" if location else u"TABLE"
buf.write(u"CREATE %s " % type_str)
if if_not_exists:
buf.write(u"IF NOT EXISTS ")
if project is not None:
buf.write(u"%s." % project)
if schema is not None:
buf.write(u"%s." % schema)
buf.write(u"`%s` " % table_name)
if is_view and lifecycle is not None and lifecycle > 0:
buf.write("LIFECYCLE %s " % lifecycle)
def _write_primary_key(prev=""):
if not primary_key:
return
if not prev.strip().endswith(","):
buf.write(u",\n")
buf.write(
u" PRIMARY KEY (%s)" % ", ".join("`%s`" % c for c in primary_key)
)
if isinstance(table_schema, six.string_types):
buf.write(u"(\n")
buf.write(table_schema)
_write_primary_key(table_schema)
buf.write(u"\n)\n")
if comment:
buf.write(u"COMMENT '%s'\n" % utils.escape_odps_string(comment))
elif isinstance(table_schema, tuple):
buf.write(u"(\n")
buf.write(table_schema[0])
_write_primary_key(table_schema[0])
buf.write(u"\n)\n")
if comment:
buf.write(u"COMMENT '%s'\n" % utils.escape_odps_string(comment))
if len(table_schema) > 1 and table_schema[1]:
if (
use_auto_partitioning is None
and _auto_part_regex.search(table_schema[1])
) or use_auto_partitioning:
buf.write(u"AUTO \n")
buf.write(u"PARTITIONED BY ")
buf.write(u"(\n")
buf.write(table_schema[1])
buf.write(u"\n)\n")
else:
def write_columns(col_array, with_pk=False):
size = len(col_array)
buf.write(u"(\n")
for idx, column in enumerate(col_array):
buf.write(column.to_sql_clause(with_column_comments))
if idx < size - 1:
buf.write(u",\n")
if with_pk:
_write_primary_key()
buf.write(u"\n)\n")
def write_view_columns(col_array):
size = len(col_array)
buf.write(u"(\n")
for idx, column in enumerate(col_array):
buf.write(u" `%s`" % (utils.to_text(column.name)))
if with_column_comments and column.comment:
comment_str = utils.escape_odps_string(
utils.to_text(column.comment)
)
buf.write(u" COMMENT '%s'" % comment_str)
if idx < size - 1:
buf.write(u",\n")
buf.write(u"\n)\n")
if not is_view:
write_columns(table_schema.simple_columns, with_pk=True)
else:
write_view_columns(table_schema.simple_columns)
if comment:
comment_str = utils.escape_odps_string(utils.to_text(comment))
buf.write(u"COMMENT '%s'\n" % comment_str)
if table_type == cls.Type.MATERIALIZED_VIEW and not rewrite_enabled:
buf.write(u"DISABLE REWRITE\n")
manual_parts, auto_parts = [], []
for p in table_schema.partitions or []:
if p.generate_expression is None:
manual_parts.append(p)
else:
auto_parts.append(p)
if manual_parts and auto_parts:
raise ValueError(
"Auto partitioning and manual partitioning can not coexist"
)
if manual_parts:
if not is_view:
buf.write(u"PARTITIONED BY ")
write_columns(manual_parts)
else:
buf.write(u"PARTITIONED ON ")
write_view_columns(manual_parts)
if auto_parts:
buf.write(u"AUTO PARTITIONED BY ")
write_columns(auto_parts)
if cluster_info is not None:
buf.write(cluster_info.to_sql_clause())
buf.write(u"\n")
if transactional:
table_properties["transactional"] = "true"
if storage_tier:
if isinstance(storage_tier, six.string_types):
storage_tier = StorageTier(
utils.underline_to_camel(storage_tier).lower()
)
table_properties["storagetier"] = storage_tier.value
if table_properties:
buf.write(u"TBLPROPERTIES (\n")
for idx, (k, v) in enumerate(
sorted(table_properties.items(), key=lambda x: x[0])
):
k = utils.escape_odps_string(utils.to_text(k))
if isinstance(v, bool):
v = "true" if v else "false"
v = utils.escape_odps_string(utils.to_text(v))
buf.write(u" '%s' = '%s'" % (k, v))
if idx + 1 < len(table_properties):
buf.write(u",")
buf.write(u"\n")
buf.write(u")\n")
serde_properties = kw.get("serde_properties")
resources = kw.get("resources")
if location or stored_as:
if storage_handler:
buf.write(
u"STORED BY '%s'\n" % utils.escape_odps_string(storage_handler)
)
elif row_format_serde:
buf.write(
u"ROW FORMAT SERDE '%s'\n"
% utils.escape_odps_string(row_format_serde)
)
if serde_properties:
buf.write(u"WITH SERDEPROPERTIES (\n")
for idx, (k, v) in enumerate(
sorted(serde_properties.items(), key=lambda x: x[0])
):
k = utils.escape_odps_string(utils.to_text(k))
if isinstance(v, bool):
v = "true" if v else "false"
v = utils.escape_odps_string(utils.to_text(v))
buf.write(u" '%s' = '%s'" % (k, v))
if idx + 1 < len(serde_properties):
buf.write(u",")
buf.write(u"\n")
buf.write(u")\n")
if stored_as or input_format or output_format:
if not stored_as:
stored_as = u"\nINPUTFORMAT '%s'\nOUTPUTFORMAT '%s'" % (
utils.escape_odps_string(input_format),
utils.escape_odps_string(output_format),
)
buf.write(u"STORED AS %s\n" % stored_as)
if location:
buf.write(u"LOCATION '%s'\n" % utils.escape_odps_string(location))
if resources:
buf.write(u"USING '%s'\n" % utils.escape_odps_string(resources))
if not is_view and lifecycle is not None and lifecycle > 0:
buf.write(u"LIFECYCLE %s\n" % lifecycle)
if shard_num is not None:
buf.write(u"INTO %s SHARDS" % shard_num)
if hub_lifecycle is not None:
buf.write(u" HUBLIFECYCLE %s\n" % hub_lifecycle)
else:
buf.write(u"\n")
if is_view and view_text:
buf.write(u"AS %s\n" % view_text)
return buf.getvalue().strip()
def get_ddl(self, with_comments=True, if_not_exists=False, force_table_ddl=False):
"""
Get DDL SQL statement for the given table.
:param with_comments: append comment for table and each column
:param if_not_exists: generate `if not exists` code for generated DDL
:param force_table_ddl: force generate table DDL if object is a view
:return: DDL statement
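        A minimal usage sketch (``table`` is assumed to be fetched via ``odps.get_table``):
        :Example:
        >>> # print a DDL statement that can be used to recreate the table
        >>> print(table.get_ddl(if_not_exists=True))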
"""
shard_num = self.shard.shard_num if self.shard is not None else None
storage_tier = (
self.storage_tier_info.storage_tier.value
if self.storage_tier_info
else None
)
table_type = self.type if not force_table_ddl else self.Type.MANAGED_TABLE
if self.stored_as and self.stored_as.lower() not in ("cfile", "unknown"):
stored_as = self.stored_as
row_format_serde = None
input_format = output_format = None
else:
# use system-designated serde to simplify ddl
stored_as = None
row_format_serde = (self.serde_info or {}).get("serializationLib")
input_format = self.input_format
output_format = self.output_format
# ignore default serde
if not self.location and (row_format_serde, input_format, output_format) == (
"com.aliyun.apsara.serde.CFileSerDe",
"com.aliyun.apsara.format.CFileInputFormat",
"com.aliyun.apsara.format.CFileOutputFormat",
):
row_format_serde = input_format = output_format = None
return self.gen_create_table_sql(
self.name,
self.table_schema,
self.comment if with_comments else None,
if_not_exists=if_not_exists,
with_column_comments=with_comments,
lifecycle=self.lifecycle,
shard_num=shard_num,
project=self.project.name,
storage_handler=self.storage_handler,
serde_properties=self.serde_properties,
table_properties=self.table_properties,
stored_as=stored_as,
row_format_serde=row_format_serde,
input_format=input_format,
output_format=output_format,
location=self.location,
resources=self.resources,
table_type=table_type,
storage_tier=storage_tier,
cluster_info=self.cluster_info,
transactional=self.is_transactional,
primary_key=self.primary_key,
view_text=self.view_text,
rewrite_enabled=self.is_materialized_view_rewrite_enabled,
)
def _build_partition_spec_sql(self, partition_spec=None):
partition_expr = ""
if partition_spec is not None:
if not isinstance(partition_spec, (list, tuple)):
partition_spec = [partition_spec]
partition_spec = [odps_types.PartitionSpec(spec) for spec in partition_spec]
partition_expr = " " + ", ".join(
"PARTITION (%s)" % spec for spec in partition_spec
)
        # as partition data is going to change, remove cached download ids to avoid TableModified errors
for part in partition_spec or [None]:
if isinstance(part, six.string_types):
part = odps_types.PartitionSpec(part)
self._download_ids.pop(part, None)
return partition_expr
def _build_alter_table_ddl(self, action=None, partition_spec=None, cmd=u"ALTER"):
action = action or ""
target = u"TABLE"
if self.type in (Table.Type.VIRTUAL_VIEW, Table.Type.MATERIALIZED_VIEW):
target = u"VIEW"
partition_expr = self._build_partition_spec_sql(partition_spec)
sql = u"%s %s %s%s %s" % (
cmd,
target,
self.full_table_name,
partition_expr,
action,
)
return sql.strip()
@utils.survey
def _head_by_data(self, limit, partition=None, columns=None, timeout=None):
if limit <= 0:
raise ValueError("limit number should >= 0.")
params = {"linenum": limit}
if partition is not None:
if not isinstance(partition, odps_types.PartitionSpec):
partition = odps_types.PartitionSpec(partition)
params["partition"] = str(partition)
if columns is not None and len(columns) > 0:
col_name = (
lambda col: col.name if isinstance(col, odps_types.Column) else col
)
params["cols"] = ",".join(col_name(col) for col in columns)
schema_name = self._get_schema_name()
if schema_name is not None:
params["schema_name"] = schema_name
resp = self._client.get(
self.resource(), action="data", params=params, stream=True, timeout=timeout
)
return readers.CsvRecordReader(
self.table_schema, resp, max_field_size=self._get_max_field_size()
)
def _head_by_preview(
self,
limit,
partition=None,
columns=None,
compress_algo=None,
timeout=None,
tags=None,
):
table_tunnel = self._create_table_tunnel()
return table_tunnel.open_preview_reader(
self,
partition_spec=partition,
columns=columns,
limit=limit,
compress_algo=compress_algo,
arrow=False,
timeout=timeout,
read_all=True,
tags=tags,
)
def head(
self,
limit,
partition=None,
columns=None,
use_legacy=True,
timeout=None,
tags=None,
):
"""
Get the head records of a table or its partition.
        :param int limit: number of records to fetch, 10000 at most
        :param partition: partition of this table
        :param list columns: a subset of the table columns to retrieve
:return: records
:rtype: list
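        A minimal usage sketch (the partition spec and column name are illustrative):
        :Example:
        >>> for record in table.head(5, partition='pt=test', columns=['my_column']):
        >>> # each item is an :class:`odps.models.Record` instance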
.. seealso:: :class:`odps.models.Record`
"""
try:
if pa is not None and not use_legacy:
timeout = (
timeout
if timeout is not None
else options.tunnel.legacy_fallback_timeout
)
return self._head_by_preview(
limit,
partition=partition,
columns=columns,
timeout=timeout,
tags=tags,
)
except:
            # only re-raise when use_legacy is explicitly
            # specified as False (typically under tests)
if use_legacy is False:
raise
return self._head_by_data(
limit, partition=partition, columns=columns, timeout=timeout
)
def _create_table_tunnel(self, endpoint=None, quota_name=None):
if self._table_tunnel is not None:
return self._table_tunnel
from ..tunnel import TableTunnel
self._table_tunnel = TableTunnel(
client=self._client,
project=self.project,
endpoint=endpoint or self.project._tunnel_endpoint,
quota_name=quota_name,
)
return self._table_tunnel
def open_reader(
self,
partition=None,
reopen=False,
endpoint=None,
download_id=None,
timeout=None,
arrow=False,
columns=None,
quota_name=None,
async_mode=True,
append_partitions=None,
tags=None,
**kw
):
"""
Open the reader to read the entire records from this table or its partition.
:param partition: partition of this table
        :param reopen: by default the last opened reader is reused; set to True to open a new reader.
:type reopen: bool
:param endpoint: the tunnel service URL
:param download_id: use existing download_id to download table contents
:param arrow: use arrow tunnel to read data
:param columns: columns to read
:param quota_name: name of tunnel quota
        :param async_mode: enable asynchronous mode when creating the tunnel session; can be set
            to True if session creation takes a long time.
:param compress_option: compression algorithm, level and strategy
:type compress_option: :class:`odps.tunnel.CompressOption`
        :param compress_algo: compression algorithm; takes effect when ``compress_option`` is not
            provided, can be ``zlib`` or ``snappy``
        :param compress_level: used for ``zlib``; takes effect when ``compress_option`` is not provided
        :param compress_strategy: used for ``zlib``; takes effect when ``compress_option`` is not provided
:param bool append_partitions: if True, partition values will be
appended to the output
        :return: reader; ``count`` gives the total record count and ``status`` gives the tunnel status
:Example:
>>> with table.open_reader() as reader:
>>> count = reader.count # How many records of a table or its partition
>>> for record in reader[0: count]:
        >>> # read all data; in practice it is better to read in several smaller batches
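        >>>
        >>> # a sketch of reading a partition via the arrow tunnel into pandas
        >>> # (the partition spec and column name are illustrative)
        >>> with table.open_reader(partition='pt=test', arrow=True, columns=['my_column']) as reader:
        >>> df = reader.to_pandas()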
"""
from ..tunnel.tabletunnel import TableDownloadSession
if self.is_transactional and self.primary_key:
# currently acid 2.0 table can only be read through select statement
sql_stmt = "SELECT * FROM %s" % self.full_table_name
if partition is not None:
part_spec = odps_types.PartitionSpec(partition)
conds = " AND ".join(
"%s='%s'" % (k, utils.escape_odps_string(v))
for k, v in part_spec.items()
)
sql_stmt += " WHERE " + conds
return self.project.odps.execute_sql(sql_stmt).open_reader()
if partition and not isinstance(partition, odps_types.PartitionSpec):
partition = odps_types.PartitionSpec(partition)
tunnel = self._create_table_tunnel(endpoint=endpoint, quota_name=quota_name)
download_ids = dict()
if download_id is None:
download_ids = self._download_ids
download_id = download_ids.get(partition) if not reopen else None
download_session = utils.call_with_retry(
tunnel.create_download_session,
table=self,
partition_spec=partition,
download_id=download_id,
timeout=timeout,
async_mode=async_mode,
tags=tags,
**kw
)
if (
download_id
and download_session.status != TableDownloadSession.Status.Normal
):
download_session = utils.call_with_retry(
tunnel.create_download_session,
table=self,
partition_spec=partition,
timeout=timeout,
async_mode=async_mode,
tags=tags,
**kw
)
download_ids[partition] = download_session.id
reader_cls = TableArrowReader if arrow else TableRecordReader
kw = (
{"append_partitions": append_partitions}
if append_partitions is not None
else {}
)
return reader_cls(self, download_session, partition, columns=columns, **kw)
def open_writer(
self,
partition=None,
blocks=None,
reopen=False,
create_partition=False,
commit=True,
endpoint=None,
upload_id=None,
arrow=False,
quota_name=None,
tags=None,
mp_context=None,
**kw
):
"""
Open the writer to write records into this table or its partition.
:param partition: partition of this table
:param blocks: block ids to open
        :param bool reopen: by default the last opened writer is reused; set to True to open a new writer.
        :param bool create_partition: if True, the partition will be created if it does not exist
:param endpoint: the tunnel service URL
:param upload_id: use existing upload_id to upload data
:param arrow: use arrow tunnel to write data
:param quota_name: name of tunnel quota
:param bool overwrite: if True, will overwrite existing data
:param compress_option: compression algorithm, level and strategy
:type compress_option: :class:`odps.tunnel.CompressOption`
        :param compress_algo: compression algorithm; takes effect when ``compress_option`` is not
            provided, can be ``zlib`` or ``snappy``
        :param compress_level: used for ``zlib``; takes effect when ``compress_option`` is not provided
        :param compress_strategy: used for ``zlib``; takes effect when ``compress_option`` is not provided
        :return: writer; ``status`` gives the tunnel writer status
:Example:
>>> with table.open_writer() as writer:
>>> writer.write(records)
>>> with table.open_writer(partition='pt=test', blocks=[0, 1]):
>>> writer.write(0, gen_records(block=0))
        >>> writer.write(1, gen_records(block=1)) # blocks can be written in parallel
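        >>>
        >>> # a sketch of writing into a partition that is created on demand
        >>> # (the partition spec is illustrative)
        >>> with table.open_writer(partition='pt=test', create_partition=True) as writer:
        >>> writer.write(records)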
"""
from ..tunnel.tabletunnel import TableUploadSession
if partition and not isinstance(partition, odps_types.PartitionSpec):
partition = odps_types.PartitionSpec(partition)
if create_partition and not self.exist_partition(create_partition):
self.create_partition(partition, if_not_exists=True)
tunnel = self._create_table_tunnel(endpoint=endpoint, quota_name=quota_name)
if upload_id is None:
upload_ids = self._upload_ids
upload_id = upload_ids.get(partition) if not reopen else None
else:
upload_ids = dict()
use_upsert = self.is_transactional and self.primary_key
def _create_session(upload_id_):
if use_upsert:
return tunnel.create_upsert_session(
table=self,
partition_spec=partition,
upsert_id=upload_id_,
tags=tags,
**kw
)
else:
return tunnel.create_upload_session(
table=self,
partition_spec=partition,
                    upload_id=upload_id_,
tags=tags,
**kw
)
upload_session = utils.call_with_retry(_create_session, upload_id)
if (
upload_id
and upload_session.status.value != TableUploadSession.Status.Normal.value
):
# check upload session status
upload_session = utils.call_with_retry(_create_session, None)
upload_ids[partition] = upload_session.id
        # as partition data has changed, remove the cached download id to avoid TableModified errors
self._download_ids.pop(partition, None)
if arrow:
writer_cls = TableArrowWriter
elif use_upsert:
writer_cls = TableUpsertWriter
else:
writer_cls = TableRecordWriter
def _writer_on_close():
if commit:
upload_ids[partition] = None
return writer_cls(
self,
upload_session,
blocks=blocks,
commit=commit,
on_close=_writer_on_close,
mp_context=mp_context,
)
def to_pandas(
self,
partition=None,
columns=None,
start=None,
count=None,
n_process=1,
quota_name=None,
append_partitions=None,
tags=None,
**kwargs
):
"""
Read table data into pandas DataFrame
:param partition: partition of this table
:param list columns: columns to read
        :param int start: start row index, counting from 0
        :param int count: number of records to read
:param int n_process: number of processes to accelerate reading
:param bool append_partitions: if True, partition values will be
appended to the output
:param str quota_name: name of tunnel quota to use
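        A minimal usage sketch (the partition spec and column name are illustrative):
        :Example:
        >>> df = table.to_pandas(partition='pt=test', columns=['my_column'])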
"""
if partition is None and self.table_schema.partitions:
raise ValueError(
"You must specify a partition when calling to_pandas on a partitioned table"
)
arrow = kwargs.pop("arrow", True)
with self.open_reader(
partition=partition,
columns=columns,
arrow=arrow,
quota_name=quota_name,
append_partitions=append_partitions,
tags=tags,
**kwargs
) as reader:
return reader.to_pandas(start=start, count=count, n_process=n_process)
def iter_pandas(
self,
partition=None,
columns=None,
batch_size=None,
start=None,
count=None,
quota_name=None,
append_partitions=None,
tags=None,
**kwargs
):
"""
Iterate table data in blocks as pandas DataFrame
:param partition: partition of this table
:param list columns: columns to read
        :param int batch_size: number of records in each DataFrame batch
        :param int start: start row index, counting from 0
        :param int count: number of records to read
:param bool append_partitions: if True, partition values will be
appended to the output
:param str quota_name: name of tunnel quota to use
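        A minimal usage sketch (the partition spec is illustrative):
        :Example:
        >>> for batch in table.iter_pandas(partition='pt=test', batch_size=10000):
        >>> # each batch is a pandas DataFrame with at most ``batch_size`` rows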
"""
if partition is None and self.table_schema.partitions:
raise ValueError(
"You must specify a partition when calling to_pandas on a partitioned table"
)
kwargs.pop("arrow", None)
with self.open_reader(
partition=partition,
columns=columns,
arrow=True,
quota_name=quota_name,
append_partitions=append_partitions,
tags=tags,
**kwargs
) as reader:
for batch in reader.iter_pandas(
batch_size, start=start, count=count, columns=columns
):
yield batch
@property
def partitions(self):
return Partitions(parent=self, client=self._client)
@utils.with_wait_argument
def create_partition(
self, partition_spec, if_not_exists=False, async_=False, hints=None
):
"""
Create a partition within the table.
:param partition_spec: specification of the partition.
        :param if_not_exists: if True, will not raise an error when the partition already exists
        :param hints: settings for the SQL task, optional
        :param async_: run asynchronously if True
:return: partition object
:rtype: odps.models.partition.Partition
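        A minimal usage sketch (the partition spec is illustrative):
        :Example:
        >>> table.create_partition('pt=test', if_not_exists=True)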
"""
return self.partitions.create(
partition_spec, if_not_exists=if_not_exists, hints=hints, async_=async_
)
@utils.with_wait_argument
def delete_partition(
self, partition_spec, if_exists=False, async_=False, hints=None
):
"""
Delete a partition within the table.
:param partition_spec: specification of the partition.
        :param if_exists: if True, will not raise an error when the partition does not exist
        :param hints: settings for the SQL task, optional
        :param async_: run asynchronously if True
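        A minimal usage sketch (the partition spec is illustrative):
        :Example:
        >>> table.delete_partition('pt=test', if_exists=True)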
"""
return self.partitions.delete(
partition_spec, if_exists=if_exists, hints=hints, async_=async_
)
def exist_partition(self, partition_spec):
"""
Check if a partition exists within the table.
:param partition_spec: specification of the partition.
"""
return partition_spec in self.partitions
def exist_partitions(self, prefix_spec=None):
"""
Check if partitions with provided conditions exist.
:param prefix_spec: prefix of partition
:return: whether partitions exist
"""
try:
next(self.partitions.iterate_partitions(spec=prefix_spec))
except StopIteration:
return False
return True
def iterate_partitions(self, spec=None, reverse=False):
"""
Create an iterable object to iterate over partitions.
:param spec: specification of the partition.
:param reverse: output partitions in reversed order
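        A minimal usage sketch (the partition spec is illustrative):
        :Example:
        >>> for partition in table.iterate_partitions(spec='pt=test'):
        >>> # inspect each :class:`odps.models.partition.Partition` object here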
"""
return self.partitions.iterate_partitions(spec=spec, reverse=reverse)
def get_partition(self, partition_spec):
"""
Get a partition with given specifications.
:param partition_spec: specification of the partition.
:return: partition object
:rtype: odps.models.partition.Partition
"""
return self.partitions[partition_spec]
def get_max_partition(self, spec=None, skip_empty=True, reverse=False):
"""
Get partition with maximal values within certain spec.
:param spec: parent partitions. if specified, will return partition with
maximal value within specified parent partition
:param skip_empty: if True, will skip partitions without data
:param reverse: if True, will return minimal value
:return: Partition
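        A minimal usage sketch:
        :Example:
        >>> # latest partition that contains data
        >>> partition = table.get_max_partition()
        >>> # latest partition regardless of whether it contains data
        >>> partition = table.get_max_partition(skip_empty=False)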
"""
if not self.table_schema.partitions:
raise ValueError("Table %r not partitioned" % self.name)
return self.partitions.get_max_partition(
spec, skip_empty=skip_empty, reverse=reverse
)
def _unload_if_async(self, async_=False, reload=True):
if async_:
self._loaded = False
elif reload:
self.reload()
@utils.with_wait_argument
def truncate(self, partition_spec=None, async_=False, hints=None):
"""
        Truncate this table or the given partitions.
        :param partition_spec: specs of partitions to truncate, optional
        :param hints: settings for the SQL task, optional
:param async_: run asynchronously if True
:return: None
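        A minimal usage sketch (the partition spec is illustrative):
        :Example:
        >>> table.truncate()
        >>> table.truncate(partition_spec='pt=test')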
"""
sql = self._build_alter_table_ddl(partition_spec=partition_spec, cmd="TRUNCATE")
inst = self.parent._run_table_sql(
sql, task_name="SQLTruncateTableTask", hints=hints, wait=not async_
)
self._unload_if_async()
return inst
@utils.with_wait_argument
def drop(self, async_=False, if_exists=False, hints=None):
"""
Drop this table.
:param async_: run asynchronously if True
        :param if_exists: if True, will not raise an error when the table does not exist
        :param hints: settings for the SQL task, optional
:return: None
"""
return self.parent.delete(self, async_=async_, if_exists=if_exists, hints=hints)
@utils.with_wait_argument
def set_storage_tier(
self, storage_tier, partition_spec=None, async_=False, hints=None
):
"""
Set storage tier of current table or specific partition.
"""
self._is_extend_info_loaded = False
if isinstance(storage_tier, six.string_types):
storage_tier = StorageTier(utils.underline_to_camel(storage_tier).lower())
property_item = "TBLPROPERTIES" if not partition_spec else "PARTITIONPROPERTIES"
sql = self._build_alter_table_ddl(
"SET %s('storagetier'='%s')" % (property_item, storage_tier.value),
partition_spec=partition_spec,
)
hints = hints or {}
hints["odps.tiered.storage.enable"] = "true"
inst = self.parent._run_table_sql(
sql, task_name="SQLSetStorageTierTask", hints=hints, wait=not async_
)
self.storage_tier_info = storage_tier
self._unload_if_async(async_=async_, reload=False)
return inst
@utils.with_wait_argument
def add_columns(self, columns, if_not_exists=False, async_=False, hints=None):
"""
Add columns to the table.
:param columns: columns to add, can be a list of :class:`~odps.types.Column`
or a string of column definitions
:param if_not_exists: if True, will not raise exception when column exists
:Example:
        >>> table = odps.create_table('test_table', schema=TableSchema.from_lists(['name', 'id'], ['string', 'string']))
>>> # add column by Column instance
>>> table.add_columns([Column('id2', 'string')])
>>> # add column by a string of column definitions
>>> table.add_columns("fid double, fid2 double")
"""
if isinstance(columns, odps_types.Column):
columns = [columns]
action_str = u"ADD COLUMNS" + (
u" IF NOT EXISTS (\n" if if_not_exists else u" (\n"
)
if isinstance(columns, six.string_types):
action_str += columns + "\n)"
else:
action_str += (
u",\n".join([" " + col.to_sql_clause() for col in columns]) + u"\n)"
)
sql = self._build_alter_table_ddl(action_str)
inst = self.parent._run_table_sql(
sql, task_name="SQLAddColumnsTask", hints=hints, wait=not async_
)
self._unload_if_async(async_=async_)
return inst
@utils.with_wait_argument
def delete_columns(self, columns, async_=False, hints=None):
"""
Delete columns from the table.
        :param columns: columns to delete; can be a list of column names or a single column name
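        A minimal usage sketch (the column names are illustrative):
        :Example:
        >>> table.delete_columns(['fid', 'fid2'])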
"""
if isinstance(columns, six.string_types):
columns = [columns]
action_str = u"DROP COLUMNS " + u", ".join(u"`%s`" % c for c in columns)
sql = self._build_alter_table_ddl(action_str)
inst = self.parent._run_table_sql(
sql, task_name="SQLDeleteColumnsTask", hints=hints, wait=not async_
)
self._unload_if_async(async_=async_)
return inst
@utils.with_wait_argument
def rename_column(
self, old_column_name, new_column_name, comment=None, async_=False, hints=None
):
"""
Rename a column in the table.
:param old_column_name: old column name
:param new_column_name: new column name
:param comment: new column comment, optional
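        A minimal usage sketch (the column names are illustrative):
        :Example:
        >>> table.rename_column('id2', 'new_id2')
        >>> table.rename_column('fid', 'new_fid', comment='new comment')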
"""
if comment:
old_col = self.table_schema[old_column_name]
new_col = odps_types.Column(
name=new_column_name,
type=old_col.type,
comment=comment,
label=old_col.label,
nullable=old_col.nullable,
)
action_str = u"CHANGE COLUMN %s %s" % (
old_column_name,
new_col.to_sql_clause(),
)
else:
action_str = u"CHANGE COLUMN %s RENAME TO %s" % (
old_column_name,
new_column_name,
)
sql = self._build_alter_table_ddl(action_str)
inst = self.parent._run_table_sql(
sql, task_name="SQLRenameColumnsTask", hints=hints, wait=not async_
)
self._unload_if_async(async_=async_)
return inst
@utils.with_wait_argument
def set_lifecycle(self, days, async_=False, hints=None):
"""
Set lifecycle of current table.
:param days: lifecycle in days
"""
sql = self._build_alter_table_ddl(u"SET LIFECYCLE %s" % days)
inst = self.parent._run_table_sql(
sql, task_name="SQLSetLifecycleTask", hints=hints, wait=not async_
)
self.lifecycle = days
self._unload_if_async(async_=async_, reload=False)
return inst
@utils.with_wait_argument
def set_owner(self, new_owner, async_=False, hints=None):
"""
Set owner of current table.
:param new_owner: account of the new owner
"""
sql = self._build_alter_table_ddl(
u"CHANGEOWNER TO '%s'" % utils.escape_odps_string(new_owner)
)
inst = self.parent._run_table_sql(
sql, task_name="SQLSetOwnerTask", hints=hints, wait=not async_
)
self.owner = new_owner
self._unload_if_async(async_=async_, reload=False)
return inst
@utils.with_wait_argument
def set_comment(self, new_comment, async_=False, hints=None):
"""
Set comment of current table.
:param new_comment: new comment
"""
sql = self._build_alter_table_ddl(
u"SET COMMENT '%s'" % utils.escape_odps_string(new_comment)
)
inst = self.parent._run_table_sql(
sql, task_name="SQLSetCommentTask", hints=hints, wait=not async_
)
self.comment = new_comment
self._unload_if_async(async_=async_, reload=False)
return inst
@utils.with_wait_argument
def set_cluster_info(self, new_cluster_info, async_=False, hints=None):
"""
Set cluster info of current table.
"""
if new_cluster_info is None:
action = u"NOT CLUSTERED"
else:
assert isinstance(new_cluster_info, ClusterInfo)
action = new_cluster_info.to_sql_clause()
sql = self._build_alter_table_ddl(action)
inst = self.parent._run_table_sql(
sql, task_name="SQLSetClusterInfoTask", hints=hints, wait=not async_
)
self.cluster_info = new_cluster_info
self._unload_if_async(async_=async_, reload=False)
return inst
@utils.with_wait_argument
def rename(self, new_name, async_=False, hints=None):
"""
Rename the table.
:param new_name: new table name
"""
sql = self._build_alter_table_ddl("RENAME TO `%s`" % new_name)
inst = self.parent._run_table_sql(
sql, task_name="SQLRenameTask", hints=hints, wait=not async_
)
self.name = new_name
del self.parent[self.name]
self._unload_if_async(async_=async_)
return inst
@utils.with_wait_argument
def change_partition_spec(
self, old_partition_spec, new_partition_spec, async_=False, hints=None
):
"""
Change partition spec of specified partition of the table.
:param old_partition_spec: old partition spec
:param new_partition_spec: new partition spec
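        A minimal usage sketch (the partition specs are illustrative):
        :Example:
        >>> table.change_partition_spec('pt=test', 'pt=test_new')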
"""
sql = self._build_alter_table_ddl(
"RENAME TO %s" % self._build_partition_spec_sql(new_partition_spec),
partition_spec=old_partition_spec,
)
return self.parent._run_table_sql(
sql, task_name="SQLChangePartitionSpecTask", hints=hints, wait=not async_
)
@utils.with_wait_argument
def touch(self, partition_spec=None, async_=False, hints=None):
"""
Update the last modified time of the table or specified partition.
:param partition_spec: partition spec, optional
"""
action = u"TOUCH " + self._build_partition_spec_sql(partition_spec)
sql = self._build_alter_table_ddl(action.strip())
inst = self.parent._run_table_sql(
sql, task_name="SQLTouchTask", hints=hints, wait=not async_
)
self._unload_if_async(async_=async_)
return inst
def _get_max_field_size(self):
try:
project_field_size = self.project.get_property(
"odps.sql.cfile2.field.maxsize", None
)
return int(project_field_size or 0) * 1024
except:
return 0
def new_record(self, values=None):
"""
Generate a record of the table.
        :param values: the values of this record
:type values: list
:return: record
:rtype: :class:`odps.models.Record`
:Example:
        >>> table = odps.create_table('test_table', schema=TableSchema.from_lists(['name', 'id'], ['string', 'string']))
>>> record = table.new_record()
>>> record[0] = 'my_name'
>>> record[1] = 'my_id'
>>> record = table.new_record(['my_name', 'my_id'])
.. seealso:: :class:`odps.models.Record`
"""
return Record(
schema=self.table_schema,
values=values,
max_field_size=self._get_max_field_size(),
)
def to_df(self):
"""
Create a PyODPS DataFrame from this table.
:return: DataFrame object
"""
from ..df import DataFrame
return DataFrame(self)