odps/models/tasks/merge.py (331 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import re
import time
import warnings
from collections import namedtuple
from ... import compat, serializers, utils
from ...config import options
from .core import Task, build_execute_method
_ARCHIVE_TABLE_REGEX = re.compile(
r"^alter\s+table\s+(?P<table>[^\s;]+)\s+(|partition\s*\((?P<partition>[^)]+)\s*\))\s*"
r"archive[\s;]*$",
re.I,
)
_FREEZE_COMMAND_REGEX = re.compile(
r"^alter\s+table\s+(?P<table>[^\s;]+)\s+(|partition\s*\((?P<partition>[^)]+)\s*\))\s*"
r"(?P<command>freeze|restore)[\s;]*$",
re.I,
)
_MERGE_SMALL_FILES_REGEX = re.compile(
r"^alter\s+table\s+(?P<table>[^\s;]+)\s+(|partition\s*\((?P<partition>[^)]+)\s*\))\s*"
r"(merge\s+smallfiles|compact\s+(?P<compact_type>[^\s;]+)(|\s+(?P<compact_opts>[^;]+)))[\s;]*$",
re.I,
)
_COMPACT_TYPES = {
"major": "major_compact",
"minor": "minor_compact",
}
_FREEZE_TYPES = {
"freeze": "backup",
"restore": "restore",
}
_MergeTaskTableProps = namedtuple(
"_MergeTaskTableProps", "table, schema, task_table_name"
)
class MergeTask(Task):
_root = "Merge"
table = serializers.XMLNodeField("TableName")
def __init__(self, name=None, **kwargs):
name_prefix = kwargs.pop("name_prefix", None) or "merge_task"
if name is None:
name = "{0}_{1}_{2}".format(
name_prefix, int(time.time()), random.randint(100000, 999999)
)
kwargs["name"] = name
super(MergeTask, self).__init__(**kwargs)
@staticmethod
def _extract_table_props(odps, table, partition=None, schema=None, project=None):
from ...core import ODPS
schema = schema or odps.schema
if not isinstance(table, compat.six.string_types):
if table.get_schema():
schema = table.get_schema().name
table_name = table.full_table_name
else:
table_name = table
table = odps.get_table(table, project=project, schema=schema)
_, schema, _ = odps._split_object_dots(table_name)
if partition:
table_name += " partition(%s)" % (ODPS._parse_partition_string(partition))
tb_name_parts = [utils.strip_backquotes(s) for s in table_name.split(".")]
return _MergeTaskTableProps(table, schema, ".".join(tb_name_parts))
@staticmethod
def _parse_compact_opts(force_mode, recent_hours, kwargs):
compact_opts = kwargs.pop("compact_opts", None)
if not compact_opts:
return force_mode, recent_hours
if force_mode is not None or recent_hours is not None:
raise TypeError(
"compact_opts and force_mode / recent_hours "
"can not be specified at the same time"
)
compact_opts_list = compact_opts.lower().split()
if "-f" in compact_opts_list:
force_mode = True
try:
hours_index = compact_opts_list.index("-h")
except ValueError:
hours_index = None
if hours_index is not None:
if (
hours_index + 1 >= len(compact_opts_list)
or not compact_opts_list[hours_index + 1].isnumeric()
):
raise ValueError("Need to specify hours after -h suffix")
recent_hours = int(compact_opts_list[hours_index + 1])
return force_mode, recent_hours
@classmethod
def _create_base_merge_task(
cls,
odps,
table,
partition=None,
project=None,
schema=None,
hints=None,
quota_name=None,
name_prefix=None,
):
props = cls._extract_table_props(
odps, table, partition=partition, schema=schema, project=project
)
hints = hints or dict()
if options.default_task_settings:
hints.update(options.default_task_settings)
if odps.is_schema_namespace_enabled(hints) or props.schema is not None:
hints.update(
{
"odps.sql.allow.namespace.schema": "true",
"odps.namespace.schema": "true",
}
)
if props.schema is not None:
hints["odps.default.schema"] = props.schema
if quota_name or odps.quota_name:
hints["odps.task.wlm.quota"] = quota_name or odps.quota_name
task = cls(table=props.task_table_name, name_prefix=name_prefix)
task.update_settings(hints)
return task, props
@classmethod
def _submit_merge_task(
cls,
odps,
task,
project=None,
priority=None,
running_cluster=None,
unique_identifier_id=None,
create_callback=None,
):
priority = priority if priority is not None else options.priority
if priority is None and options.get_priority is not None:
priority = options.get_priority(odps)
project = odps.get_project(name=project)
return project.instances.create(
task=task,
running_cluster=running_cluster,
priority=priority,
unique_identifier_id=unique_identifier_id,
create_callback=create_callback,
)
@classmethod
def run_merge_files(
cls,
odps,
table,
partition=None,
project=None,
schema=None,
hints=None,
priority=None,
running_cluster=None,
compact_type=None,
force_mode=None,
recent_hours=None,
quota_name=None,
unique_identifier_id=None,
create_callback=None,
**kwargs
):
"""
Start running a task to merge multiple files in tables.
:param table: name of the table to optimize
:param partition: partition to optimize
:param project: project name, if not provided, will be the default project
:param str schema: schema name, if not provided, will be the default schema
:param hints: settings for merge task.
:param priority: instance priority, 9 as default
:param running_cluster: cluster to run this instance
:param compact_type: compact option for transactional table, can be major or minor.
:param str unique_identifier_id: unique instance ID
:return: instance
:rtype: :class:`odps.models.Instance`
"""
force_mode, recent_hours = cls._parse_compact_opts(
force_mode, recent_hours, kwargs
)
if kwargs:
raise TypeError("Unsupported keyword arguments %s" % ", ".join(kwargs))
prefix = "merge_task" if compact_type is None else "compact_task"
task, props = cls._create_base_merge_task(
odps,
table,
partition=partition,
project=project,
schema=schema,
hints=hints,
name_prefix=prefix,
quota_name=quota_name,
)
hints = hints or dict()
compact_type = _COMPACT_TYPES.get(compact_type)
if compact_type:
hints.update(
{
"odps.merge.txn.table.compact": compact_type,
"odps.merge.restructure.action": "hardlink",
}
)
if compact_type == "minor_compact":
if (
recent_hours is not None
and recent_hours < props.table.acid_data_retain_hours
and not force_mode
):
warnings.warn(
"setting 'recentHoursThresholdForPartialCompact' below the data "
"retention period (%s hours) prevents past time travel. "
"It's now set to match the retention period. "
"Use -f to override." % props.table.acid_data_retain_hours
)
recent_hours = props.table.acid_data_retain_hours
recent_hours = recent_hours or -1
hints["odps.merge.txn.table.compact.txn.id"] = str(recent_hours)
task.update_settings(hints)
return cls._submit_merge_task(
odps,
task,
project=project,
priority=priority,
running_cluster=running_cluster,
unique_identifier_id=unique_identifier_id,
create_callback=create_callback,
)
execute_merge_files = build_execute_method(
run_merge_files,
"""
Execute a task to merge multiple files in tables and wait for termination.
""",
)
@classmethod
def run_archive_table(
cls,
odps,
table,
partition=None,
project=None,
schema=None,
hints=None,
priority=None,
running_cluster=None,
quota_name=None,
unique_identifier_id=None,
create_callback=None,
):
"""
Start running a task to archive tables.
:param table: name of the table to archive
:param partition: partition to archive
:param project: project name, if not provided, will be the default project
:param hints: settings for table archive task.
:param priority: instance priority, 9 as default
:param str unique_identifier_id: unique instance ID
:return: instance
:rtype: :class:`odps.models.Instance`
"""
task, props = cls._create_base_merge_task(
odps,
table,
partition=partition,
project=project,
schema=schema,
hints=hints,
name_prefix="archive_task",
quota_name=quota_name,
)
task._update_property_json("archiveSettings", {"odps.merge.archive.flag": True})
return cls._submit_merge_task(
odps,
task,
project=project,
priority=priority,
running_cluster=running_cluster,
unique_identifier_id=unique_identifier_id,
create_callback=create_callback,
)
execute_archive_table = build_execute_method(
run_archive_table,
"""
Execute a task to archive tables and wait for termination.
""",
)
@classmethod
def run_freeze_command(
cls,
odps,
table,
partition=None,
command=None,
project=None,
schema=None,
hints=None,
priority=None,
running_cluster=None,
quota_name=None,
unique_identifier_id=None,
create_callback=None,
):
"""
Start running a task to freeze or restore tables.
:param table: name of the table to archive
:param partition: partition to archive
:param command: freeze command to execute, can be freeze or restore
:param project: project name, if not provided, will be the default project
:param hints: settings for table archive task.
:param priority: instance priority, 9 as default
:param str unique_identifier_id: unique instance ID
:return: instance
:rtype: :class:`odps.models.Instance`
"""
task, props = cls._create_base_merge_task(
odps,
table,
partition=partition,
project=project,
schema=schema,
hints=hints,
quota_name=quota_name,
name_prefix=command.lower() + "_task",
)
hints = hints or dict()
hints["odps.merge.cold.storage.mode"] = _FREEZE_TYPES[command.lower()]
task.update_settings(hints)
return cls._submit_merge_task(
odps,
task,
project=project,
priority=priority,
running_cluster=running_cluster,
unique_identifier_id=unique_identifier_id,
create_callback=create_callback,
)
execute_freeze_command = build_execute_method(
run_freeze_command,
"""
Execute a task to archive tables and wait for termination.
""",
)
@classmethod
def submit_alter_table_instance(
cls,
odps,
sql,
project=None,
schema=None,
priority=None,
running_cluster=None,
hints=None,
unique_identifier_id=None,
quota_name=None,
create_callback=None,
):
command_to_call = [
(_ARCHIVE_TABLE_REGEX, cls.run_archive_table),
(_FREEZE_COMMAND_REGEX, cls.run_freeze_command),
(_MERGE_SMALL_FILES_REGEX, cls.run_merge_files),
]
for cmd_regex, run_cmd in command_to_call:
cmd_regex_match = cmd_regex.match(sql)
if cmd_regex_match:
kwargs = cmd_regex_match.groupdict().copy()
kwargs.update(
{
"project": project,
"schema": schema,
"hints": hints,
"running_cluster": running_cluster,
"priority": priority,
"quota_name": quota_name,
"unique_identifier_id": unique_identifier_id,
"create_callback": create_callback,
}
)
return run_cmd(odps, **kwargs)
return None