odps/models/table.py (1,188 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import re import warnings from datetime import datetime try: import pyarrow as pa except (AttributeError, ImportError): pa = None from .. import readers, serializers from .. import types as odps_types from .. import utils from ..compat import Enum, dir2, six from ..config import options from ..expressions import parse as parse_expression from .cluster_info import ClusterInfo from .core import JSONRemoteModel, LazyLoad from .partitions import Partitions from .record import Record from .storage_tier import StorageTier, StorageTierInfo from .tableio import ( TableArrowReader, TableArrowWriter, TableRecordReader, TableRecordWriter, TableUpsertWriter, ) _auto_part_regex = re.compile(r"^\S+\([^\)]+\)\s*AS\s*", re.I) class TableSchema(odps_types.OdpsSchema, JSONRemoteModel): """ Schema includes the columns and partitions information of a :class:`odps.models.Table`. There are two ways to initialize a Schema object, first is to provide columns and partitions, the second way is to call the class method ``from_lists``. See the examples below: :Example: >>> columns = [Column(name='num', type='bigint', comment='the column')] >>> partitions = [Partition(name='pt', type='string', comment='the partition')] >>> schema = TableSchema(columns=columns, partitions=partitions) >>> schema.columns [<column num, type bigint>, <partition pt, type string>] >>> >>> schema = TableSchema.from_lists(['num'], ['bigint'], ['pt'], ['string']) >>> schema.columns [<column num, type bigint>, <partition pt, type string>] """ class Shard(JSONRemoteModel): hub_lifecycle = serializers.JSONNodeField("HubLifecycle") shard_num = serializers.JSONNodeField("ShardNum") distribute_cols = serializers.JSONNodeField("DistributeCols") sort_cols = serializers.JSONNodeField("SortCols") class TableColumn(odps_types.Column, JSONRemoteModel): name = serializers.JSONNodeField("name") type = serializers.JSONNodeField( "type", parse_callback=odps_types.validate_data_type ) comment = serializers.JSONNodeField("comment") label = serializers.JSONNodeField("label") nullable = serializers.JSONNodeField("isNullable") _generate_expression = serializers.JSONNodeField("generateExpression") def __init__(self, **kwargs): kwargs.setdefault("nullable", True) self._parsed_generate_expression = None if "generate_expression" in kwargs: self._generate_expression = kwargs.pop("generate_expression") JSONRemoteModel.__init__(self, **kwargs) if self.type is not None: self.type = odps_types.validate_data_type(self.type) class TablePartition(odps_types.Partition, TableColumn): def __init__(self, **kwargs): TableSchema.TableColumn.__init__(self, **kwargs) def __init__(self, **kwargs): kwargs["_columns"] = columns = kwargs.pop("columns", None) kwargs["_partitions"] = partitions = kwargs.pop("partitions", None) JSONRemoteModel.__init__(self, **kwargs) odps_types.OdpsSchema.__init__(self, columns=columns, partitions=partitions) def load(self): self.update(self._columns, self._partitions) self.build_snapshot() comment = serializers.JSONNodeField("comment", set_to_parent=True) owner = serializers.JSONNodeField("owner", set_to_parent=True) creation_time = serializers.JSONNodeField( "createTime", parse_callback=datetime.fromtimestamp, set_to_parent=True ) last_data_modified_time = serializers.JSONNodeField( "lastModifiedTime", parse_callback=datetime.fromtimestamp, set_to_parent=True ) last_meta_modified_time = serializers.JSONNodeField( "lastDDLTime", parse_callback=datetime.fromtimestamp, set_to_parent=True ) is_virtual_view = serializers.JSONNodeField( "isVirtualView", parse_callback=bool, set_to_parent=True ) is_materialized_view = serializers.JSONNodeField( "isMaterializedView", parse_callback=bool, set_to_parent=True ) is_materialized_view_rewrite_enabled = serializers.JSONNodeField( "isMaterializedViewRewriteEnabled", parse_callback=lambda x: x is not None and str(x).lower() == "true", set_to_parent=True, ) is_materialized_view_outdated = serializers.JSONNodeField( "isMaterializedViewOutdated", parse_callback=lambda x: x is not None and str(x).lower() == "true", set_to_parent=True, ) lifecycle = serializers.JSONNodeField( "lifecycle", parse_callback=int, set_to_parent=True ) view_text = serializers.JSONNodeField("viewText", set_to_parent=True) view_expanded_text = serializers.JSONNodeField( "viewExpandedText", set_to_parent=True ) size = serializers.JSONNodeField("size", parse_callback=int, set_to_parent=True) is_archived = serializers.JSONNodeField( "IsArchived", parse_callback=bool, set_to_parent=True ) physical_size = serializers.JSONNodeField( "PhysicalSize", parse_callback=int, set_to_parent=True ) file_num = serializers.JSONNodeField( "FileNum", parse_callback=int, set_to_parent=True ) record_num = serializers.JSONNodeField( "recordNum", parse_callback=int, set_to_parent=True ) location = serializers.JSONNodeField("location", set_to_parent=True) storage_handler = serializers.JSONNodeField("storageHandler", set_to_parent=True) resources = serializers.JSONNodeField("resources", set_to_parent=True) serde_properties = serializers.JSONNodeField( "serDeProperties", type="json", set_to_parent=True ) table_properties = serializers.JSONNodeField( "tableProperties", type="json", set_to_parent=True ) reserved = serializers.JSONNodeField("Reserved", type="json", set_to_parent=True) shard = serializers.JSONNodeReferenceField( Shard, "shardInfo", check_before=["shardExist"], set_to_parent=True ) table_label = serializers.JSONNodeField( "tableLabel", callback=lambda t: t if t != "0" else "", set_to_parent=True ) _columns = serializers.JSONNodesReferencesField(TableColumn, "columns") _partitions = serializers.JSONNodesReferencesField(TablePartition, "partitionKeys") def __getstate__(self): return self._columns, self._partitions def __setstate__(self, state): columns, partitions = state self.__init__(columns=columns, partitions=partitions) def __dir__(self): return sorted(set(dir2(self)) - set(type(self)._parent_attrs)) class Table(LazyLoad): """ Table means the same to the RDBMS table, besides, a table can consist of partitions. Table's properties are the same to the ones of :class:`odps.models.Project`, which will not load from remote ODPS service until users try to get them. In order to write data into table, users should call the ``open_writer`` method with **with statement**. At the same time, the ``open_reader`` method is used to provide the ability to read records from a table or its partition. :Example: >>> table = odps.get_table('my_table') >>> table.owner # first will load from remote >>> table.reload() # reload to update the properties >>> >>> for record in table.head(5): >>> # check the first 5 records >>> for record in table.head(5, partition='pt=test', columns=['my_column']) >>> # only check the `my_column` column from certain partition of this table >>> >>> with table.open_reader() as reader: >>> count = reader.count # How many records of a table or its partition >>> for record in reader[0: count]: >>> # read all data, actually better to split into reading for many times >>> >>> with table.open_writer() as writer: >>> writer.write(records) >>> with table.open_writer(partition='pt=test', blocks=[0, 1]): >>> writer.write(0, gen_records(block=0)) >>> writer.write(1, gen_records(block=1)) # we can do this parallel """ _extended_args = ( "is_archived", "physical_size", "file_num", "location", "schema_version", "storage_handler", "resources", "serde_properties", "serde_info", "input_format", "output_format", "stored_as", "reserved", "is_transactional", "primary_key", "storage_tier_info", "cluster_info", "acid_data_retain_hours", "cdc_size", "cdc_record_num", "cdc_latest_version", "cdc_latest_timestamp", ) __slots__ = ( "_is_extend_info_loaded", "last_meta_modified_time", "is_virtual_view", "is_materialized_view", "is_materialized_view_rewrite_enabled", "is_materialized_view_outdated", "lifecycle", "view_text", "view_expanded_text", "size", "shard", "record_num", "table_properties", "_table_tunnel", "_id_thread_local", ) __slots__ += _extended_args _extended_args = set(_extended_args) class Type(Enum): MANAGED_TABLE = "MANAGED_TABLE" EXTERNAL_TABLE = "EXTERNAL_TABLE" OBJECT_TABLE = "OBJECT_TABLE" VIRTUAL_VIEW = "VIRTUAL_VIEW" MATERIALIZED_VIEW = "MATERIALIZED_VIEW" name = serializers.XMLNodeField("Name") table_id = serializers.XMLNodeField("TableId") format = serializers.XMLNodeAttributeField(attr="format") table_schema = serializers.XMLNodeReferenceField(TableSchema, "Schema") comment = serializers.XMLNodeField("Comment") owner = serializers.XMLNodeField("Owner") table_label = serializers.XMLNodeField("TableLabel") creation_time = serializers.XMLNodeField( "CreationTime", parse_callback=utils.parse_rfc822 ) last_data_modified_time = serializers.XMLNodeField( "LastModifiedTime", parse_callback=utils.parse_rfc822 ) last_access_time = serializers.XMLNodeField( "LastAccessTime", parse_callback=utils.parse_rfc822 ) type = serializers.XMLNodeField( "Type", parse_callback=lambda s: Table.Type(s.upper()) if s is not None else None, ) _download_ids = utils.thread_local_attribute("_id_thread_local", dict) _upload_ids = utils.thread_local_attribute("_id_thread_local", dict) def __init__(self, **kwargs): self._is_extend_info_loaded = False if "schema" in kwargs: warnings.warn( "Argument schema is deprecated and will be replaced by table_schema.", DeprecationWarning, stacklevel=2, ) kwargs["table_schema"] = kwargs.pop("schema") super(Table, self).__init__(**kwargs) try: del self._id_thread_local except AttributeError: pass def table_resource(self, client=None, endpoint=None, force_schema=False): schema_name = self._get_schema_name() if force_schema: schema_name = schema_name or "default" if schema_name is None: return self.resource(client=client, endpoint=endpoint) return "/".join( [ self.project.resource(client, endpoint=endpoint), "schemas", schema_name, "tables", self.name, ] ) @property def full_table_name(self): schema_name = self._get_schema_name() if schema_name is None: return "{0}.`{1}`".format(self.project.name, self.name) else: return "{0}.{1}.`{2}`".format(self.project.name, schema_name, self.name) def reload(self): url = self.resource() resp = self._client.get(url, curr_schema=self._get_schema_name()) self.parse(self._client, resp, obj=self) self.table_schema.load() self._loaded = True def reset(self): super(Table, self).reset() self._is_extend_info_loaded = False self.table_schema = None @property def schema(self): warnings.warn( "Table.schema is deprecated and will be replaced by Table.table_schema.", DeprecationWarning, stacklevel=3, ) utils.add_survey_call( ".".join([type(self).__module__, type(self).__name__, "schema"]) ) return self.table_schema @property def last_modified_time(self): warnings.warn( "Table.last_modified_time is deprecated and will be replaced by " "Table.last_data_modified_time.", DeprecationWarning, stacklevel=3, ) utils.add_survey_call( ".".join([type(self).__module__, type(self).__name__, "last_modified_time"]) ) return self.last_data_modified_time def _parse_reserved(self): if not self.reserved: self.schema_version = None self.is_transactional = None self.primary_key = None self.storage_tier_info = None self.cluster_info = None self.acid_data_retain_hours = -1 self.cdc_size = -1 self.cdc_record_num = -1 self.cdc_latest_version = -1 self.cdc_latest_timestamp = None self.serde_info = None self.input_format = None self.output_format = None self.stored_as = None return self.schema_version = self.reserved.get("schema_version") is_transactional = self.reserved.get("Transactional") self.is_transactional = ( is_transactional is not None and is_transactional.lower() == "true" ) self.primary_key = self.reserved.get("PrimaryKey") self.storage_tier_info = StorageTierInfo.deserial(self.reserved) self.cluster_info = ClusterInfo.deserial(self.reserved) self.acid_data_retain_hours = int( self.reserved.get("acid.data.retain.hours", "-1") ) self.cdc_size = int(self.reserved.get("cdc_size", "-1")) self.cdc_record_num = int(self.reserved.get("cdc_record_num", "-1")) self.cdc_latest_version = int(self.reserved.get("cdc_latest_version", "-1")) self.cdc_latest_timestamp = None if "cdc_latest_timestamp" in self.reserved: self.cdc_latest_timestamp = datetime.fromtimestamp( int(self.reserved["cdc_latest_timestamp"]) ) self.serde_info = self.reserved.get("SerDeInfo") self.input_format = self.reserved.get("InputFormat") self.output_format = self.reserved.get("OutputFormat") self.stored_as = self.reserved.get("StoredAs") def reload_extend_info(self): params = {} schema_name = self._get_schema_name() if schema_name is not None: params["curr_schema"] = schema_name resp = self._client.get(self.resource(), action="extended", params=params) self.parse(self._client, resp, obj=self) self._is_extend_info_loaded = True if not self._loaded: self.table_schema = None self._parse_reserved() def __getattribute__(self, attr): if attr in type(self)._extended_args: if not self._is_extend_info_loaded: self.reload_extend_info() if attr == "record_num" and self.table_schema.partitions: warnings.warn( "record_name nay not be correct when table has partitions. Use " "aggregated value of required partitions instead.", RuntimeWarning, stacklevel=3, ) return object.__getattribute__(self, attr) val = object.__getattribute__(self, attr) if val is None and not self._loaded: if attr in getattr(TableSchema, "__fields"): self.reload() return object.__getattribute__(self, attr) return super(Table, self).__getattribute__(attr) def _get_column_generate_expression(self, column_name): if self.table_schema[column_name].generate_expression is not None: return self.table_schema[column_name].generate_expression else: table_props = self.table_properties or {} if column_name.lower() == "_partitiontime" and utils.str_to_bool( table_props.get("ingestion_time_partition") ): col_expr = ( '[{"functionCall":{"name":"current_timestamp_ntz","type":"%s"}}]' % str(self.table_schema[column_name].type) ) return parse_expression(col_expr) return None def _repr(self): buf = six.StringIO() buf.write("odps.Table\n") buf.write(" name: {0}\n".format(self.full_table_name)) if self.type: buf.write(" type: {0}\n".format(self.type.value)) max_name_len = max(len(col.name) for col in self.table_schema.columns) name_space = max_name_len + min(max_name_len, 16) max_type_len = max(len(repr(col.type)) for col in self.table_schema.columns) type_space = max_type_len + min(max_type_len, 16) not_empty = lambda field: field is not None and len(field.strip()) > 0 buf.write(" schema:\n") cols_strs = [] for col in self.table_schema._columns: cols_strs.append( "{0}: {1}{2}".format( col.name.ljust(name_space), repr(col.type).ljust(type_space), "# {0}".format(utils.to_str(col.comment)) if not_empty(col.comment) else "", ).strip() ) buf.write(utils.indent("\n".join(cols_strs), 4)) buf.write("\n") if self.table_schema.partitions: buf.write(" partitions:\n") partition_strs = [] for partition in self.table_schema.partitions: partition_strs.append( "{0}: {1}{2}".format( partition.name.ljust(name_space), repr(partition.type).ljust(type_space), "# {0}".format(utils.to_str(partition.comment)) if not_empty(partition.comment) else "", ).strip() ) buf.write(utils.indent("\n".join(partition_strs), 4)) if self.view_text: buf.write(" view text:\n{0}".format(utils.indent(self.view_text, 4))) return buf.getvalue() @classmethod def gen_create_table_sql( cls, table_name, table_schema, comment=None, if_not_exists=False, lifecycle=None, shard_num=None, hub_lifecycle=None, with_column_comments=True, transactional=False, primary_key=None, storage_tier=None, project=None, schema=None, table_type=None, view_text=None, **kw ): buf = six.StringIO() table_name = utils.to_text(table_name) project = utils.to_text(project) schema = utils.to_text(schema) comment = utils.to_text(comment) view_text = utils.to_text(view_text) table_type = cls.Type(table_type or cls.Type.MANAGED_TABLE) is_view = table_type in (cls.Type.VIRTUAL_VIEW, cls.Type.MATERIALIZED_VIEW) primary_key = ( [primary_key] if isinstance(primary_key, six.string_types) else primary_key ) stored_as = kw.get("stored_as") or kw.get("external_stored_as") storage_handler = kw.get("storage_handler") location = kw.get("location") row_format_serde = kw.get("row_format_serde") input_format = kw.get("input_format") output_format = kw.get("output_format") table_properties = kw.get("table_properties") or {} cluster_info = kw.get("cluster_info") use_auto_partitioning = kw.get("use_auto_partitioning") rewrite_enabled = kw.get("rewrite_enabled") rewrite_enabled = rewrite_enabled if rewrite_enabled is not None else True if table_type == cls.Type.EXTERNAL_TABLE: type_str = u"EXTERNAL TABLE" elif table_type == cls.Type.VIRTUAL_VIEW: type_str = u"VIEW" elif table_type == cls.Type.MATERIALIZED_VIEW: type_str = u"MATERIALIZED VIEW" else: type_str = u"EXTERNAL TABLE" if location else u"TABLE" buf.write(u"CREATE %s " % type_str) if if_not_exists: buf.write(u"IF NOT EXISTS ") if project is not None: buf.write(u"%s." % project) if schema is not None: buf.write(u"%s." % schema) buf.write(u"`%s` " % table_name) if is_view and lifecycle is not None and lifecycle > 0: buf.write("LIFECYCLE %s " % lifecycle) def _write_primary_key(prev=""): if not primary_key: return if not prev.strip().endswith(","): buf.write(u",\n") buf.write( u" PRIMARY KEY (%s)" % ", ".join("`%s`" % c for c in primary_key) ) if isinstance(table_schema, six.string_types): buf.write(u"(\n") buf.write(table_schema) _write_primary_key(table_schema) buf.write(u"\n)\n") if comment: buf.write(u"COMMENT '%s'\n" % utils.escape_odps_string(comment)) elif isinstance(table_schema, tuple): buf.write(u"(\n") buf.write(table_schema[0]) _write_primary_key(table_schema[0]) buf.write(u"\n)\n") if comment: buf.write(u"COMMENT '%s'\n" % utils.escape_odps_string(comment)) if len(table_schema) > 1 and table_schema[1]: if ( use_auto_partitioning is None and _auto_part_regex.search(table_schema[1]) ) or use_auto_partitioning: buf.write(u"AUTO \n") buf.write(u"PARTITIONED BY ") buf.write(u"(\n") buf.write(table_schema[1]) buf.write(u"\n)\n") else: def write_columns(col_array, with_pk=False): size = len(col_array) buf.write(u"(\n") for idx, column in enumerate(col_array): buf.write(column.to_sql_clause(with_column_comments)) if idx < size - 1: buf.write(u",\n") if with_pk: _write_primary_key() buf.write(u"\n)\n") def write_view_columns(col_array): size = len(col_array) buf.write(u"(\n") for idx, column in enumerate(col_array): buf.write(u" `%s`" % (utils.to_text(column.name))) if with_column_comments and column.comment: comment_str = utils.escape_odps_string( utils.to_text(column.comment) ) buf.write(u" COMMENT '%s'" % comment_str) if idx < size - 1: buf.write(u",\n") buf.write(u"\n)\n") if not is_view: write_columns(table_schema.simple_columns, with_pk=True) else: write_view_columns(table_schema.simple_columns) if comment: comment_str = utils.escape_odps_string(utils.to_text(comment)) buf.write(u"COMMENT '%s'\n" % comment_str) if table_type == cls.Type.MATERIALIZED_VIEW and not rewrite_enabled: buf.write(u"DISABLE REWRITE\n") manual_parts, auto_parts = [], [] for p in table_schema.partitions or []: if p.generate_expression is None: manual_parts.append(p) else: auto_parts.append(p) if manual_parts and auto_parts: raise ValueError( "Auto partitioning and manual partitioning can not coexist" ) if manual_parts: if not is_view: buf.write(u"PARTITIONED BY ") write_columns(manual_parts) else: buf.write(u"PARTITIONED ON ") write_view_columns(manual_parts) if auto_parts: buf.write(u"AUTO PARTITIONED BY ") write_columns(auto_parts) if cluster_info is not None: buf.write(cluster_info.to_sql_clause()) buf.write(u"\n") if transactional: table_properties["transactional"] = "true" if storage_tier: if isinstance(storage_tier, six.string_types): storage_tier = StorageTier( utils.underline_to_camel(storage_tier).lower() ) table_properties["storagetier"] = storage_tier.value if table_properties: buf.write(u"TBLPROPERTIES (\n") for idx, (k, v) in enumerate( sorted(table_properties.items(), key=lambda x: x[0]) ): k = utils.escape_odps_string(utils.to_text(k)) if isinstance(v, bool): v = "true" if v else "false" v = utils.escape_odps_string(utils.to_text(v)) buf.write(u" '%s' = '%s'" % (k, v)) if idx + 1 < len(table_properties): buf.write(u",") buf.write(u"\n") buf.write(u")\n") serde_properties = kw.get("serde_properties") resources = kw.get("resources") if location or stored_as: if storage_handler: buf.write( u"STORED BY '%s'\n" % utils.escape_odps_string(storage_handler) ) elif row_format_serde: buf.write( u"ROW FORMAT SERDE '%s'\n" % utils.escape_odps_string(row_format_serde) ) if serde_properties: buf.write(u"WITH SERDEPROPERTIES (\n") for idx, (k, v) in enumerate( sorted(serde_properties.items(), key=lambda x: x[0]) ): k = utils.escape_odps_string(utils.to_text(k)) if isinstance(v, bool): v = "true" if v else "false" v = utils.escape_odps_string(utils.to_text(v)) buf.write(u" '%s' = '%s'" % (k, v)) if idx + 1 < len(serde_properties): buf.write(u",") buf.write(u"\n") buf.write(u")\n") if stored_as or input_format or output_format: if not stored_as: stored_as = u"\nINPUTFORMAT '%s'\nOUTPUTFORMAT '%s'" % ( utils.escape_odps_string(input_format), utils.escape_odps_string(output_format), ) buf.write(u"STORED AS %s\n" % stored_as) if location: buf.write(u"LOCATION '%s'\n" % utils.escape_odps_string(location)) if resources: buf.write(u"USING '%s'\n" % utils.escape_odps_string(resources)) if not is_view and lifecycle is not None and lifecycle > 0: buf.write(u"LIFECYCLE %s\n" % lifecycle) if shard_num is not None: buf.write(u"INTO %s SHARDS" % shard_num) if hub_lifecycle is not None: buf.write(u" HUBLIFECYCLE %s\n" % hub_lifecycle) else: buf.write(u"\n") if is_view and view_text: buf.write(u"AS %s\n" % view_text) return buf.getvalue().strip() def get_ddl(self, with_comments=True, if_not_exists=False, force_table_ddl=False): """ Get DDL SQL statement for the given table. :param with_comments: append comment for table and each column :param if_not_exists: generate `if not exists` code for generated DDL :param force_table_ddl: force generate table DDL if object is a view :return: DDL statement """ shard_num = self.shard.shard_num if self.shard is not None else None storage_tier = ( self.storage_tier_info.storage_tier.value if self.storage_tier_info else None ) table_type = self.type if not force_table_ddl else self.Type.MANAGED_TABLE if self.stored_as and self.stored_as.lower() not in ("cfile", "unknown"): stored_as = self.stored_as row_format_serde = None input_format = output_format = None else: # use system-designated serde to simplify ddl stored_as = None row_format_serde = (self.serde_info or {}).get("serializationLib") input_format = self.input_format output_format = self.output_format # ignore default serde if not self.location and (row_format_serde, input_format, output_format) == ( "com.aliyun.apsara.serde.CFileSerDe", "com.aliyun.apsara.format.CFileInputFormat", "com.aliyun.apsara.format.CFileOutputFormat", ): row_format_serde = input_format = output_format = None return self.gen_create_table_sql( self.name, self.table_schema, self.comment if with_comments else None, if_not_exists=if_not_exists, with_column_comments=with_comments, lifecycle=self.lifecycle, shard_num=shard_num, project=self.project.name, storage_handler=self.storage_handler, serde_properties=self.serde_properties, table_properties=self.table_properties, stored_as=stored_as, row_format_serde=row_format_serde, input_format=input_format, output_format=output_format, location=self.location, resources=self.resources, table_type=table_type, storage_tier=storage_tier, cluster_info=self.cluster_info, transactional=self.is_transactional, primary_key=self.primary_key, view_text=self.view_text, rewrite_enabled=self.is_materialized_view_rewrite_enabled, ) def _build_partition_spec_sql(self, partition_spec=None): partition_expr = "" if partition_spec is not None: if not isinstance(partition_spec, (list, tuple)): partition_spec = [partition_spec] partition_spec = [odps_types.PartitionSpec(spec) for spec in partition_spec] partition_expr = " " + ", ".join( "PARTITION (%s)" % spec for spec in partition_spec ) # as data of partition changed, remove existing download id to avoid TableModified error for part in partition_spec or [None]: if isinstance(part, six.string_types): part = odps_types.PartitionSpec(part) self._download_ids.pop(part, None) return partition_expr def _build_alter_table_ddl(self, action=None, partition_spec=None, cmd=u"ALTER"): action = action or "" target = u"TABLE" if self.type in (Table.Type.VIRTUAL_VIEW, Table.Type.MATERIALIZED_VIEW): target = u"VIEW" partition_expr = self._build_partition_spec_sql(partition_spec) sql = u"%s %s %s%s %s" % ( cmd, target, self.full_table_name, partition_expr, action, ) return sql.strip() @utils.survey def _head_by_data(self, limit, partition=None, columns=None, timeout=None): if limit <= 0: raise ValueError("limit number should >= 0.") params = {"linenum": limit} if partition is not None: if not isinstance(partition, odps_types.PartitionSpec): partition = odps_types.PartitionSpec(partition) params["partition"] = str(partition) if columns is not None and len(columns) > 0: col_name = ( lambda col: col.name if isinstance(col, odps_types.Column) else col ) params["cols"] = ",".join(col_name(col) for col in columns) schema_name = self._get_schema_name() if schema_name is not None: params["schema_name"] = schema_name resp = self._client.get( self.resource(), action="data", params=params, stream=True, timeout=timeout ) return readers.CsvRecordReader( self.table_schema, resp, max_field_size=self._get_max_field_size() ) def _head_by_preview( self, limit, partition=None, columns=None, compress_algo=None, timeout=None, tags=None, ): table_tunnel = self._create_table_tunnel() return table_tunnel.open_preview_reader( self, partition_spec=partition, columns=columns, limit=limit, compress_algo=compress_algo, arrow=False, timeout=timeout, read_all=True, tags=tags, ) def head( self, limit, partition=None, columns=None, use_legacy=True, timeout=None, tags=None, ): """ Get the head records of a table or its partition. :param int limit: records' size, 10000 at most :param partition: partition of this table :param list columns: the columns which is subset of the table columns :return: records :rtype: list .. seealso:: :class:`odps.models.Record` """ try: if pa is not None and not use_legacy: timeout = ( timeout if timeout is not None else options.tunnel.legacy_fallback_timeout ) return self._head_by_preview( limit, partition=partition, columns=columns, timeout=timeout, tags=tags, ) except: # only raises when under tests and # use_legacy specified explicitly as False if use_legacy is False: raise return self._head_by_data( limit, partition=partition, columns=columns, timeout=timeout ) def _create_table_tunnel(self, endpoint=None, quota_name=None): if self._table_tunnel is not None: return self._table_tunnel from ..tunnel import TableTunnel self._table_tunnel = TableTunnel( client=self._client, project=self.project, endpoint=endpoint or self.project._tunnel_endpoint, quota_name=quota_name, ) return self._table_tunnel def open_reader( self, partition=None, reopen=False, endpoint=None, download_id=None, timeout=None, arrow=False, columns=None, quota_name=None, async_mode=True, append_partitions=None, tags=None, **kw ): """ Open the reader to read the entire records from this table or its partition. :param partition: partition of this table :param reopen: the reader will reuse last one, reopen is true means open a new reader. :type reopen: bool :param endpoint: the tunnel service URL :param download_id: use existing download_id to download table contents :param arrow: use arrow tunnel to read data :param columns: columns to read :param quota_name: name of tunnel quota :param async_mode: enable async mode to create tunnels, can set True if session creation takes a long time. :param compress_option: compression algorithm, level and strategy :type compress_option: :class:`odps.tunnel.CompressOption` :param compress_algo: compression algorithm, work when ``compress_option`` is not provided, can be ``zlib``, ``snappy`` :param compress_level: used for ``zlib``, work when ``compress_option`` is not provided :param compress_strategy: used for ``zlib``, work when ``compress_option`` is not provided :param bool append_partitions: if True, partition values will be appended to the output :return: reader, ``count`` means the full size, ``status`` means the tunnel status :Example: >>> with table.open_reader() as reader: >>> count = reader.count # How many records of a table or its partition >>> for record in reader[0: count]: >>> # read all data, actually better to split into reading for many times """ from ..tunnel.tabletunnel import TableDownloadSession if self.is_transactional and self.primary_key: # currently acid 2.0 table can only be read through select statement sql_stmt = "SELECT * FROM %s" % self.full_table_name if partition is not None: part_spec = odps_types.PartitionSpec(partition) conds = " AND ".join( "%s='%s'" % (k, utils.escape_odps_string(v)) for k, v in part_spec.items() ) sql_stmt += " WHERE " + conds return self.project.odps.execute_sql(sql_stmt).open_reader() if partition and not isinstance(partition, odps_types.PartitionSpec): partition = odps_types.PartitionSpec(partition) tunnel = self._create_table_tunnel(endpoint=endpoint, quota_name=quota_name) download_ids = dict() if download_id is None: download_ids = self._download_ids download_id = download_ids.get(partition) if not reopen else None download_session = utils.call_with_retry( tunnel.create_download_session, table=self, partition_spec=partition, download_id=download_id, timeout=timeout, async_mode=async_mode, tags=tags, **kw ) if ( download_id and download_session.status != TableDownloadSession.Status.Normal ): download_session = utils.call_with_retry( tunnel.create_download_session, table=self, partition_spec=partition, timeout=timeout, async_mode=async_mode, tags=tags, **kw ) download_ids[partition] = download_session.id reader_cls = TableArrowReader if arrow else TableRecordReader kw = ( {"append_partitions": append_partitions} if append_partitions is not None else {} ) return reader_cls(self, download_session, partition, columns=columns, **kw) def open_writer( self, partition=None, blocks=None, reopen=False, create_partition=False, commit=True, endpoint=None, upload_id=None, arrow=False, quota_name=None, tags=None, mp_context=None, **kw ): """ Open the writer to write records into this table or its partition. :param partition: partition of this table :param blocks: block ids to open :param bool reopen: the reader will reuse last one, reopen is true means open a new reader. :param bool create_partition: if true, the partition will be created if not exist :param endpoint: the tunnel service URL :param upload_id: use existing upload_id to upload data :param arrow: use arrow tunnel to write data :param quota_name: name of tunnel quota :param bool overwrite: if True, will overwrite existing data :param compress_option: compression algorithm, level and strategy :type compress_option: :class:`odps.tunnel.CompressOption` :param compress_algo: compression algorithm, work when ``compress_option`` is not provided, can be ``zlib``, ``snappy`` :param compress_level: used for ``zlib``, work when ``compress_option`` is not provided :param compress_strategy: used for ``zlib``, work when ``compress_option`` is not provided :return: writer, status means the tunnel writer status :Example: >>> with table.open_writer() as writer: >>> writer.write(records) >>> with table.open_writer(partition='pt=test', blocks=[0, 1]): >>> writer.write(0, gen_records(block=0)) >>> writer.write(1, gen_records(block=1)) # we can do this parallel """ from ..tunnel.tabletunnel import TableUploadSession if partition and not isinstance(partition, odps_types.PartitionSpec): partition = odps_types.PartitionSpec(partition) if create_partition and not self.exist_partition(create_partition): self.create_partition(partition, if_not_exists=True) tunnel = self._create_table_tunnel(endpoint=endpoint, quota_name=quota_name) if upload_id is None: upload_ids = self._upload_ids upload_id = upload_ids.get(partition) if not reopen else None else: upload_ids = dict() use_upsert = self.is_transactional and self.primary_key def _create_session(upload_id_): if use_upsert: return tunnel.create_upsert_session( table=self, partition_spec=partition, upsert_id=upload_id_, tags=tags, **kw ) else: return tunnel.create_upload_session( table=self, partition_spec=partition, upload_id=upload_id, tags=tags, **kw ) upload_session = utils.call_with_retry(_create_session, upload_id) if ( upload_id and upload_session.status.value != TableUploadSession.Status.Normal.value ): # check upload session status upload_session = utils.call_with_retry(_create_session, None) upload_ids[partition] = upload_session.id # as data of partition changed, remove existing download id to avoid TableModified error self._download_ids.pop(partition, None) if arrow: writer_cls = TableArrowWriter elif use_upsert: writer_cls = TableUpsertWriter else: writer_cls = TableRecordWriter def _writer_on_close(): if commit: upload_ids[partition] = None return writer_cls( self, upload_session, blocks=blocks, commit=commit, on_close=_writer_on_close, mp_context=mp_context, ) def to_pandas( self, partition=None, columns=None, start=None, count=None, n_process=1, quota_name=None, append_partitions=None, tags=None, **kwargs ): """ Read table data into pandas DataFrame :param partition: partition of this table :param list columns: columns to read :param int start: start row index from 0 :param int count: data count to read :param int n_process: number of processes to accelerate reading :param bool append_partitions: if True, partition values will be appended to the output :param str quota_name: name of tunnel quota to use """ if partition is None and self.table_schema.partitions: raise ValueError( "You must specify a partition when calling to_pandas on a partitioned table" ) arrow = kwargs.pop("arrow", True) with self.open_reader( partition=partition, columns=columns, arrow=arrow, quota_name=quota_name, append_partitions=append_partitions, tags=tags, **kwargs ) as reader: return reader.to_pandas(start=start, count=count, n_process=n_process) def iter_pandas( self, partition=None, columns=None, batch_size=None, start=None, count=None, quota_name=None, append_partitions=None, tags=None, **kwargs ): """ Iterate table data in blocks as pandas DataFrame :param partition: partition of this table :param list columns: columns to read :param int batch_size: size of DataFrame batch to read :param int start: start row index from 0 :param int count: data count to read :param bool append_partitions: if True, partition values will be appended to the output :param str quota_name: name of tunnel quota to use """ if partition is None and self.table_schema.partitions: raise ValueError( "You must specify a partition when calling to_pandas on a partitioned table" ) kwargs.pop("arrow", None) with self.open_reader( partition=partition, columns=columns, arrow=True, quota_name=quota_name, append_partitions=append_partitions, tags=tags, **kwargs ) as reader: for batch in reader.iter_pandas( batch_size, start=start, count=count, columns=columns ): yield batch @property def partitions(self): return Partitions(parent=self, client=self._client) @utils.with_wait_argument def create_partition( self, partition_spec, if_not_exists=False, async_=False, hints=None ): """ Create a partition within the table. :param partition_spec: specification of the partition. :param if_not_exists: :param hints: :param async_: :return: partition object :rtype: odps.models.partition.Partition """ return self.partitions.create( partition_spec, if_not_exists=if_not_exists, hints=hints, async_=async_ ) @utils.with_wait_argument def delete_partition( self, partition_spec, if_exists=False, async_=False, hints=None ): """ Delete a partition within the table. :param partition_spec: specification of the partition. :param if_exists: :param hints: :param async_: """ return self.partitions.delete( partition_spec, if_exists=if_exists, hints=hints, async_=async_ ) def exist_partition(self, partition_spec): """ Check if a partition exists within the table. :param partition_spec: specification of the partition. """ return partition_spec in self.partitions def exist_partitions(self, prefix_spec=None): """ Check if partitions with provided conditions exist. :param prefix_spec: prefix of partition :return: whether partitions exist """ try: next(self.partitions.iterate_partitions(spec=prefix_spec)) except StopIteration: return False return True def iterate_partitions(self, spec=None, reverse=False): """ Create an iterable object to iterate over partitions. :param spec: specification of the partition. :param reverse: output partitions in reversed order """ return self.partitions.iterate_partitions(spec=spec, reverse=reverse) def get_partition(self, partition_spec): """ Get a partition with given specifications. :param partition_spec: specification of the partition. :return: partition object :rtype: odps.models.partition.Partition """ return self.partitions[partition_spec] def get_max_partition(self, spec=None, skip_empty=True, reverse=False): """ Get partition with maximal values within certain spec. :param spec: parent partitions. if specified, will return partition with maximal value within specified parent partition :param skip_empty: if True, will skip partitions without data :param reverse: if True, will return minimal value :return: Partition """ if not self.table_schema.partitions: raise ValueError("Table %r not partitioned" % self.name) return self.partitions.get_max_partition( spec, skip_empty=skip_empty, reverse=reverse ) def _unload_if_async(self, async_=False, reload=True): if async_: self._loaded = False elif reload: self.reload() @utils.with_wait_argument def truncate(self, partition_spec=None, async_=False, hints=None): """ truncate this table. :param partition_spec: partition specs :param hints: :param async_: run asynchronously if True :return: None """ sql = self._build_alter_table_ddl(partition_spec=partition_spec, cmd="TRUNCATE") inst = self.parent._run_table_sql( sql, task_name="SQLTruncateTableTask", hints=hints, wait=not async_ ) self._unload_if_async() return inst @utils.with_wait_argument def drop(self, async_=False, if_exists=False, hints=None): """ Drop this table. :param async_: run asynchronously if True :param if_exists: :param hints: :return: None """ return self.parent.delete(self, async_=async_, if_exists=if_exists, hints=hints) @utils.with_wait_argument def set_storage_tier( self, storage_tier, partition_spec=None, async_=False, hints=None ): """ Set storage tier of current table or specific partition. """ self._is_extend_info_loaded = False if isinstance(storage_tier, six.string_types): storage_tier = StorageTier(utils.underline_to_camel(storage_tier).lower()) property_item = "TBLPROPERTIES" if not partition_spec else "PARTITIONPROPERTIES" sql = self._build_alter_table_ddl( "SET %s('storagetier'='%s')" % (property_item, storage_tier.value), partition_spec=partition_spec, ) hints = hints or {} hints["odps.tiered.storage.enable"] = "true" inst = self.parent._run_table_sql( sql, task_name="SQLSetStorageTierTask", hints=hints, wait=not async_ ) self.storage_tier_info = storage_tier self._unload_if_async(async_=async_, reload=False) return inst @utils.with_wait_argument def add_columns(self, columns, if_not_exists=False, async_=False, hints=None): """ Add columns to the table. :param columns: columns to add, can be a list of :class:`~odps.types.Column` or a string of column definitions :param if_not_exists: if True, will not raise exception when column exists :Example: >>> table = odps.create_table('test_table', schema=TableSchema.from_lists(['name', 'id'], ['sring', 'string'])) >>> # add column by Column instance >>> table.add_columns([Column('id2', 'string')]) >>> # add column by a string of column definitions >>> table.add_columns("fid double, fid2 double") """ if isinstance(columns, odps_types.Column): columns = [columns] action_str = u"ADD COLUMNS" + ( u" IF NOT EXISTS (\n" if if_not_exists else u" (\n" ) if isinstance(columns, six.string_types): action_str += columns + "\n)" else: action_str += ( u",\n".join([" " + col.to_sql_clause() for col in columns]) + u"\n)" ) sql = self._build_alter_table_ddl(action_str) inst = self.parent._run_table_sql( sql, task_name="SQLAddColumnsTask", hints=hints, wait=not async_ ) self._unload_if_async(async_=async_) return inst @utils.with_wait_argument def delete_columns(self, columns, async_=False, hints=None): """ Delete columns from the table. :param columns: columns to delete, can be a list of column names """ if isinstance(columns, six.string_types): columns = [columns] action_str = u"DROP COLUMNS " + u", ".join(u"`%s`" % c for c in columns) sql = self._build_alter_table_ddl(action_str) inst = self.parent._run_table_sql( sql, task_name="SQLDeleteColumnsTask", hints=hints, wait=not async_ ) self._unload_if_async(async_=async_) return inst @utils.with_wait_argument def rename_column( self, old_column_name, new_column_name, comment=None, async_=False, hints=None ): """ Rename a column in the table. :param old_column_name: old column name :param new_column_name: new column name :param comment: new column comment, optional """ if comment: old_col = self.table_schema[old_column_name] new_col = odps_types.Column( name=new_column_name, type=old_col.type, comment=comment, label=old_col.label, nullable=old_col.nullable, ) action_str = u"CHANGE COLUMN %s %s" % ( old_column_name, new_col.to_sql_clause(), ) else: action_str = u"CHANGE COLUMN %s RENAME TO %s" % ( old_column_name, new_column_name, ) sql = self._build_alter_table_ddl(action_str) inst = self.parent._run_table_sql( sql, task_name="SQLRenameColumnsTask", hints=hints, wait=not async_ ) self._unload_if_async(async_=async_) return inst @utils.with_wait_argument def set_lifecycle(self, days, async_=False, hints=None): """ Set lifecycle of current table. :param days: lifecycle in days """ sql = self._build_alter_table_ddl(u"SET LIFECYCLE %s" % days) inst = self.parent._run_table_sql( sql, task_name="SQLSetLifecycleTask", hints=hints, wait=not async_ ) self.lifecycle = days self._unload_if_async(async_=async_, reload=False) return inst @utils.with_wait_argument def set_owner(self, new_owner, async_=False, hints=None): """ Set owner of current table. :param new_owner: account of the new owner """ sql = self._build_alter_table_ddl( u"CHANGEOWNER TO '%s'" % utils.escape_odps_string(new_owner) ) inst = self.parent._run_table_sql( sql, task_name="SQLSetOwnerTask", hints=hints, wait=not async_ ) self.owner = new_owner self._unload_if_async(async_=async_, reload=False) return inst @utils.with_wait_argument def set_comment(self, new_comment, async_=False, hints=None): """ Set comment of current table. :param new_comment: new comment """ sql = self._build_alter_table_ddl( u"SET COMMENT '%s'" % utils.escape_odps_string(new_comment) ) inst = self.parent._run_table_sql( sql, task_name="SQLSetCommentTask", hints=hints, wait=not async_ ) self.comment = new_comment self._unload_if_async(async_=async_, reload=False) return inst @utils.with_wait_argument def set_cluster_info(self, new_cluster_info, async_=False, hints=None): """ Set cluster info of current table. """ if new_cluster_info is None: action = u"NOT CLUSTERED" else: assert isinstance(new_cluster_info, ClusterInfo) action = new_cluster_info.to_sql_clause() sql = self._build_alter_table_ddl(action) inst = self.parent._run_table_sql( sql, task_name="SQLSetClusterInfoTask", hints=hints, wait=not async_ ) self.cluster_info = new_cluster_info self._unload_if_async(async_=async_, reload=False) return inst @utils.with_wait_argument def rename(self, new_name, async_=False, hints=None): """ Rename the table. :param new_name: new table name """ sql = self._build_alter_table_ddl("RENAME TO `%s`" % new_name) inst = self.parent._run_table_sql( sql, task_name="SQLRenameTask", hints=hints, wait=not async_ ) self.name = new_name del self.parent[self.name] self._unload_if_async(async_=async_) return inst @utils.with_wait_argument def change_partition_spec( self, old_partition_spec, new_partition_spec, async_=False, hints=None ): """ Change partition spec of specified partition of the table. :param old_partition_spec: old partition spec :param new_partition_spec: new partition spec """ sql = self._build_alter_table_ddl( "RENAME TO %s" % self._build_partition_spec_sql(new_partition_spec), partition_spec=old_partition_spec, ) return self.parent._run_table_sql( sql, task_name="SQLChangePartitionSpecTask", hints=hints, wait=not async_ ) @utils.with_wait_argument def touch(self, partition_spec=None, async_=False, hints=None): """ Update the last modified time of the table or specified partition. :param partition_spec: partition spec, optional """ action = u"TOUCH " + self._build_partition_spec_sql(partition_spec) sql = self._build_alter_table_ddl(action.strip()) inst = self.parent._run_table_sql( sql, task_name="SQLTouchTask", hints=hints, wait=not async_ ) self._unload_if_async(async_=async_) return inst def _get_max_field_size(self): try: project_field_size = self.project.get_property( "odps.sql.cfile2.field.maxsize", None ) return int(project_field_size or 0) * 1024 except: return 0 def new_record(self, values=None): """ Generate a record of the table. :param values: the values of this records :type values: list :return: record :rtype: :class:`odps.models.Record` :Example: >>> table = odps.create_table('test_table', schema=TableSchema.from_lists(['name', 'id'], ['sring', 'string'])) >>> record = table.new_record() >>> record[0] = 'my_name' >>> record[1] = 'my_id' >>> record = table.new_record(['my_name', 'my_id']) .. seealso:: :class:`odps.models.Record` """ return Record( schema=self.table_schema, values=values, max_field_size=self._get_max_field_size(), ) def to_df(self): """ Create a PyODPS DataFrame from this table. :return: DataFrame object """ from ..df import DataFrame return DataFrame(self)