# core/lib/sqlparse/diff.py
"""
Copyright (c) 2017-present, Facebook, Inc.
All rights reserved.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import copy
import logging
from enum import Enum
from typing import List, Tuple
from osc.lib.sqlparse import CreateParser
from .models import (
EnumColumn,
escape,
is_equal,
PartitionConfig,
SetColumn,
TimestampColumn,
)
log: logging.Logger = logging.getLogger(__name__)
class BaseAlterType(Enum):
pass
# Side note: the algorithm that could be chosen for each DDL is noted in the comments below. See https://fburl.com/bxftsout
class ColAlterType(BaseAlterType):
CHANGE_COL_DEFAULT_VAL = "change_col_default_val" # instant
REORDER_COL = "reorder_col" # rebuild
ADD_COL = "add_col" # instant
ADD_AUTO_INC_COL = "add_auto_inc_col" # inplace
DROP_COL = "drop_col" # rebuild
CHANGE_COL_DATA_TYPE = "change_col_data_type" # copy
CHANGE_NULL = "change_null" # rebuild
CHANGE_ENUM = "change_enum" # instant/copy
CHANGE_SET = "change_set" # instant/copy
CHANGE_COL_CHARSET = "change_col_charset"
CHANGE_COL_COLLATE = "change_col_collate"
CHANGE_COL_COMMENT = "change_col_comment"
class IndexAlterType(BaseAlterType):
CHANGE_INDEX_TYPE = "change_index_type" # instant. index type is hash/btree
CHANGE_UNIQUE_CONSTRAINT = "change_unique_constraint"
CHANGE_INDEX_KEY_BLOCK_SIZE = "change_index_key_block_size"
CHANGE_KEY_TYPE = "change_key_type" # key type is FULLTEXT/SPATIAL
CHANGE_INDEX_COMMENT = "change_index_comment"
ADD_INDEX = "add_index" # inplace
DROP_INDEX = "drop_index" # inplace, metadata only
CHANGE_PK = "change_pk" # copy
BECOME_UNIQUE_INDEX = "become_unique_index"
class TableAlterType(BaseAlterType):
CHANGE_ROW_FORMAT = "change_row_format"
CHANGE_TABLE_KEY_BLOCK_SIZE = "change_table_key_block_size"
CHANGE_TABLE_CHARSET = "change_table_charset"
CHANGE_TABLE_COLLATE = "change_table_collate"
CHANGE_TABLE_COMMENT = "change_table_comment"
CHANGE_ENGINE = "change_engine"
CHANGE_AUTO_INC_VAL = "change_auto_inc_val" # inplace
class PartitionAlterType(BaseAlterType):
ADD_PARTITION = "add_partition"
DROP_PARTITION = "drop_partition"
REORGANIZE_PARTITION = "reorganize_partition"
ADD_PARTITIONING = "add_partitioning"
REMOVE_PARTITIONING = "remove_partitioning"
CHANGE_PARTITIONING = "change_partitioning"
INVALID_PARTITIONING = "invalid_partitioning"
class NewMysql80FeatureAlterType(BaseAlterType):
JSON = "json"
DESC_INDEX = "desc_index"
INSTANT_DDLS = {
ColAlterType.CHANGE_COL_DEFAULT_VAL,
ColAlterType.ADD_COL,
IndexAlterType.CHANGE_INDEX_TYPE,
}
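# Illustrative sketch (not part of the original module): a caller could
# check a computed diff against INSTANT_DDLS to decide whether every
# detected change supports ALGORITHM=INSTANT. `diff` is assumed to be a
# SchemaDiff instance (defined below).
#
#   if diff.alter_types <= INSTANT_DDLS:  # alter_types triggers to_sql()
#       print("all detected changes are instant-friendly")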
class TableOptionDiff(object):
def __init__(self, option_name, value):
self.option_name = option_name
self.value = value
def to_sql(self):
return "{}={}".format(self.option_name, self.value)
class SchemaDiff(object):
"""
Represents the difference between two Table objects
"""
def __init__(self, left, right, ignore_partition=False):
self.left = left
self.right = right
self.attrs_to_check = [
"charset",
"collate",
"comment",
"engine",
"key_block_size",
"name",
"row_format",
]
self.ignore_partition = ignore_partition
if not ignore_partition:
self.attrs_to_check.append("partition")
self._alter_types = set()
def get_dropped_cols(self):
new_column_names = [col.name for col in self.right.column_list]
return [
col for col in self.left.column_list if col.name not in new_column_names
]
def get_added_cols(self):
old_column_names = [col.name for col in self.left.column_list]
return [
col for col in self.right.column_list if col.name not in old_column_names
]
def _calculate_diff(self):
diffs = {
"removed": [],
"added": [],
# Customized messages
"msgs": [],
# Any attributes that were modified
"attrs_modified": [],
}
# We are copying here since we want to change the col list.
# Shallow copy should be enough here
col_left_copy = copy.copy(self.left.column_list)
col_right_copy = copy.copy(self.right.column_list)
col_left_dict = {col.name: col for col in col_left_copy}
col_right_dict = {col.name: col for col in col_right_copy}
for col in self.get_dropped_cols():
diffs["removed"].append(col)
col_left_copy.remove(col)
for col in self.get_added_cols():
diffs["added"].append(col)
col_right_copy.remove(col)
column_changes = []
for col in set(col_left_dict.keys()) & set(col_right_dict.keys()):
if col_left_dict[col] != col_right_dict[col]:
column_changes.append(
f"Previous column: {col_left_dict[col]}, "
f"desired column: {col_right_dict[col]}"
)
if column_changes:
diffs["msgs"].append("Column attrs changes detected:")
diffs["msgs"].append("\n".join(column_changes))
# The remaining columns are identical; check whether the two tables order them differently
if sorted(col_left_copy, key=lambda col: col.name) == sorted(
col_right_copy, key=lambda col: col.name
):
old_order = []
new_order = []
for col1, col2 in zip(col_left_copy, col_right_copy):
if col1 != col2:
old_order.append(col1.name)
new_order.append(col2.name)
if old_order:
diffs["msgs"].append("Column order mismatch was detected:")
diffs["msgs"].append("- " + ", ".join(old_order))
diffs["msgs"].append("+ " + ", ".join(new_order))
for idx in self.left.indexes:
if idx not in self.right.indexes:
diffs["removed"].append(idx)
for idx in self.right.indexes:
if idx not in self.left.indexes:
diffs["added"].append(idx)
if self.left.primary_key != self.right.primary_key:
if self.left.primary_key.column_list:
diffs["removed"].append(self.left.primary_key)
if self.right.primary_key.column_list:
diffs["added"].append(self.right.primary_key)
for attr in self.attrs_to_check:
tbl_option_old = getattr(self.left, attr)
tbl_option_new = getattr(self.right, attr)
if not is_equal(tbl_option_old, tbl_option_new):
diffs["removed"].append(TableOptionDiff(attr, tbl_option_old))
diffs["added"].append(TableOptionDiff(attr, tbl_option_new))
diffs["attrs_modified"].append(attr)
return diffs
def __str__(self):
if self.left == self.right:
return "No difference"
else:
diff_strs = []
diffs = self._calculate_diff()
for diff in diffs["removed"]:
diff_strs.append("- " + diff.to_sql())
for diff in diffs["added"]:
diff_strs.append("+ " + diff.to_sql())
for diff in diffs["msgs"]:
diff_strs.append(diff)
for attr in diffs["attrs_modified"]:
diff_strs.append(f"attrs_modified: {attr}")
diff_str = "\n".join(diff_strs)
return diff_str
def diffs(self):
return self._calculate_diff()
@property
def alter_types(self):
if not self._alter_types:
self.to_sql()
return self._alter_types
def add_alter_type(self, ddl_alter_type):
self._alter_types.add(ddl_alter_type)
def _gen_col_sql(self):
"""
Generate the column section for ALTER TABLE statement
"""
segments = []
old_columns = {col.name: col for col in self.left.column_list}
new_columns = {col.name: col for col in self.right.column_list}
old_column_names = [col.name for col in self.left.column_list]
new_column_names = [col.name for col in self.right.column_list]
# Drop columns
for col in self.left.column_list:
if col.name not in new_columns.keys():
segments.append("DROP `{}`".format(escape(col.name)))
old_column_names.remove(col.name)
self.add_alter_type(ColAlterType.DROP_COL)
# Add columns
# If the added column is not at the end, recognize that as reordering columns
handled_cols = []
for idx, col in enumerate(self.right.column_list):
if col.name not in old_columns.keys():
if idx == 0:
position = "FIRST"
if (
old_column_names
and ColAlterType.DROP_COL not in self._alter_types
):
self.add_alter_type(ColAlterType.REORDER_COL)
old_column_names = [col.name] + old_column_names
else:
position = "AFTER `{}`".format(
escape(self.right.column_list[idx - 1].name)
)
new_idx = (
old_column_names.index(self.right.column_list[idx - 1].name) + 1
)
if (
new_idx != len(old_column_names)
and ColAlterType.DROP_COL not in self._alter_types
):
self.add_alter_type(ColAlterType.REORDER_COL)
old_column_names = (
old_column_names[:new_idx]
+ [col.name]
+ old_column_names[new_idx:]
)
handled_cols.append(col.name)
self.add_alter_type(ColAlterType.ADD_COL)
if col.column_type == "JSON":
self.add_alter_type(NewMysql80FeatureAlterType.JSON)
if col.auto_increment:
self.add_alter_type(ColAlterType.ADD_AUTO_INC_COL)
segments.append("ADD {} {}".format(col.to_sql(), position))
# Adjust position
# The idea here is to compare each column's ancestor (the column right
# before it). If the ancestor is the same in the old and new column lists,
# the position of this particular column hasn't changed. Otherwise add a
# MODIFY clause to change its position, as in the worked example below.
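# Worked example (illustrative): old order (a, b, c), new order (a, c, b).
# Column `c` has ancestor `b` in the old list but `a` in the new list, so
# we emit "MODIFY `c` ... AFTER `a`"; column `b` then has ancestor `a` in
# the old list but `c` in the new list, so we emit "MODIFY `b` ... AFTER `c`".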
for idx, col_name in enumerate(new_column_names):
# If the column is recently added, then skip because it's already
# in the DDL
if col_name in handled_cols:
continue
# Get column definition
col = new_columns[col_name]
old_pos = old_column_names.index(col_name)
# If the first column is different, we need to adjust the sequence
if idx == 0:
if old_pos == 0:
continue
segments.append("MODIFY {} FIRST".format(col.to_sql()))
handled_cols.append(col_name)
self.add_alter_type(ColAlterType.REORDER_COL)
continue
# If this column has the same ancestor then it means there's no sequence
# adjustment needed
if new_column_names[idx - 1] == old_column_names[old_pos - 1]:
continue
segments.append(
"MODIFY {} AFTER `{}`".format(
col.to_sql(), escape(new_column_names[idx - 1])
)
)
handled_cols.append(col_name)
self.add_alter_type(ColAlterType.REORDER_COL)
# Modify columns
for col in self.right.column_list:
if col.name in old_columns and col != old_columns[col.name]:
# If the column has been taken care of because of sequence change
# previously we can skip the work here
if col.name in handled_cols:
continue
self._update_col_attrs_changes(col, old_columns[col.name])
segments.append("MODIFY {}".format(col.to_sql()))
return segments
def _is_null_change(self, old_col, new_col):
if isinstance(old_col, TimestampColumn):
old_col.explicit_ts_default()
if isinstance(new_col, TimestampColumn):
new_col.explicit_ts_default()
return old_col.nullable != new_col.nullable
def _is_col_default_change(self, old_col, new_col):
if isinstance(old_col, TimestampColumn):
old_col.explicit_ts_default()
if isinstance(new_col, TimestampColumn):
new_col.explicit_ts_default()
return not old_col.has_same_default(new_col)
def _update_col_attrs_changes(self, new_col, old_col):
if (
new_col.column_type != old_col.column_type
or new_col.length != old_col.length
):
self.add_alter_type(ColAlterType.CHANGE_COL_DATA_TYPE)
if (
self._is_col_default_change(old_col, new_col)
and ColAlterType.CHANGE_COL_DATA_TYPE not in self._alter_types
):
self.add_alter_type(ColAlterType.CHANGE_COL_DEFAULT_VAL)
if (
self._is_null_change(old_col, new_col)
and ColAlterType.CHANGE_COL_DATA_TYPE not in self._alter_types
):
self.add_alter_type(ColAlterType.CHANGE_NULL)
if (
isinstance(new_col, EnumColumn)
and isinstance(old_col, EnumColumn)
and new_col.enum_list != old_col.enum_list
):
self.add_alter_type(ColAlterType.CHANGE_ENUM)
if (
isinstance(new_col, SetColumn)
and isinstance(old_col, SetColumn)
and new_col.set_list != old_col.set_list
):
self.add_alter_type(ColAlterType.CHANGE_SET)
if new_col.charset != old_col.charset:
self.add_alter_type(ColAlterType.CHANGE_COL_CHARSET)
if new_col.collate != old_col.collate:
self.add_alter_type(ColAlterType.CHANGE_COL_COLLATE)
if new_col.comment != old_col.comment:
self.add_alter_type(ColAlterType.CHANGE_COL_COMMENT)
def _gen_idx_sql(self):
"""
Generate the index section for ALTER TABLE statement
"""
segments = []
# Drop index
for idx in self.left.indexes:
if idx not in self.right.indexes:
segments.append("DROP KEY `{}`".format(escape(idx.name)))
self.add_alter_type(IndexAlterType.DROP_INDEX)
# Add index
for idx in self.right.indexes:
if idx not in self.left.indexes:
segments.append("ADD {}".format(idx.to_sql()))
self.add_alter_type(IndexAlterType.ADD_INDEX)
self._update_index_attrs_changes(idx.name)
self._update_desc_index_type(idx.column_list)
if self.left.primary_key and not self.right.primary_key:
segments.append("DROP PRIMARY KEY")
self.add_alter_type(IndexAlterType.CHANGE_PK)
elif (
not self.left.primary_key.column_list and self.right.primary_key.column_list
):
segments.append("ADD {}".format(self.right.primary_key.to_sql()))
self.add_alter_type(IndexAlterType.CHANGE_PK)
self._update_desc_index_type(self.right.primary_key.column_list)
elif self.left.primary_key != self.right.primary_key:
segments.append("DROP PRIMARY KEY")
segments.append("ADD {}".format(self.right.primary_key.to_sql()))
self.add_alter_type(IndexAlterType.CHANGE_PK)
self._update_desc_index_type(self.right.primary_key.column_list)
return segments
def _update_desc_index_type(self, cols) -> None:
for col in cols:
if col.order == "DESC":
self.add_alter_type(NewMysql80FeatureAlterType.DESC_INDEX)
def _update_index_attrs_changes(self, idx_name):
old_indexes = {idx.name: idx for idx in self.left.indexes}
new_indexes = {idx.name: idx for idx in self.right.indexes}
if (
idx_name not in old_indexes
and idx_name in new_indexes
and new_indexes[idx_name].is_unique
):
self.add_alter_type(IndexAlterType.BECOME_UNIQUE_INDEX)
if not (idx_name in old_indexes and idx_name in new_indexes):
return
attrs = ["key_block_size", "comment", "is_unique", "key_type", "using"]
for attr in attrs:
if not is_equal(
getattr(old_indexes[idx_name], attr),
getattr(new_indexes[idx_name], attr),
):
if attr == "key_block_size":
self.add_alter_type(IndexAlterType.CHANGE_INDEX_KEY_BLOCK_SIZE)
elif attr == "comment":
self.add_alter_type(IndexAlterType.CHANGE_INDEX_COMMENT)
elif attr == "is_unique":
if (
new_indexes[idx_name].is_unique
and not old_indexes[idx_name].is_unique
):
self.add_alter_type(IndexAlterType.BECOME_UNIQUE_INDEX)
self.add_alter_type(IndexAlterType.CHANGE_UNIQUE_CONSTRAINT)
elif attr == "key_type":
self.add_alter_type(IndexAlterType.CHANGE_KEY_TYPE)
elif attr == "using":
self.add_alter_type(IndexAlterType.CHANGE_INDEX_TYPE)
def _gen_tbl_attr_sql(self):
"""
Generate the table attribute section for ALTER TABLE statement
"""
segments = []
for attr in self.attrs_to_check:
tbl_option_old = getattr(self.left, attr)
tbl_option_new = getattr(self.right, attr)
if not is_equal(tbl_option_old, tbl_option_new):
# When tbl_option_new is None, "ALTER TABLE xxx attr=None" would not work
if attr == "comment" and tbl_option_new is None:
segments.append("{}={}".format(attr, "''"))
elif attr == "row_format" and tbl_option_new is None:
segments.append("{}={}".format(attr, "default"))
elif attr == "partition" and self.ignore_partition is False:
self.generate_table_partition_operations(tbl_option_new, segments)
else:
segments.append("{}={}".format(attr, tbl_option_new))
# populate alter types data
if attr == "row_format":
self.add_alter_type(TableAlterType.CHANGE_ROW_FORMAT)
elif attr == "key_block_size":
self.add_alter_type(TableAlterType.CHANGE_TABLE_KEY_BLOCK_SIZE)
elif attr == "charset":
self.add_alter_type(TableAlterType.CHANGE_TABLE_CHARSET)
elif attr == "collate":
self.add_alter_type(TableAlterType.CHANGE_TABLE_COLLATE)
elif attr == "comment":
self.add_alter_type(TableAlterType.CHANGE_TABLE_COMMENT)
elif attr == "engine":
self.add_alter_type(TableAlterType.CHANGE_ENGINE)
# We don't want to alter the auto_increment value in the db; just record the alter type
if not is_equal(self.left.auto_increment, self.right.auto_increment):
self.add_alter_type(TableAlterType.CHANGE_AUTO_INC_VAL)
return segments
def generate_table_partition_operations(
self, partition_attr_value, segments
) -> None:
"""
Check difference between partition configurations
"""
try:
old_tbl_parts = PartitionConfig()
new_tbl_parts = PartitionConfig()
if self.left.partition:
old_tbl_parts = CreateParser.partition_to_model(
CreateParser.parse_partitions(self.left.partition)
)
if self.right.partition:
new_tbl_parts = CreateParser.partition_to_model(
CreateParser.parse_partitions(self.right.partition)
)
# Check for partitioning type change
# The case of partition to no partition is
# not handled here.
if (
partition_attr_value
and old_tbl_parts.get_type() != new_tbl_parts.get_type()
):
if not old_tbl_parts.get_type():
self.add_alter_type(PartitionAlterType.ADD_PARTITIONING)
else:
self.add_alter_type(PartitionAlterType.CHANGE_PARTITIONING)
segments.append(partition_attr_value.strip(" "))
return
pdef_type = old_tbl_parts.get_type()
if (
pdef_type == PartitionConfig.PTYPE_HASH
or pdef_type == PartitionConfig.PTYPE_KEY
):
self.populate_alter_substatement_for_add_or_drop_table_for_hash_or_key(
old_tbl_parts.num_partitions,
new_tbl_parts.num_partitions,
partition_attr_value,
segments,
)
return
parts_to_drop = []
parts_to_add = []
(
is_valid_alter,
is_reorganization,
) = self.could_generate_sql_substatement_for_partition_reorganization(
old_tbl_parts,
new_tbl_parts,
pdef_type,
parts_to_drop,
parts_to_add,
segments,
)
if not is_valid_alter:
self.add_alter_type(PartitionAlterType.INVALID_PARTITIONING)
log.warning("job failed - alter partition operation sanity check")
return
if not is_reorganization:
self.populate_alter_substatement_for_add_or_drop_table_for_range_or_list(
parts_to_drop, parts_to_add, partition_attr_value, segments
)
except Exception:
self.add_alter_type(PartitionAlterType.INVALID_PARTITIONING)
log.exception("Unable to sync new table with orig table")
def could_generate_sql_substatement_for_partition_reorganization(
self,
old_tbl_parts,
new_tbl_parts,
pdef_type,
parts_to_drop,
parts_to_add,
segments,
) -> Tuple[bool, bool]:
old_pname_to_pdefs_dict = {}
old_pvalues_to_pdefs_dict = {}
old_pnames = []
for pdef in old_tbl_parts.part_defs:
old_pnames.append(pdef.pdef_name)
old_pname_to_pdefs_dict[pdef.pdef_name] = pdef
value_list = self.get_value_list(pdef.pdef_value_list)
old_pvalues_to_pdefs_dict[value_list] = pdef
new_pname_to_pdefs_dict = {}
new_pvalues_to_pdefs_dict = {}
new_pnames = []
for pdef in new_tbl_parts.part_defs:
new_pnames.append(pdef.pdef_name)
new_pname_to_pdefs_dict[pdef.pdef_name] = pdef
value_list = self.get_value_list(pdef.pdef_value_list)
new_pvalues_to_pdefs_dict[value_list] = pdef
old_tbl_part_defs = old_tbl_parts.part_defs
new_tbl_part_defs = new_tbl_parts.part_defs
for k, v in old_pvalues_to_pdefs_dict.items():
if (
k not in new_pvalues_to_pdefs_dict.keys()
or v.pdef_name != new_pvalues_to_pdefs_dict[k].pdef_name
):
parts_to_drop.append(v)
max_value_added = False
# Ensure "MAXVALUE" ends up last in the list, if present, since sorting can alter the order
for k, v in sorted(new_pvalues_to_pdefs_dict.items(), key=lambda item: item[0]):
if (
k not in old_pvalues_to_pdefs_dict.keys()
or v.pdef_name != old_pvalues_to_pdefs_dict[k].pdef_name
):
if k != "MAXVALUE":
parts_to_add.append(v)
else:
max_value_added = True
max_value_entry = v
if max_value_added:
parts_to_add.append(max_value_entry)
if parts_to_drop and parts_to_add:
# check for partition reorganization since we don't allow add/drop
# in one alter statement
return self.got_reorganize_subsql_after_add_drop_partition_detection(
pdef_type, old_tbl_part_defs, new_tbl_part_defs, segments
)
elif parts_to_drop or parts_to_add:
if parts_to_drop:
self.add_alter_type(PartitionAlterType.DROP_PARTITION)
if parts_to_add:
self.add_alter_type(PartitionAlterType.ADD_PARTITION)
return (True, False)
return (False, False)
def got_reorganize_subsql_after_add_drop_partition_detection(
self,
pdef_type,
old_tbl_part_defs,
new_tbl_part_defs,
segments,
) -> Tuple[bool, bool]:
old_tbl_end_index = len(old_tbl_part_defs) - 1
new_tbl_end_index = len(new_tbl_part_defs) - 1
# This verification takes place only after the checks for adding/dropping
# partitions. If it fails, it means the user is implicitly trying to
# perform multiple alter operations in a single statement, which is not
# supported. Any reorganization should satisfy the min/max constraints as
# well as not drop individual elements.
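# Worked example (illustrative): splitting RANGE partition p1 in
#   old: p0 < (10), p1 < (20)
#   new: p0 < (10), p1a < (15), p1b < (20)
# passes the checks below and produces roughly
#   "REORGANIZE PARTITION p1 INTO (PARTITION p1a VALUES LESS THAN (15),
#   PARTITION p1b VALUES LESS THAN (20))"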
if pdef_type == PartitionConfig.PTYPE_RANGE:
# The new range max should be higher than or equal to the old range max;
# datetime/integer values should be handled.
# MAXVALUE is extracted as a string in PartitionModel, hence it needs to
# be handled separately.
if not isinstance(
new_tbl_part_defs[new_tbl_end_index].pdef_value_list, str
) and (
isinstance(old_tbl_part_defs[old_tbl_end_index].pdef_value_list, str)
or (
self.old_range_is_higher_than_new_range(
old_tbl_part_defs[old_tbl_end_index].pdef_value_list[0],
new_tbl_part_defs[new_tbl_end_index].pdef_value_list[0],
)
)
):
return (False, False)
# The preliminary check for split/merge is done at this point.
# For LIST we are good to go and can form a reorganize statement,
# but RANGE needs to satisfy the inner strict range limits.
old_tbl_start_index = 0
new_tbl_start_index = 0
j = 0
for i in range(0, old_tbl_end_index + 1):
if j <= new_tbl_end_index and (
old_tbl_part_defs[i].pdef_name != new_tbl_part_defs[j].pdef_name
or old_tbl_part_defs[i].pdef_value_list
!= new_tbl_part_defs[j].pdef_value_list
):
old_tbl_start_index = i
new_tbl_start_index = j
break
j = j + 1
rj = new_tbl_end_index
for ri in reversed(range(old_tbl_end_index + 1)):
if rj >= 0 and (
old_tbl_part_defs[ri].pdef_name != new_tbl_part_defs[rj].pdef_name
or old_tbl_part_defs[ri].pdef_value_list
!= new_tbl_part_defs[rj].pdef_value_list
):
old_tbl_end_index = ri
new_tbl_end_index = rj
break
rj = rj - 1
# the range value before the end should match exactly
if pdef_type == PartitionConfig.PTYPE_RANGE:
if ri >= 0 and ri < len(old_tbl_part_defs) - 1:
if (
old_tbl_part_defs[ri].pdef_value_list
!= new_tbl_part_defs[rj].pdef_value_list
):
return (False, False)
source_partitions_reorganized = []
for ni in range(old_tbl_start_index, old_tbl_end_index + 1):
source_partitions_reorganized.append(old_tbl_part_defs[ni].pdef_name)
source_partitions_reorganized_sql = ", ".join(source_partitions_reorganized)
target_partitions_reorganized = []
for ni in range(new_tbl_start_index, new_tbl_end_index + 1):
partition = new_tbl_part_defs[ni]
value_list = self.get_value_list(partition.pdef_value_list)
target_partitions_reorganized.append(
"PARTITION {} VALUES {} ({})".format(
partition.pdef_name,
self.partition_definition_type(partition.pdef_type),
value_list,
)
)
target_partitions_reorganized_sql = ", ".join(target_partitions_reorganized)
sub_alter_sql = "REORGANIZE PARTITION {} INTO ({})".format(
source_partitions_reorganized_sql, target_partitions_reorganized_sql
)
self.add_alter_type(PartitionAlterType.REORGANIZE_PARTITION)
segments.append(sub_alter_sql)
return (True, True)
def get_value_list(self, pdef_value_list):
if isinstance(pdef_value_list, str):
value_list = pdef_value_list
else:
value_list = ", ".join(pdef_value_list)
return value_list
def old_range_is_higher_than_new_range(self, left_range, right_range) -> bool:
return left_range > right_range
def populate_alter_substatement_for_add_or_drop_table_for_range_or_list(
self,
parts_to_drop: List,
parts_to_add: List,
partition_attr_value,
segments,
) -> None:
if parts_to_add and parts_to_drop:
return
if parts_to_add:
add_parts = []
for partition in parts_to_add:
value_list = self.get_value_list(partition.pdef_value_list)
add_parts.append(
"PARTITION {} VALUES {} ({})".format(
partition.pdef_name,
self.partition_definition_type(partition.pdef_type),
value_list,
)
)
segments.append("ADD PARTITION (" + ", ".join(add_parts) + ")")
elif parts_to_drop:
# When we reach here, we cannot have a partition clause for
# LIST/RANGE without associated partitions, as the partition model
# should have flagged that earlier.
if not partition_attr_value:
self.add_alter_type(PartitionAlterType.REMOVE_PARTITIONING)
segments.append("REMOVE PARTITIONING")
else:
dropped_parts = []
for partition in parts_to_drop:
dropped_parts.append(partition.pdef_name)
segments.append("DROP PARTITION " + ", ".join(dropped_parts))
def populate_alter_substatement_for_add_or_drop_table_for_hash_or_key(
self,
old_partition_count,
new_partition_count,
partition_attr_value,
segments,
) -> None:
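# Example (illustrative): going from 4 to 8 HASH partitions appends
# "ADD PARTITION PARTITIONS 4"; going from 8 to 4 appends
# "COALESCE PARTITION 4", assuming the new table still has a partition
# clause.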
if old_partition_count == new_partition_count:
return
if old_partition_count < new_partition_count:
self.add_alter_type(PartitionAlterType.ADD_PARTITION)
segments.append(
"ADD PARTITION PARTITIONS {}".format(
new_partition_count - old_partition_count
)
)
return
if not partition_attr_value:
self.add_alter_type(PartitionAlterType.REMOVE_PARTITIONING)
segments.append("REMOVE PARTITIONING")
return
if new_partition_count == 0:
# Cannot remove all partitions, use DROP TABLE instead
self.add_alter_type(PartitionAlterType.INVALID_PARTITIONING)
return
self.add_alter_type(PartitionAlterType.DROP_PARTITION)
segments.append(
"COALESCE PARTITION {}".format(old_partition_count - new_partition_count)
)
def partition_definition_type(self, def_type):
return PartitionConfig.TYPE_MAP[def_type]
def valid_partitioning_alter(self, alter_type):
return (
isinstance(alter_type, PartitionAlterType)
and alter_type != PartitionAlterType.INVALID_PARTITIONING
)
def to_sql(self):
"""
Generate an ALTER TABLE statement that can bring the schema from left to
right
"""
segments = []
segments.extend(self._gen_col_sql())
segments.extend(self._gen_idx_sql())
segments.extend(self._gen_tbl_attr_sql())
if segments:
return "ALTER TABLE `{}` {}".format(
escape(self.right.name), ", ".join(segments)
)
def get_type_conv_columns(old_obj, new_obj):
"""
Return a list of columns that involve type conversion when transitioning
from left to right
"""
type_conv_cols = []
current_cols = {c.name: c for c in old_obj.column_list}
new_cols = {c.name: c for c in new_obj.column_list}
# find columns that will involve type conversions
for name, old_col in current_cols.items():
new_col = new_cols.get(name)
# this column isn't in the new schema, so it
# doesn't matter
if new_col is None:
continue
# Type changes are considered type conversions
if new_col.column_type != old_col.column_type:
type_conv_cols.append(old_col)
else:
# A length change is also considered a type conversion
if new_col.length != old_col.length:
type_conv_cols.append(old_col)
return type_conv_cols
def need_default_ts_bootstrap(old_obj, new_obj):
"""
Check whether going from the old schema to the new one involves
bootstrapping a column using CURRENT_TIMESTAMP. This is normally a
dangerous thing to do outside of replication and will be disallowed by
default from the OSC perspective
"""
current_cols = {c.name: c for c in old_obj.column_list}
new_cols = {c.name: c for c in new_obj.column_list}
# find columns whose defaults involve CURRENT_TIMESTAMP
for name, new_col in new_cols.items():
old_col = current_cols.get(name)
# This check only applies to column types that support default ts value
if new_col.column_type not in ["TIMESTAMP", "DATE", "DATETIME"]:
continue
if new_col.column_type == "TIMESTAMP":
new_col.explicit_ts_default()
# Nothing to worry about if a vulnerable column type doesn't use current
# time as its default
if str(new_col.column_type) == "TIMESTAMP":
# Cases for TIMESTAMP type
if (
str(new_col.default).upper() != "CURRENT_TIMESTAMP"
and str(new_col.on_update_current_timestamp).upper()
!= "CURRENT_TIMESTAMP"
):
continue
else:
# Cases for DATE and DATETIME type
if str(new_col.default).upper() != "CURRENT_TIMESTAMP":
continue
# Adding a timestamp column with such defaults is considered unsafe
# outside of replication bootstrapping
if not old_col:
return True
# At this point we know this column in the new schema needs its default
# set to the current ts. We need to further confirm whether the old schema
# does the same. If not, this will be considered dangerous for replication.
if (
str(new_col.default).upper() == "CURRENT_TIMESTAMP"
and str(old_col.default).upper() != "CURRENT_TIMESTAMP"
):
return True
if (
str(new_col.on_update_current_timestamp).upper() == "CURRENT_TIMESTAMP"
and str(old_col.on_update_current_timestamp).upper() != "CURRENT_TIMESTAMP"
):
return True
return False