# core/lib/payload/cleanup.py

#!/usr/bin/env python3 """ Copyright (c) 2017-present, Facebook, Inc. All rights reserved. This source code is licensed under the BSD-style license found in the LICENSE file in the root directory of this source tree. """ import logging import os import re import time import MySQLdb from .. import constant, sql, util from ..error import OSCError from ..sql import escape from .base import Payload log = logging.getLogger(__name__) class CleanupPayload(Payload): """ This payload is not a schema change payload itself. It'll cleanup all the mess left behind by last OSC run """ def __init__(self, *args, **kwargs): super(CleanupPayload, self).__init__(*args, **kwargs) self.files_to_clean = [] self.to_drop = [] self.sqls_to_execute = [] self._current_db = kwargs.get("db") self._current_table = kwargs.get("table") self.databases = kwargs.get("database") self.kill_first = kwargs.get("kill", False) self.kill_only = kwargs.get("kill_only", False) def cleanup(self, db="mysql"): """ The actual cleanup logic, we will: - remove all the given files - drop all the given triggers - drop all the tables """ # Remove file first, because drop trigger may fail because of a full # disk. for filepath in self.files_to_clean: try: if os.path.isfile(filepath): util.rm(filepath, self.sudo) except Exception: # We will try our best to do the cleanup even when there's an # exception, because each cleanup entry is independent on each # other log.exception("Failed to cleanup file: {}".format(filepath)) # Drop table and triggers if not self._conn: self._conn = self.get_conn(db) self.gen_drop_sqls() self.get_mysql_settings() self.init_mysql_version() self.set_no_binlog() self.stop_slave_sql() # Stop sql thread to avoid MDL lock contention and blocking reads before # running DDLs. Will use high_pri_ddl instead if it's supported if self.is_high_pri_ddl_supported: self.enable_priority_ddl() else: self.lock_tables(tables=[self.table_name]) self.execute_sql("USE `{}`".format(escape(db))) current_db = db for stmt, stmt_db in self.sqls_to_execute: cleanupError = False try: # Switch to the database we are going to work on to avoid # cross db SQL execution if stmt_db != current_db: self.execute_sql("USE `{}`".format(escape(stmt_db))) current_db = stmt_db log.info("Executing db: {} sql: {}".format(stmt_db, stmt)) self.execute_sql(stmt) except MySQLdb.OperationalError as e: errnum, _ = e.args # 1507 means the partition doesn't exist, which # is most likely competing partition maintenance # 1508 means we tried to drop the last partition in a table if errnum in [1507, 1508]: continue cleanupError = True error = e except Exception as e: cleanupError = True error = e if cleanupError: self.sqls_to_execute = [] if not self.is_high_pri_ddl_supported: self.unlock_tables() self.start_slave_sql() log.error("Failed to execute sql for cleanup") raise OSCError( "CLEANUP_EXECUTION_ERROR", {"sql": stmt, "msg": str(error)} ) if not self.is_high_pri_ddl_supported: self.unlock_tables() self.sqls_to_execute = [] self.start_slave_sql() def add_file_entry(self, filepath): log.debug("Cleanup file entry added: {}".format(filepath)) self.files_to_clean.append(filepath) def remove_file_entry(self, filepath): log.debug("Cleanup file entry removed: {}".format(filepath)) self.files_to_clean.remove(filepath) def remove_all_file_entries(self): log.debug("Removing all cleanup file entries") self.files_to_clean = [] def add_sql_entry(self, sql): log.debug("Cleanup SQL entry added: {}".format(sql)) self.sqls_to_execute.append(sql) def gen_drop_sqls(self): # always drop 
    def gen_drop_sqls(self):
        # Always drop triggers first; otherwise there's a small window in
        # which the trigger exists without its corresponding _chg table. If
        # a change happens during this window, replication will break
        log.info("Generating drop trigger queries")
        for entry in self.to_drop:
            if entry["type"] == "trigger":
                db = entry["db"]
                trigger_name = entry["name"]
                sql_query = "DROP TRIGGER IF EXISTS `{}`".format(
                    escape(trigger_name)
                )
                self.sqls_to_execute.append((sql_query, db))

        log.info("Generating drop table queries")
        for entry in self.to_drop:
            if entry["type"] == "table":
                db = entry["db"]
                table = entry["name"]
                partition_method = self.get_partition_method(db, table)
                if partition_method in ("RANGE", "LIST"):
                    # MySQL doesn't allow removing all the partitions of a
                    # partitioned table, so leave a single partition in
                    # place until the table itself is dropped
                    if entry["partitions"]:
                        entry["partitions"].pop()
                    # Drop partitions gradually, so that we don't hold the
                    # metadata lock for as long as a single DROP TABLE would
                    log.debug(
                        "{}/{} using {} partitioning method".format(
                            db, table, partition_method
                        )
                    )
                    for partition_name in entry["partitions"]:
                        # As of version 8.0.17, MySQL does not support
                        # "DROP PARTITION IF EXISTS"
                        sql_query = "ALTER TABLE `{}` DROP PARTITION `{}`".format(
                            escape(table), escape(partition_name)
                        )
                        self.sqls_to_execute.append((sql_query, db))
                sql_query = "DROP TABLE IF EXISTS `{}`".format(escape(table))
                self.sqls_to_execute.append((sql_query, db))
        self.to_drop = []
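    # A worked sketch of what gen_drop_sqls() queues for a RANGE-partitioned
    # leftover table (db/table/partition names are hypothetical). For table
    # `__osc_new_t1` in db `test` with partitions p0, p1, p2, the last
    # partition is popped and kept, and the queue becomes:
    #
    #     ("ALTER TABLE `__osc_new_t1` DROP PARTITION `p0`", "test")
    #     ("ALTER TABLE `__osc_new_t1` DROP PARTITION `p1`", "test")
    #     ("DROP TABLE IF EXISTS `__osc_new_t1`", "test")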
    def add_drop_table_entry(self, db, table, partitions=None):
        self.to_drop.append(
            {"type": "table", "db": db, "name": table, "partitions": partitions}
        )

    def remove_drop_table_entry(self, db, table_name):
        for entry in self.to_drop:
            if entry["type"] == "table" and entry["name"] == table_name:
                self.to_drop.remove(entry)

    def add_drop_trigger_entry(self, db, trigger_name):
        self.to_drop.append({"type": "trigger", "db": db, "name": trigger_name})

    def run_ddl(self):
        """
        Search for all the garbage left over by OSC and clean it up
        """
        self.cleanup()

    def search_for_tables(self):
        """
        List all the tables that may have been left over by the last OSC run
        """
        if self.databases:
            for db in self.databases:
                results = self.query(
                    sql.get_all_osc_tables(db),
                    (constant.PREFIX, constant.PREFIX, db),
                )
                for row in results:
                    self.add_drop_table_entry(db, row["TABLE_NAME"])
        else:
            results = self.query(
                sql.get_all_osc_tables(),
                (constant.PREFIX, constant.PREFIX),
            )
            for row in results:
                self.add_drop_table_entry(row["db"], row["TABLE_NAME"])

    def search_for_triggers(self):
        """
        List all the triggers that may have been left over by the last OSC run
        """
        if self.databases:
            for db in self.databases:
                results = self.query(
                    sql.get_all_osc_triggers(db),
                    (constant.PREFIX, constant.PREFIX, db),
                )
                for row in results:
                    self.add_drop_trigger_entry(db, row["TRIGGER_NAME"])
        else:
            results = self.query(
                sql.get_all_osc_triggers(),
                (constant.PREFIX, constant.PREFIX),
            )
            for row in results:
                self.add_drop_trigger_entry(row["db"], row["TRIGGER_NAME"])

    def search_for_files(self):
        """
        List all the files that may have been left over by OSC in previous
        runs

        TODO: cleaning up is also done a lot in copy.py, so a future
        improvement here could be to refactor OSC in such a way that the
        cleanup part can be easily reused. T28154647
        """
        datadir = self.query(sql.select_as("@@datadir", "dir"))[0]["dir"]
        for root, _, files in os.walk(datadir):
            for fname in files:
                if re.match(r"__osc_.*\.[0-9]+", fname):
                    self.add_file_entry(os.path.join(root, fname))

    def kill_osc(self):
        """
        Kill the running OSC process if there's one
        """
        result = self.query(
            "SELECT IS_USED_LOCK(%s) as owner_id", (constant.OSC_LOCK_NAME,)
        )
        owner_id = result[0]["owner_id"]
        if owner_id:
            log.info(
                "Named lock: {} is held by {}. Killing it to free up "
                "the lock".format(constant.OSC_LOCK_NAME, owner_id)
            )
            # If we kill the MySQL connection holding the named lock, OSC's
            # Python process will hit a "MySQL has gone away" error, do its
            # own cleanup, and exit
            self.execute_sql(sql.kill_proc, (owner_id,))
        else:
            log.info("No other OSC is running at the moment")

    def cleanup_all(self):
        """
        Try to list all the possible files/tables left over by an unclean
        OSC exit, and remove all of them
        """
        if self.kill_first:
            self.kill_osc()
            log.info("Wait 5 seconds for the running OSC to clean up its own stuff")
            time.sleep(5)
        if self.kill_only:
            return
        # Clean up triggers first; otherwise DML against the original table
        # may fail with a "table does not exist" error once the table
        # referenced in the trigger has been dropped
        self.search_for_triggers()
        self.search_for_tables()
        self.search_for_files()
        # Cleanup is a critical section, so make sure there's no other OSC
        # running before we start dropping things
        self.get_osc_lock()
        self.cleanup()
        self.release_osc_lock()
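
# A minimal usage sketch (not part of the original module). The constructor
# kwargs shown are the ones this class itself reads; any connection or
# authentication options are handled by the base Payload and are assumptions
# here, not documented by this file:
#
#     payload = CleanupPayload(db="test", table="t1", kill=True)
#     payload.cleanup_all()  # kill any running OSC, then drop leftover
#                            # triggers/tables and remove leftover files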