odps/models/partitions.py (190 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict, defaultdict
from .. import errors, serializers, types
from ..compat import six
from ..utils import with_wait_argument
from .core import Iterable
from .partition import Partition
class PartitionSpecCondition(object):
    """Parsed form of a comma-separated partition filter expression.

    Each comma-separated clause has the shape ``<field> <operator> <value>``
    where the operator is one of the keys of ``_predicates``. All single and
    double quote characters are removed from the value, and values are
    compared with Python's ordinary comparison operators against that
    literal string.

    After construction:

    * ``partition_spec`` holds the leading run of partition fields (in
      ``part_fields`` order) that each carry exactly one equality clause,
      packed into a ``types.PartitionSpec``; it is ``None`` when there is no
      such leading run.
    * The clauses that were not folded into ``partition_spec`` remain
      available to :meth:`match`.
    """

    # Order matters: two-character operators are listed before the
    # single-character ones so that e.g. ">=" is recognised before ">" or "=".
    _predicates = OrderedDict(
        [
            ("==", lambda a, b: a == b),
            (">=", lambda a, b: a >= b),
            ("<=", lambda a, b: a <= b),
            ("<>", lambda a, b: a != b),
            ("!=", lambda a, b: a != b),
            (">", lambda a, b: a > b),
            ("<", lambda a, b: a < b),
            ("=", lambda a, b: a == b),
        ]
    )

    def __init__(self, part_fields, condition=None):
        """Parse ``condition`` against the partition fields of a table.

        :param part_fields: ordered names of the partition fields
        :param condition: filter expression; any truthy object is converted
            with ``str()`` before parsing, a falsy one means "no filter"
        :raises ValueError: when a clause contains no known operator or
            refers to a field that is not listed in ``part_fields``
        """
        self._part_to_conditions = defaultdict(list)
        known_fields = set(part_fields)

        text = str(condition) if condition else None
        clauses = text.split(",") if text else []

        for clause in clauses:
            # First operator (in declaration order) that occurs in the clause.
            operator = next((op for op in self._predicates if op in clause), None)
            if operator is None:
                raise ValueError("Invalid partition condition %r" % clause)

            # The operator is known to be present, so partition() always
            # yields a left-hand and a right-hand side.
            left, _, right = clause.partition(operator)
            name = left.strip()
            value = right.strip().replace('"', "").replace("'", "")
            if name not in known_fields:
                raise ValueError("Invalid partition field %r" % name)
            self._part_to_conditions[name].append((operator, value))

        # Fold the leading fields that are pinned by a single equality clause
        # into a partition spec; stop at the first field that is not.
        prefix = []
        for name in part_fields:
            if name not in self._part_to_conditions:
                break
            field_conditions = self._part_to_conditions[name]
            if len(field_conditions) > 1:
                break
            operator, value = field_conditions[0]
            if operator not in ("=", "=="):
                break
            del self._part_to_conditions[name]
            prefix.append("%s=%s" % (name, value))

        if prefix:
            self.partition_spec = types.PartitionSpec(",".join(prefix))
        else:
            self.partition_spec = None

    def match(self, spec):
        """Tell whether ``spec`` satisfies every remaining clause.

        :param spec: mapping-like object indexed by partition field name
        :return: ``True`` when all clauses hold, ``False`` otherwise
        """
        for name, field_conditions in self._part_to_conditions.items():
            actual = spec[name]
            if not all(
                self._predicates[operator](actual, expected)
                for operator, expected in field_conditions
            ):
                return False
        return True
class Partitions(Iterable):
    """Collection object for the partitions of a table.

    ``self.parent`` is used as the owning table: its ``table_schema``,
    ``project`` and SQL-building helpers are read by the methods below.
    """

    marker = serializers.XMLNodeField("Marker")
    max_items = serializers.XMLNodeField("MaxItems")
    partitions = serializers.XMLNodesReferencesField(Partition, "Partition")

    def _name(self):
        # The collection itself has no name.
        return

    def _get(self, item):
        """Wrap partition spec ``item`` into a ``Partition`` child object."""
        return Partition(client=self._client, parent=self, spec=item)

    def __getitem__(self, item):
        """Return the partition for a spec string or ``PartitionSpec``.

        Any other kind of ``item`` is delegated to the base class.
        """
        if isinstance(item, six.string_types):
            item = types.PartitionSpec(item)
        if isinstance(item, types.PartitionSpec):
            return self._get(item)
        return super(Partitions, self).__getitem__(item)

    def __contains__(self, item):
        """Check whether a partition exists.

        ``item`` may be a spec string, a ``PartitionSpec`` or a ``Partition``;
        anything else is reported as absent. Existence is decided by calling
        ``reload()`` on the partition and treating ``errors.NoSuchObject`` as
        "not present".
        """
        if isinstance(item, (six.string_types, types.PartitionSpec)):
            if isinstance(item, six.string_types):
                item = types.PartitionSpec(item)
            partition = self._get(item)
        elif isinstance(item, Partition):
            partition = item
        else:
            return False

        try:
            partition.reload()
            return True
        except errors.NoSuchObject:
            return False

    @classmethod
    def _get_partition_spec(cls, partition_spec):
        """Coerce ``partition_spec`` into a ``types.PartitionSpec``."""
        if isinstance(partition_spec, types.PartitionSpec):
            return partition_spec
        return types.PartitionSpec(partition_spec)

    def __iter__(self):
        return self.iterate_partitions()

    @property
    def project(self):
        return self.parent.project

    def iterate_partitions(self, spec=None, reverse=False):
        """Yield the partitions of the table, optionally filtered.

        ``spec`` is parsed by ``PartitionSpecCondition`` against the names of
        the table's partition columns. The equality prefix it extracts is
        sent with the request as the ``partition`` parameter; the remaining
        conditions are applied locally to every partition returned.

        :param spec: filter expression, or ``None`` for no filtering
        :param reverse: when true, the ``reverse`` action is added to the
            listing request
        :return: generator of matching partitions
        """
        condition = PartitionSpecCondition(
            [pt.name for pt in self.parent.table_schema.partitions], spec
        )
        spec = condition.partition_spec

        actions = ["partitions"]
        params = {"expectmarker": "true"}
        if reverse:
            actions.append("reverse")
        if spec is not None and not spec.is_empty:
            params["partition"] = str(spec)

        schema_name = self._get_schema_name()
        if schema_name:
            params["curr_schema"] = schema_name

        def _it():
            # Fetch one page. Once a previous response has stored an empty
            # marker there are no further pages, which is signalled by None.
            last_marker = params.get("marker")
            if "marker" in params and (last_marker is None or len(last_marker) == 0):
                return

            url = self.resource()
            resp = self._client.get(url, actions=actions, params=params)

            t = self.parse(self._client, resp, obj=self)
            params["marker"] = t.marker

            return t.partitions

        while True:
            partitions = _it()
            if partitions is None:
                break
            for partition in partitions:
                if condition.match(partition.partition_spec):
                    yield partition

    def get_max_partition(self, spec=None, skip_empty=True, reverse=False):
        """Return the partition with the largest (or smallest) spec values.

        Partitions are ordered by the tuple of their partition spec values.

        :param spec: optional partition prefix; its fields must be the
            leading partition columns of the table, in order, and it must be
            shorter than the full list of partition columns
        :param skip_empty: when true, partitions whose ``physical_size`` is
            not positive are skipped
        :param reverse: when true, the smallest partition is returned instead
            of the largest
        :return: the selected partition, or ``None`` when nothing qualifies
        :raises ValueError: when ``spec`` is not a valid proper prefix of the
            table's partition columns
        """
        table_parts = self.parent.table_schema.partitions

        if spec is not None:
            spec = self._get_partition_spec(spec)
            if len(spec) >= len(table_parts):
                raise ValueError(
                    "Size of prefix should not exceed number of partitions of the table"
                )
            for exist_pt, user_pt_name in zip(table_parts, spec.kv):
                if exist_pt.name != user_pt_name:
                    table_pt_str = ",".join(pt.name for pt in table_parts[: len(spec)])
                    prefix_pt_str = ",".join(spec.kv.keys())
                    # Apply the format arguments; passing them as extra
                    # exception args would leave the placeholders unfilled.
                    raise ValueError(
                        "Partition prefix %s not agree with table partitions %s"
                        % (prefix_pt_str, table_pt_str)
                    )

        part_values = [
            (part, tuple(part.partition_spec.values()))
            for part in self.iterate_partitions(spec)
        ]
        if not part_values:
            return None

        def values_of(pair):
            return pair[1]

        if not skip_empty:
            # Honour ``reverse`` here as well, consistent with the ordering
            # used below when empty partitions are skipped.
            choose = min if reverse else max
            return choose(part_values, key=values_of)[0]

        # Largest first unless ``reverse`` is set; physical_size is only read
        # for as many candidates as needed to find a non-empty one.
        ordered = sorted(part_values, key=values_of, reverse=not reverse)
        return next(
            (part for part, _ in ordered if part.physical_size > 0),
            None,
        )

    @with_wait_argument
    def create(self, partition_spec, if_not_exists=False, async_=False, hints=None):
        """Add a partition to the table by running an ``ADD`` DDL statement.

        :param partition_spec: ``Partition`` object, ``PartitionSpec`` or a
            value accepted by ``types.PartitionSpec``
        :param if_not_exists: add ``IF NOT EXISTS`` to the statement
        :param async_: when true, the SQL task is not waited for
        :param hints: passed through to the SQL runner
        :return: the partition object when ``async_`` is false, otherwise the
            value returned by the SQL runner
        """
        if isinstance(partition_spec, Partition):
            partition_spec = partition_spec.partition_spec
        else:
            partition_spec = self._get_partition_spec(partition_spec)

        part_sql = self.parent._build_partition_spec_sql(partition_spec)
        action = "ADD%s %s" % (" IF NOT EXISTS" if if_not_exists else "", part_sql)
        sql = self.parent._build_alter_table_ddl(action)

        instance = self.parent.parent._run_table_sql(
            sql, "SQLAddPartitionTask", hints=hints, wait=not async_
        )
        if not async_:
            return self[partition_spec]
        else:
            return instance

    @with_wait_argument
    def delete(self, partition_spec, if_exists=False, async_=False, hints=None):
        """Drop a partition from the table by running a ``DROP`` DDL statement.

        :param partition_spec: ``Partition`` object, ``PartitionSpec`` or a
            value accepted by ``types.PartitionSpec``
        :param if_exists: add ``IF EXISTS`` to the statement
        :param async_: when true, the SQL task is not waited for
        :param hints: passed through to the SQL runner
        :return: the value returned by the SQL runner
        """
        if isinstance(partition_spec, Partition):
            partition_spec = partition_spec.partition_spec
        else:
            partition_spec = self._get_partition_spec(partition_spec)

        part_sql = self.parent._build_partition_spec_sql(partition_spec)
        action = "DROP%s %s" % (" IF EXISTS" if if_exists else "", part_sql)
        sql = self.parent._build_alter_table_ddl(action)

        return self.parent.parent._run_table_sql(
            sql, "SQLDropPartitionTask", hints=hints, wait=not async_
        )