odps/models/partition.py (273 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import warnings from datetime import datetime from .. import serializers, types, utils from .core import JSONRemoteModel, LazyLoad, XMLRemoteModel from .storage_tier import StorageTierInfo class Partition(LazyLoad): """ A partition is a collection of rows in a table whose partition columns are equal to specific values. In order to write data into partition, users should call the ``open_writer`` method with **with statement**. At the same time, the ``open_reader`` method is used to provide the ability to read records from a partition. The behavior of these methods are the same as those in Table class except that there are no 'partition' params. """ _extended_args = ( "is_archived", "is_exstore", "lifecycle", "physical_size", "file_num", "reserved", "cdc_size", "cdc_record_num", ) __slots__ = ( "spec", "creation_time", "last_meta_modified_time", "last_data_modified_time", "last_access_time", "size", "record_num", "_is_extend_info_loaded", ) __slots__ += _extended_args _extended_args = set(_extended_args) class Column(XMLRemoteModel): name = serializers.XMLNodeAttributeField(attr="Name") value = serializers.XMLNodeAttributeField(attr="Value") class PartitionMeta(JSONRemoteModel): creation_time = serializers.JSONNodeField( "createTime", parse_callback=datetime.fromtimestamp, set_to_parent=True ) last_meta_modified_time = serializers.JSONNodeField( "lastDDLTime", parse_callback=datetime.fromtimestamp, set_to_parent=True ) last_data_modified_time = serializers.JSONNodeField( "lastModifiedTime", parse_callback=datetime.fromtimestamp, set_to_parent=True, ) last_access_time = serializers.JSONNodeField( "lastAccessTime", parse_callback=lambda x: datetime.fromtimestamp(x) if x else None, set_to_parent=True, ) size = serializers.JSONNodeField( "partitionSize", parse_callback=int, set_to_parent=True ) record_num = serializers.JSONNodeField( "partitionRecordNum", parse_callback=int, set_to_parent=True ) class PartitionExtendedMeta(PartitionMeta): is_archived = serializers.JSONNodeField( "IsArchived", parse_callback=bool, set_to_parent=True ) is_exstore = serializers.JSONNodeField( "IsExstore", parse_callback=bool, set_to_parent=True ) lifecycle = serializers.JSONNodeField( "LifeCycle", parse_callback=int, set_to_parent=True ) physical_size = serializers.JSONNodeField( "PhysicalSize", parse_callback=int, set_to_parent=True ) file_num = serializers.JSONNodeField( "FileNum", parse_callback=int, set_to_parent=True ) reserved = serializers.JSONNodeField( "Reserved", type="json", set_to_parent=True ) columns = serializers.XMLNodesReferencesField(Column, "Column") _schema = serializers.XMLNodeReferenceField(PartitionMeta, "Schema") _extended_schema = serializers.XMLNodeReferenceField( PartitionExtendedMeta, "Schema" ) creation_time = serializers.XMLNodeField( "CreationTime", parse_callback=lambda x: datetime.fromtimestamp(int(x)) ) last_meta_modified_time = serializers.XMLNodeField( "LastDDLTime", parse_callback=lambda x: datetime.fromtimestamp(int(x)) ) last_data_modified_time = serializers.XMLNodeField( "LastModifiedTime", parse_callback=lambda x: datetime.fromtimestamp(int(x)) ) size = serializers.XMLNodeField("PartitionSize", parse_callback=int) record_num = serializers.XMLNodeField("PartitionRecordCount", parse_callback=int) def __init__(self, **kwargs): self._is_extend_info_loaded = False super(Partition, self).__init__(**kwargs) def __str__(self): return str(self.partition_spec) def __repr__(self): return "<Partition %s.`%s`(%s)>" % ( str(self.table.project.name), str(self.table.name), str(self.partition_spec), ) def _is_field_set(self, attr): try: attr_val = self._getattr(attr) except AttributeError: return False if attr in ("size", "record_num") and attr_val is not None and attr_val >= 0: return True return attr_val is not None def __getattribute__(self, attr): if attr in type(self)._extended_args: if not self._is_extend_info_loaded and not self._is_field_set(attr): self.reload_extend_info() return object.__getattribute__(self, attr) val = object.__getattribute__(self, attr) if val is None and not self._loaded: if attr in getattr(Partition.PartitionMeta, "__fields"): self.reload() return object.__getattribute__(self, attr) return super(Partition, self).__getattribute__(attr) def _set_state(self, name, parent, client): self.__init__(spec=name, _parent=parent, _client=client) def _name(self): return @classmethod def get_partition_spec(cls, columns=None, spec=None): if spec is not None: return spec spec = types.PartitionSpec() for col in columns: spec[col.name] = col.value return spec @property def last_modified_time(self): warnings.warn( "Partition.last_modified_time is deprecated and will be replaced by " "Partition.last_data_modified_time.", DeprecationWarning, stacklevel=3, ) utils.add_survey_call( ".".join([type(self).__module__, type(self).__name__, "last_modified_time"]) ) return self.last_data_modified_time @property def partition_spec(self): return self.get_partition_spec(self._getattr("columns"), self._getattr("spec")) @property def name(self): return str(self.partition_spec) @property def table(self): return self.parent.parent @property def project(self): return self.table.project @property def storage_tier_info(self): return StorageTierInfo.deserial(self.reserved) def reload(self): url = self.resource() params = {"partition": str(self.partition_spec)} resp = self._client.get(url, params=params, curr_schema=self._get_schema_name()) self.parse(self._client, resp, obj=self) self._loaded = True def reload_extend_info(self): url = self.resource() params = {"partition": str(self.partition_spec)} resp = self._client.get( url, action="extended", params=params, curr_schema=self._get_schema_name() ) self.parse(self._client, resp, obj=self) self._is_extend_info_loaded = True self._parse_reserved() def _parse_reserved(self): if not self.reserved: self.cdc_size = -1 self.cdc_record_num = -1 return self.cdc_size = int(self.reserved.get("cdc_size", "-1")) self.cdc_record_num = int(self.reserved.get("cdc_record_num", "-1")) def head(self, limit, columns=None): """ Get the head records of a partition :param limit: records' size, 10000 at most :param list columns: the columns which is subset of the table columns :return: records :rtype: list .. seealso:: :class:`odps.models.Record` """ return self.table.head(limit, partition=self.partition_spec, columns=columns) def to_df(self): """ Create a PyODPS DataFrame from this partition. :return: DataFrame object """ from ..df import DataFrame return DataFrame(self.table).filter_parts(self) @utils.with_wait_argument def drop(self, async_=False, if_exists=False): """ Drop this partition. :param async_: run asynchronously if True :param if_exists: :return: None """ return self.parent.delete(self, if_exists=if_exists, async_=async_) def open_reader(self, **kw): """ Open the reader to read the entire records from this partition. :param reopen: the reader will reuse last one, reopen is true means open a new reader. :type reopen: bool :param endpoint: the tunnel service URL :param compress_option: compression algorithm, level and strategy :type compress_option: :class:`odps.tunnel.CompressOption` :param compress_algo: compression algorithm, work when ``compress_option`` is not provided, can be ``zlib``, ``snappy`` :param compress_level: used for ``zlib``, work when ``compress_option`` is not provided :param compress_strategy: used for ``zlib``, work when ``compress_option`` is not provided :return: reader, ``count`` means the full size, ``status`` means the tunnel status :Example: >>> with partition.open_reader() as reader: >>> count = reader.count # How many records of a partition >>> for record in reader[0: count]: >>> # read all data, actually better to split into reading for many times """ return self.table.open_reader(str(self), **kw) def open_writer(self, blocks=None, **kw): return self.table.open_writer(self.partition_spec, blocks=blocks, **kw) def to_pandas( self, columns=None, start=None, count=None, n_process=1, quota_name=None, append_partitions=None, tags=None, **kwargs ): """ Read partition data into pandas DataFrame :param list columns: columns to read :param int start: start row index from 0 :param int count: data count to read :param int n_process: number of processes to accelerate reading :param str quota_name: name of tunnel quota to use :param bool append_partitions: if True, partition values will be appended to the output """ return self.table.to_pandas( partition=self.partition_spec, columns=columns, arrow=True, quota_name=quota_name, tags=tags, n_process=n_process, start=start, count=count, append_partitions=append_partitions, **kwargs ) def iter_pandas( self, columns=None, batch_size=None, start=None, count=None, quota_name=None, append_partitions=None, tags=None, **kwargs ): """ Read partition data into pandas DataFrame :param list columns: columns to read :param int batch_size: size of DataFrame batch to read :param int start: start row index from 0 :param int count: data count to read :param str quota_name: name of tunnel quota to use :param bool append_partitions: if True, partition values will be appended to the output """ for batch in self.table.iter_pandas( partition=self.partition_spec, columns=columns, batch_size=batch_size, arrow=True, quota_name=quota_name, tags=tags, append_partitions=append_partitions, start=start, count=count, **kwargs ): yield batch @utils.with_wait_argument def truncate(self, async_=False): """ Truncate current partition. """ return self.table.truncate(self.partition_spec, async_=async_) def _unload_if_async(self, async_=False, reload=True): self._is_extend_info_loaded = False if async_: self._loaded = False elif reload: self.reload() @utils.with_wait_argument def set_storage_tier(self, storage_tier, async_=False, hints=None): """ Set storage tier of current partition. """ inst = self.table.set_storage_tier( storage_tier, partition_spec=self.partition_spec, async_=async_, hints=hints ) self._unload_if_async(async_) return inst @utils.with_wait_argument def change_partition_spec(self, new_partition_spec, async_=False, hints=None): """ Change partition spec of current partition. :param new_partition_spec: new partition spec """ inst = self.table.change_partition_spec( self.partition_spec, new_partition_spec, async_=async_, hints=hints, ) self.spec = types.PartitionSpec(new_partition_spec) self._unload_if_async(async_) return inst @utils.with_wait_argument def touch(self, async_=False, hints=None): """ Update the last modified time of the partition. """ inst = self.table.touch( partition_spec=self.partition_spec, async_=async_, hints=hints ) self._unload_if_async(async_) return inst