def run_ddl()

in core/lib/payload/copy.py [0:0]


    def run_ddl(self, db, sql):
        try:
            time_started = time.time()
            self._new_table = parse_create(sql)
            self._current_db = db
            self._current_db_dir = util.dirname_for_db(db)
            self.init_connection(db)
            self.init_table_obj()
            self.determine_outfile_dir()
            if self.force_cleanup:
                self.cleanup_with_force()
            if self.has_desired_schema():
                self.release_osc_lock()
                return
            self.unblock_no_pk_creation()
            self.pre_osc_check()
            self.create_delta_table()
            self.create_copy_table()
            self.create_triggers()
            self.start_snapshot()
            self.select_table_into_outfile()
            self.drop_non_unique_indexes()
            self.load_data()
            self.recreate_non_unique_indexes()
            self.analyze_table()
            self.checksum()
            log.info("== Stage 5: Catch up to reduce time for holding lock ==")
            self.replay_till_good2go(checksum=self.skip_delta_checksum)
            self.sync_table_partitions()
            self.swap_tables()
            self.reset_no_pk_creation()
            self.cleanup()
            self.print_stats()
            self.stats["wall_time"] = time.time() - time_started
        except (
            MySQLdb.OperationalError,
            MySQLdb.ProgrammingError,
            MySQLdb.IntegrityError,
        ) as e:
            errnum, errmsg = e.args
            log.error(
                "SQL execution error: [{}] {}\n"
                "When executing: {}\n"
                "With args: {}".format(
                    errnum, errmsg, self._sql_now, self._sql_args_now
                )
            )
            # 2013 stands for lost connection to MySQL
            # 2006 stands for MySQL has gone away
            # Both means we have been killed
            if errnum in (2006, 2013) and self.skip_cleanup_after_kill:
                # We can skip dropping table, and removing files.
                # However leaving trigger around may break
                # replication which is really bad. So trigger is the only
                # thing we need to clean up in this case
                self._cleanup_payload.remove_drop_table_entry(
                    self._current_db, self.new_table_name
                )
                self._cleanup_payload.remove_drop_table_entry(
                    self._current_db, self.delta_table_name
                )
                self._cleanup_payload.remove_all_file_entries()
            if not self.keep_tmp_table:
                self.cleanup()
            raise OSCError(
                "GENERIC_MYSQL_ERROR",
                {
                    "stage": "running DDL on db '{}'".format(db),
                    "errnum": errnum,
                    "errmsg": errmsg,
                },
                mysql_err_code=errnum,
            )
        except Exception as e:
            log.exception(
                "{0} Exception raised, start to cleanup before exit {0}".format(
                    "-" * 10
                )
            )
            # We want keep the temporary table for further investigation
            if not self.keep_tmp_table:
                self.cleanup()
            if not isinstance(e, OSCError):
                # It's a python exception
                raise OSCError("OSC_INTERNAL_ERROR", {"msg": str(e)})
            else:
                raise