bool mysql_alter_table()

in sql/sql_table.cc [8334:9404]


bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
                       HA_CREATE_INFO *create_info,
                       TABLE_LIST *table_list,
                       Alter_info *alter_info,
                       uint order_num, ORDER *order, bool ignore)
{
  DBUG_ENTER("mysql_alter_table");

  if (block_myisam_tables(create_info, table_list))
  {
    my_error(ER_BLOCK_MYISAM_TABLES, MYF(0), NULL);
    DBUG_RETURN(true);
  }

  if (block_memory_tables(create_info, table_list))
  {
    my_error(ER_BLOCK_MEMORY_TABLES, MYF(0), NULL);
    DBUG_RETURN(true);
  }

  /*
    Check if we attempt to alter mysql.slow_log or
    mysql.general_log table and return an error if
    it is the case.
    TODO: this design is obsolete and will be removed.
  */
  int table_kind= check_if_log_table(table_list->db_length, table_list->db,
                                     table_list->table_name_length,
                                     table_list->table_name, false);

  if (table_kind)
  {
    /* Disable alter of enabled log tables */
    if (logger.is_log_table_enabled(table_kind))
    {
      my_error(ER_BAD_LOG_STATEMENT, MYF(0), "ALTER");
      DBUG_RETURN(true);
    }

    /* Disable alter of log tables to unsupported engine */
    if ((create_info->used_fields & HA_CREATE_USED_ENGINE) &&
        (!create_info->db_type || /* unknown engine */
         !(create_info->db_type->flags & HTON_SUPPORT_LOG_TABLES)))
    {
      my_error(ER_UNSUPORTED_LOG_ENGINE, MYF(0));
      DBUG_RETURN(true);
    }

#ifdef WITH_PARTITION_STORAGE_ENGINE
    if (alter_info->flags & Alter_info::ALTER_PARTITION)
    {
      my_error(ER_WRONG_USAGE, MYF(0), "PARTITION", "log table");
      DBUG_RETURN(true);
    }
#endif
  }

  THD_STAGE_INFO(thd, stage_init);

  /*
    Code below can handle only base tables so ensure that we won't open a view.
    Note that RENAME TABLE, the only ALTER clause which is supported for views,
    has already been processed.
  */
  table_list->required_type= FRMTYPE_TABLE;

  Alter_table_prelocking_strategy alter_prelocking_strategy;

  DEBUG_SYNC(thd, "alter_table_before_open_tables");
  uint tables_opened;
  bool error= open_tables(thd, &table_list, &tables_opened, 0,
                          &alter_prelocking_strategy);

  DEBUG_SYNC(thd, "alter_opened_table");

  if (error)
    DBUG_RETURN(true);

  TABLE *table= table_list->table;

  if (table == NULL && table_list->open_strategy == TABLE_LIST::OPEN_IF_EXISTS)
  {
    /*
      This should only happen if the user requested an ALTER IF EXISTS and
      the table did not exist.  Generate a warning.
    */
    String tbl_name;
    tbl_name.append(String(table_list->db, system_charset_info));
    tbl_name.append('.');
    tbl_name.append(String(table_list->table_name, system_charset_info));

    push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
                        ER_BAD_TABLE_ERROR, ER(ER_BAD_TABLE_ERROR),
                        tbl_name.c_ptr());

    my_ok(thd);
    DBUG_RETURN(false);
  }

  table->use_all_columns();
  MDL_ticket *mdl_ticket= table->mdl_ticket;

  /*
    Prohibit changing of the UNION list of a non-temporary MERGE table
    under LOCK tables. It would be quite difficult to reuse a shrunk
    set of tables from the old table or to open a new TABLE object for
    an extended list and verify that they belong to locked tables.
  */
  if ((thd->locked_tables_mode == LTM_LOCK_TABLES ||
       thd->locked_tables_mode == LTM_PRELOCKED_UNDER_LOCK_TABLES) &&
      (create_info->used_fields & HA_CREATE_USED_UNION) &&
      (table->s->tmp_table == NO_TMP_TABLE))
  {
    my_error(ER_LOCK_OR_ACTIVE_TRANSACTION, MYF(0));
    DBUG_RETURN(true);
  }

  Alter_table_ctx alter_ctx(thd, table_list, tables_opened, new_db, new_name);

  /*
    Add old and new (if any) databases to the list of accessed databases
    for this statement. Needed for MTS.
  */
  thd->add_to_binlog_accessed_dbs(alter_ctx.db);
  if (alter_ctx.is_database_changed())
    thd->add_to_binlog_accessed_dbs(alter_ctx.new_db);

  MDL_request target_mdl_request;

  /* Check that we are not trying to rename to an existing table */
  if (alter_ctx.is_table_renamed())
  {
    if (table->s->tmp_table != NO_TMP_TABLE)
    {
      if (find_temporary_table(thd, alter_ctx.new_db, alter_ctx.new_name))
      {
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alter_ctx.new_alias);
        DBUG_RETURN(true);
      }
    }
    else
    {
      MDL_request_list mdl_requests;
      MDL_request target_db_mdl_request;

      target_mdl_request.init(MDL_key::TABLE,
                              alter_ctx.new_db, alter_ctx.new_name,
                              MDL_EXCLUSIVE, MDL_TRANSACTION);
      mdl_requests.push_front(&target_mdl_request);

      /*
        If we are moving the table to a different database, we also
        need IX lock on the database name so that the target database
        is protected by MDL while the table is moved.
      */
      if (alter_ctx.is_database_changed())
      {
        target_db_mdl_request.init(MDL_key::SCHEMA, alter_ctx.new_db, "",
                                   MDL_INTENTION_EXCLUSIVE,
                                   MDL_TRANSACTION);
        mdl_requests.push_front(&target_db_mdl_request);
      }

      /*
        Global intention exclusive lock must have been already acquired when
        table to be altered was open, so there is no need to do it here.
      */
      DBUG_ASSERT(thd->mdl_context.is_lock_owner(MDL_key::GLOBAL,
                                                 "", "",
                                                 MDL_INTENTION_EXCLUSIVE));

      if (thd->mdl_context.acquire_locks_nsec(&mdl_requests,
                                         thd->variables.lock_wait_timeout_nsec))
        DBUG_RETURN(true);

      DEBUG_SYNC(thd, "locked_table_name");
      /*
        The table may not exist, but we got an exclusive lock
        on the name, now we can safely try to find out for sure.
      */
      if (!access(alter_ctx.get_new_filename(), F_OK))
      {
        /* Table will be closed in do_command() */
        my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alter_ctx.new_alias);
        DBUG_RETURN(true);
      }
    }
  }

  if (!create_info->db_type)
  {
#ifdef WITH_PARTITION_STORAGE_ENGINE
    if (table->part_info &&
        create_info->used_fields & HA_CREATE_USED_ENGINE)
    {
      /*
        This case happens when the user specified
        ENGINE = x where x is a non-existing storage engine
        We set create_info->db_type to default_engine_type
        to ensure we don't change underlying engine type
        due to an erroneously given engine name.
      */
      create_info->db_type= table->part_info->default_engine_type;
    }
    else
#endif
      create_info->db_type= table->s->db_type();
  }

  if (check_engine(thd, alter_ctx.new_db, alter_ctx.new_name, create_info))
    DBUG_RETURN(true);

  if ((create_info->db_type != table->s->db_type() ||
       alter_info->flags & Alter_info::ALTER_PARTITION) &&
      !table->file->can_switch_engines())
  {
    my_error(ER_ROW_IS_REFERENCED, MYF(0));
    DBUG_RETURN(true);
  }

  /*
   If foreign key is added then check permission to access parent table.

   In function "check_fk_parent_table_access", create_info->db_type is used
   to identify whether engine supports FK constraint or not. Since
   create_info->db_type is set here, check to parent table access is delayed
   till this point for the alter operation.
  */
  if ((alter_info->flags & Alter_info::ADD_FOREIGN_KEY) &&
      check_fk_parent_table_access(thd, create_info, alter_info))
    DBUG_RETURN(true);

  /*
   If this is an ALTER TABLE and no explicit row type specified reuse
   the table's row type.
   Note : this is the same as if the row type was specified explicitly.
  */
  if (create_info->row_type == ROW_TYPE_NOT_USED)
  {
    /* ALTER TABLE without explicit row type */
    create_info->row_type= table->s->row_type;
  }
  else
  {
    /* ALTER TABLE with specific row type */
    create_info->used_fields |= HA_CREATE_USED_ROW_FORMAT;
  }

  DBUG_PRINT("info", ("old type: %s  new type: %s",
             ha_resolve_storage_engine_name(table->s->db_type()),
             ha_resolve_storage_engine_name(create_info->db_type)));
  if (ha_check_storage_engine_flag(table->s->db_type(), HTON_ALTER_NOT_SUPPORTED) ||
      ha_check_storage_engine_flag(create_info->db_type, HTON_ALTER_NOT_SUPPORTED))
  {
    DBUG_PRINT("info", ("doesn't support alter"));
    my_error(ER_ILLEGAL_HA, MYF(0), table_list->table_name);
    DBUG_RETURN(true);
  }

  THD_STAGE_INFO(thd, stage_setup);
  if (!(alter_info->flags & ~(Alter_info::ALTER_RENAME |
                              Alter_info::ALTER_KEYS_ONOFF)) &&
      alter_info->requested_algorithm !=
      Alter_info::ALTER_TABLE_ALGORITHM_COPY &&
      !table->s->tmp_table) // no need to touch frm
  {
    // This requires X-lock, no other lock levels supported.
    if (alter_info->requested_lock != Alter_info::ALTER_TABLE_LOCK_DEFAULT &&
        alter_info->requested_lock != Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE)
    {
      my_error(ER_ALTER_OPERATION_NOT_SUPPORTED, MYF(0),
               "LOCK=NONE/SHARED", "LOCK=EXCLUSIVE");
      DBUG_RETURN(true);
    }
    DBUG_RETURN(simple_rename_or_index_change(thd, table_list,
                                              alter_info->keys_onoff,
                                              &alter_ctx));
  }

  /* We have to do full alter table. */

#ifdef WITH_PARTITION_STORAGE_ENGINE
  bool partition_changed= false;
  partition_info *new_part_info= NULL;
  {
    if (prep_alter_part_table(thd, table, alter_info, create_info,
                              &alter_ctx, &partition_changed,
                              &new_part_info))
    {
      DBUG_RETURN(true);
    }
  }
#endif

  if (mysql_prepare_alter_table(thd, table, create_info, alter_info,
                                &alter_ctx))
  {
    DBUG_RETURN(true);
  }

  set_table_default_charset(thd, create_info, alter_ctx.db);

#ifdef WITH_PARTITION_STORAGE_ENGINE
  if (new_part_info)
  {
    /*
      ALGORITHM and LOCK clauses are generally not allowed by the
      parser for operations related to partitioning.
      The exceptions are ALTER_PARTITION and ALTER_REMOVE_PARTITIONING.
      For consistency, we report ER_ALTER_OPERATION_NOT_SUPPORTED here.
    */
    if (alter_info->requested_lock !=
        Alter_info::ALTER_TABLE_LOCK_DEFAULT)
    {
      my_error(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON, MYF(0),
               "LOCK=NONE/SHARED/EXCLUSIVE",
               ER(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_PARTITION),
               "LOCK=DEFAULT");
      DBUG_RETURN(true);
    }
    else if (alter_info->requested_algorithm !=
             Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT)
    {
      my_error(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON, MYF(0),
               "ALGORITHM=COPY/INPLACE",
               ER(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_PARTITION),
               "ALGORITHM=DEFAULT");
      DBUG_RETURN(true);
    }

    /*
      Upgrade from MDL_SHARED_UPGRADABLE to MDL_SHARED_NO_WRITE.
      Afterwards it's safe to take the table level lock.
    */
    if (thd->mdl_context.upgrade_shared_lock_nsec(mdl_ticket,
                                    MDL_SHARED_NO_WRITE,
                                    thd->variables.lock_wait_timeout_nsec)
        || lock_tables(thd, table_list, alter_ctx.tables_opened, 0))
    {
      DBUG_RETURN(true);
    }

    // In-place execution of ALTER TABLE for partitioning.
    DBUG_RETURN(fast_alter_partition_table(thd, table, alter_info,
                                           create_info, table_list,
                                           alter_ctx.db,
                                           alter_ctx.table_name,
                                           new_part_info));
  }
#endif

  /*
    Use copy algorithm if:
    - old_alter_table system variable is set without in-place requested using
      the ALGORITHM clause.
    - Or if in-place is impossible for given operation.
    - Changes to partitioning which were not handled by fast_alter_part_table()
      needs to be handled using table copying algorithm unless the engine
      supports auto-partitioning as such engines can do some changes
      using in-place API.
  */
  if ((thd->variables.old_alter_table &&
       alter_info->requested_algorithm !=
       Alter_info::ALTER_TABLE_ALGORITHM_INPLACE)
      || is_inplace_alter_impossible(table, create_info, alter_info)
#ifdef WITH_PARTITION_STORAGE_ENGINE
      || (partition_changed &&
          !(table->s->db_type()->partition_flags() & HA_USE_AUTO_PARTITION))
#endif
     )
  {
    if (alter_info->requested_algorithm ==
        Alter_info::ALTER_TABLE_ALGORITHM_INPLACE)
    {
      my_error(ER_ALTER_OPERATION_NOT_SUPPORTED, MYF(0),
               "ALGORITHM=INPLACE", "ALGORITHM=COPY");
      DBUG_RETURN(true);
    }
    alter_info->requested_algorithm= Alter_info::ALTER_TABLE_ALGORITHM_COPY;
  }

  /*
    If 'avoid_temporal_upgrade' mode is not enabled, then the
    pre MySQL 5.6.4 old temporal types if present is upgraded to the
    current format.
  */

  mysql_mutex_lock(&LOCK_global_system_variables);
  bool check_temporal_upgrade= !avoid_temporal_upgrade;
  mysql_mutex_unlock(&LOCK_global_system_variables);

  if (check_temporal_upgrade)
  {
    if (upgrade_old_temporal_types(thd, alter_info))
      DBUG_RETURN(true);
  }

  /*
    ALTER TABLE ... ENGINE to the same engine is a common way to
    request table rebuild. Set ALTER_RECREATE flag to force table
    rebuild.
  */
  if (create_info->db_type == table->s->db_type() &&
      create_info->used_fields & HA_CREATE_USED_ENGINE)
    alter_info->flags|= Alter_info::ALTER_RECREATE;

  /*
    If the old table had partitions and we are doing ALTER TABLE ...
    engine= <new_engine>, the new table must preserve the original
    partitioning. This means that the new engine is still the
    partitioning engine, not the engine specified in the parser.
    This is discovered in prep_alter_part_table, which in such case
    updates create_info->db_type.
    It's therefore important that the assignment below is done
    after prep_alter_part_table.
  */
  handlerton *new_db_type= create_info->db_type;
  handlerton *old_db_type= table->s->db_type();
  TABLE *new_table= NULL;
  ha_rows copied=0,deleted=0;

  /*
    Handling of symlinked tables:
    If no rename:
      Create new data file and index file on the same disk as the
      old data and index files.
      Copy data.
      Rename new data file over old data file and new index file over
      old index file.
      Symlinks are not changed.

   If rename:
      Create new data file and index file on the same disk as the
      old data and index files.  Create also symlinks to point at
      the new tables.
      Copy data.
      At end, rename intermediate tables, and symlinks to intermediate
      table, to final table name.
      Remove old table and old symlinks

    If rename is made to another database:
      Create new tables in new database.
      Copy data.
      Remove old table and symlinks.
  */
  char index_file[FN_REFLEN], data_file[FN_REFLEN];

  if (!alter_ctx.is_database_changed())
  {
    if (create_info->index_file_name)
    {
      /* Fix index_file_name to have 'tmp_name' as basename */
      strmov(index_file, alter_ctx.tmp_name);
      create_info->index_file_name=fn_same(index_file,
                                           create_info->index_file_name,
                                           1);
    }
    if (create_info->data_file_name)
    {
      /* Fix data_file_name to have 'tmp_name' as basename */
      strmov(data_file, alter_ctx.tmp_name);
      create_info->data_file_name=fn_same(data_file,
                                          create_info->data_file_name,
                                          1);
    }
  }
  else
  {
    /* Ignore symlink if db is changed. */
    create_info->data_file_name=create_info->index_file_name=0;
  }

  DEBUG_SYNC(thd, "alter_table_before_create_table_no_lock");
  DBUG_EXECUTE_IF("sleep_before_create_table_no_lock",
                  my_sleep(100000););
  /* We can abort alter table for any table type */
  thd->abort_on_warning= !ignore && thd->is_strict_mode();

  /*
    Promote first timestamp column, when explicit_defaults_for_timestamp
    is not set
  */
  if (!thd->variables.explicit_defaults_for_timestamp)
    promote_first_timestamp_column(&alter_info->create_list);

  /*
    Create .FRM for new version of table with a temporary name.
    We don't log the statement, it will be logged later.

    Keep information about keys in newly created table as it
    will be used later to construct Alter_inplace_info object
    and by fill_alter_inplace_info() call.
  */
  KEY *key_info;
  uint key_count;
  /*
    Remember if the new definition has new VARCHAR column;
    create_info->varchar will be reset in create_table_impl()/
    mysql_prepare_create_table().
  */
  bool varchar= create_info->varchar;

  tmp_disable_binlog(thd);

  // If original table has no primary key, then block_create_no_primary_key
  // shouldn't affect any alter.
  bool ignore_block_create_no_primary_key = false;
  if (table_list->table->s->primary_key == MAX_KEY)
    ignore_block_create_no_primary_key = true;

  error= create_table_impl(thd, alter_ctx.new_db, alter_ctx.tmp_name,
                           alter_ctx.table_name,
                           alter_ctx.get_tmp_path(),
                           create_info, alter_info,
                           true, 0, true,
                           ignore_block_create_no_primary_key,
                           NULL, &key_info, &key_count);
  reenable_binlog(thd);
  thd->abort_on_warning= false;
  if (error)
    DBUG_RETURN(true);

  /* Remember that we have not created table in storage engine yet. */
  bool no_ha_table= true;

  if (alter_info->requested_algorithm != Alter_info::ALTER_TABLE_ALGORITHM_COPY)
  {
    Alter_inplace_info ha_alter_info(create_info, alter_info,
                                     key_info, key_count,
#ifdef WITH_PARTITION_STORAGE_ENGINE
                                     thd->work_part_info,
#else
                                     NULL,
#endif
                                     ignore);
    TABLE *altered_table= NULL;
    bool use_inplace= true;

    /* Fill the Alter_inplace_info structure. */
    if (fill_alter_inplace_info(thd, table, varchar, &ha_alter_info))
      goto err_new_table_cleanup;

    // We assume that the table is non-temporary.
    DBUG_ASSERT(!table->s->tmp_table);

    if (!(altered_table= open_table_uncached(thd, alter_ctx.get_tmp_path(),
                                             alter_ctx.new_db,
                                             alter_ctx.tmp_name,
                                             true, false)))
      goto err_new_table_cleanup;

    /* Set markers for fields in TABLE object for altered table. */
    update_altered_table(ha_alter_info, altered_table);

    /*
      Mark all columns in 'altered_table' as used to allow usage
      of its record[0] buffer and Field objects during in-place
      ALTER TABLE.
    */
    altered_table->column_bitmaps_set_no_signal(&altered_table->s->all_set,
                                                &altered_table->s->all_set);

    set_column_defaults(altered_table, alter_info->create_list);

    if (ha_alter_info.handler_flags == 0)
    {
      /*
        No-op ALTER, no need to call handler API functions.

        If this code path is entered for an ALTER statement that
        should not be a real no-op, new handler flags should be added
        and fill_alter_inplace_info() adjusted.

        Note that we can end up here if an ALTER statement has clauses
        that cancel each other out (e.g. ADD/DROP of an identical index).

        Also note that we ignore the LOCK clause here.
      */
      close_temporary_table(thd, altered_table, true, false);
      (void) quick_rm_table(thd, new_db_type, alter_ctx.new_db,
                            alter_ctx.tmp_name, FN_IS_TMP | NO_HA_TABLE);
      goto end_inplace;
    }

    // Ask storage engine whether to use copy or in-place
    enum_alter_inplace_result inplace_supported=
      table->file->check_if_supported_inplace_alter(altered_table,
                                                    &ha_alter_info);

    switch (inplace_supported) {
    case HA_ALTER_INPLACE_EXCLUSIVE_LOCK:
      // If SHARED lock and no particular algorithm was requested, use COPY.
      if (alter_info->requested_lock ==
          Alter_info::ALTER_TABLE_LOCK_SHARED &&
          alter_info->requested_algorithm ==
          Alter_info::ALTER_TABLE_ALGORITHM_DEFAULT)
      {
        use_inplace= false;
      }
      // Otherwise, if weaker lock was requested, report error.
      else if (alter_info->requested_lock ==
               Alter_info::ALTER_TABLE_LOCK_NONE ||
               alter_info->requested_lock ==
               Alter_info::ALTER_TABLE_LOCK_SHARED)
      {
        ha_alter_info.report_unsupported_error("LOCK=NONE/SHARED",
                                               "LOCK=EXCLUSIVE");
        close_temporary_table(thd, altered_table, true, false);
        goto err_new_table_cleanup;
      }
      break;
    case HA_ALTER_INPLACE_SHARED_LOCK_AFTER_PREPARE:
    case HA_ALTER_INPLACE_SHARED_LOCK:
      // If weaker lock was requested, report error.
      if (alter_info->requested_lock ==
          Alter_info::ALTER_TABLE_LOCK_NONE)
      {
        ha_alter_info.report_unsupported_error("LOCK=NONE", "LOCK=SHARED");
        close_temporary_table(thd, altered_table, true, false);
        goto err_new_table_cleanup;
      }
      break;
    case HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE:
    case HA_ALTER_INPLACE_NO_LOCK:
      break;
    case HA_ALTER_INPLACE_NOT_SUPPORTED:
      // If INPLACE was requested, report error.
      if (alter_info->requested_algorithm ==
          Alter_info::ALTER_TABLE_ALGORITHM_INPLACE)
      {
        ha_alter_info.report_unsupported_error("ALGORITHM=INPLACE",
                                               "ALGORITHM=COPY");
        close_temporary_table(thd, altered_table, true, false);
        goto err_new_table_cleanup;
      }
      // COPY with LOCK=NONE is not supported, no point in trying.
      if (alter_info->requested_lock ==
          Alter_info::ALTER_TABLE_LOCK_NONE)
      {
        ha_alter_info.report_unsupported_error("LOCK=NONE", "LOCK=SHARED");
        close_temporary_table(thd, altered_table, true, false);
        goto err_new_table_cleanup;
      }
      // Otherwise use COPY
      use_inplace= false;
      break;
    case HA_ALTER_ERROR:
    default:
      close_temporary_table(thd, altered_table, true, false);
      goto err_new_table_cleanup;
    }

    if (use_inplace)
    {
      if (mysql_inplace_alter_table(thd, table_list, table,
                                    altered_table,
                                    &ha_alter_info,
                                    inplace_supported, &target_mdl_request,
                                    &alter_ctx))
      {
        DBUG_RETURN(true);
      }

      goto end_inplace;
    }
    else
    {
      close_temporary_table(thd, altered_table, true, false);
    }
  }

  /* ALTER TABLE using copy algorithm. */

  /* Check if ALTER TABLE is compatible with foreign key definitions. */
  if (fk_prepare_copy_alter_table(thd, table, alter_info, &alter_ctx))
    goto err_new_table_cleanup;

  if (!table->s->tmp_table)
  {
    // COPY algorithm doesn't work with concurrent writes.
    if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_NONE)
    {
      my_error(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON, MYF(0),
               "LOCK=NONE",
               ER(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_COPY),
               "LOCK=SHARED");
      goto err_new_table_cleanup;
    }

    // If EXCLUSIVE lock is requested, upgrade already.
    if (alter_info->requested_lock == Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE &&
        wait_while_table_is_used(thd, table, HA_EXTRA_FORCE_REOPEN))
      goto err_new_table_cleanup;

    /*
      Otherwise upgrade to SHARED_NO_WRITE.
      Note that under LOCK TABLES, we will already have SHARED_NO_READ_WRITE.
    */
    if (alter_info->requested_lock != Alter_info::ALTER_TABLE_LOCK_EXCLUSIVE &&
        thd->mdl_context.upgrade_shared_lock_nsec(mdl_ticket,
                                    MDL_SHARED_NO_WRITE,
                                    thd->variables.lock_wait_timeout_nsec))
      goto err_new_table_cleanup;

    DEBUG_SYNC(thd, "alter_table_copy_after_lock_upgrade");
  }

  // It's now safe to take the table level lock.
  if (lock_tables(thd, table_list, alter_ctx.tables_opened, 0))
    goto err_new_table_cleanup;

  {
    if (ha_create_table(thd, alter_ctx.get_tmp_path(),
                        alter_ctx.new_db, alter_ctx.tmp_name,
                        create_info, false))
      goto err_new_table_cleanup;

    /* Mark that we have created table in storage engine. */
    no_ha_table= false;

    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
    {
      if (!open_table_uncached(thd, alter_ctx.get_tmp_path(),
                               alter_ctx.new_db, alter_ctx.tmp_name,
                               true, true))
        goto err_new_table_cleanup;
      /* in case of alter temp table send the tracker in OK packet */
      if (thd->session_tracker.get_tracker(SESSION_STATE_CHANGE_TRACKER)->is_enabled())
        thd->session_tracker.get_tracker(SESSION_STATE_CHANGE_TRACKER)->mark_as_changed(thd, NULL);
    }
  }


  /* Open the table since we need to copy the data. */
  if (table->s->tmp_table != NO_TMP_TABLE)
  {
    TABLE_LIST tbl;
    tbl.init_one_table(alter_ctx.new_db, strlen(alter_ctx.new_db),
                       alter_ctx.tmp_name, strlen(alter_ctx.tmp_name),
                       alter_ctx.tmp_name, TL_READ_NO_INSERT);
    /* Table is in thd->temporary_tables */
    (void) open_temporary_table(thd, &tbl);
    new_table= tbl.table;
  }
  else
  {
    /* table is a normal table: Create temporary table in same directory */
    /* Open our intermediate table. */
    new_table= open_table_uncached(thd, alter_ctx.get_tmp_path(),
                                   alter_ctx.new_db, alter_ctx.tmp_name,
                                   true, true);
  }
  if (!new_table)
    goto err_new_table_cleanup;
  /*
    Note: In case of MERGE table, we do not attach children. We do not
    copy data for MERGE tables. Only the children have data.
  */

  /* Copy the data if necessary. */
  thd->count_cuted_fields= CHECK_FIELD_WARN;	// calc cuted fields
  thd->cuted_fields=0L;

  /*
    We do not copy data for MERGE tables. Only the children have data.
    MERGE tables have HA_NO_COPY_ON_ALTER set.
  */
  if (!(new_table->file->ha_table_flags() & HA_NO_COPY_ON_ALTER))
  {

    /*
      Check if we can temporarily remove secondary indexes from the table
      before copying the data and recreate them later to utilize InnoDB fast
      index creation.
      TODO: is there a better way to check for InnoDB?
    */
    bool optimize_keys= (alter_info->delayed_key_count > 0) &&
      !my_strcasecmp(system_charset_info,
                     new_table->file->table_type(), "InnoDB");
    new_table->next_number_field=new_table->found_next_number_field;
    THD_STAGE_INFO(thd, stage_copy_to_tmp_table);
    DBUG_EXECUTE_IF("abort_copy_table", {
        my_error(ER_LOCK_WAIT_TIMEOUT, MYF(0), "abort_copy_table");
        goto err_new_table_cleanup;
      });

    if (optimize_keys)
    {
      /* ignore the error */
      remove_secondary_keys(thd, create_info, new_table, alter_info);
    }

    if (copy_data_between_tables(table, new_table,
                                 alter_info->create_list, ignore,
                                 order_num, order, &copied, &deleted,
                                 alter_info->keys_onoff,
                                 &alter_ctx))
      goto err_new_table_cleanup;

    if (optimize_keys)
      if (restore_secondary_keys(thd, create_info, new_table, alter_info))
        goto err_new_table_cleanup;
  }
  else
  {
    /* Should be MERGE only */
    DBUG_ASSERT(new_table->file->ht->db_type == DB_TYPE_MRG_MYISAM);
    if (!table->s->tmp_table &&
        wait_while_table_is_used(thd, table, HA_EXTRA_FORCE_REOPEN))
      goto err_new_table_cleanup;
    THD_STAGE_INFO(thd, stage_manage_keys);
    DEBUG_SYNC(thd, "alter_table_manage_keys");
    alter_table_manage_keys(table, table->file->indexes_are_disabled(),
                            alter_info->keys_onoff);
    if (trans_commit_stmt(thd) || trans_commit_implicit(thd))
      goto err_new_table_cleanup;
  }
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;

  if (table->s->tmp_table != NO_TMP_TABLE)
  {
    /* Close lock if this is a transactional table */
    if (thd->lock)
    {
      if (thd->locked_tables_mode != LTM_LOCK_TABLES &&
          thd->locked_tables_mode != LTM_PRELOCKED_UNDER_LOCK_TABLES)
      {
        mysql_unlock_tables(thd, thd->lock);
        thd->lock= NULL;
      }
      else
      {
        /*
          If LOCK TABLES list is not empty and contains this table,
          unlock the table and remove the table from this list.
        */
        mysql_lock_remove(thd, thd->lock, table);
      }
    }
    /* Remove link to old table and rename the new one */
    close_temporary_table(thd, table, true, true);
    /* Should pass the 'new_name' as we store table name in the cache */
    if (rename_temporary_table(thd, new_table,
                               alter_ctx.new_db, alter_ctx.new_name))
      goto err_new_table_cleanup;
    /* We don't replicate alter table statement on temporary tables */
    if (!thd->is_current_stmt_binlog_format_row() &&
        write_bin_log(thd, true, thd->query(), thd->query_length()))
      DBUG_RETURN(true);
    goto end_temporary;
  }

  /*
    Close the intermediate table that will be the new table, but do
    not delete it! Even although MERGE tables do not have their children
    attached here it is safe to call close_temporary_table().
  */
  close_temporary_table(thd, new_table, true, false);
  new_table= NULL;

  DEBUG_SYNC(thd, "alter_table_before_rename_result_table");

  /*
    Data is copied. Now we:
    1) Wait until all other threads will stop using old version of table
       by upgrading shared metadata lock to exclusive one.
    2) Close instances of table open by this thread and replace them
       with placeholders to simplify reopen process.
    3) Rename the old table to a temp name, rename the new one to the
       old name.
    4) If we are under LOCK TABLES and don't do ALTER TABLE ... RENAME
       we reopen new version of table.
    5) Write statement to the binary log.
    6) If we are under LOCK TABLES and do ALTER TABLE ... RENAME we
       remove placeholders and release metadata locks.
    7) If we are not under LOCK TABLES we rely on the caller
      (mysql_execute_command()) to release metadata locks.
  */

  THD_STAGE_INFO(thd, stage_rename_result_table);

  if (wait_while_table_is_used(thd, table, HA_EXTRA_PREPARE_FOR_RENAME))
    goto err_new_table_cleanup;

  close_all_tables_for_name(thd, table->s, alter_ctx.is_table_renamed(), NULL);
  table_list->table= table= NULL;                  /* Safety */

  /*
    Rename the old table to temporary name to have a backup in case
    anything goes wrong while renaming the new table.
  */
  char backup_name[32];
  my_snprintf(backup_name, sizeof(backup_name), "%s2-%lx-%x", tmp_file_prefix,
              current_pid, thd->thread_id());
  if (lower_case_table_names)
    my_casedn_str(files_charset_info, backup_name);
  if (mysql_rename_table(old_db_type, alter_ctx.db, alter_ctx.table_name,
                         alter_ctx.db, backup_name, FN_TO_IS_TMP))
  {
    // Rename to temporary name failed, delete the new table, abort ALTER.
    (void) quick_rm_table(thd, new_db_type, alter_ctx.new_db,
                          alter_ctx.tmp_name, FN_IS_TMP);
    goto err_with_mdl;
  }

  // Rename the new table to the correct name.
  if (mysql_rename_table(new_db_type, alter_ctx.new_db, alter_ctx.tmp_name,
                         alter_ctx.new_db, alter_ctx.new_alias,
                         FN_FROM_IS_TMP))
  {
    // Rename failed, delete the temporary table.
    (void) quick_rm_table(thd, new_db_type, alter_ctx.new_db,
                          alter_ctx.tmp_name, FN_IS_TMP);
    
    // Restore the backup of the original table to the old name.
    (void) mysql_rename_table(old_db_type, alter_ctx.db, backup_name,
                              alter_ctx.db, alter_ctx.alias, 
                              FN_FROM_IS_TMP | NO_FK_CHECKS);
    
    goto err_with_mdl;
  }

  // Check if we renamed the table and if so update trigger files.
  if (alter_ctx.is_table_renamed() &&
      Table_triggers_list::change_table_name(thd,
                                             alter_ctx.db,
                                             alter_ctx.alias,
                                             alter_ctx.table_name,
                                             alter_ctx.new_db,
                                             alter_ctx.new_alias))
  {
    // Rename succeeded, delete the new table.
    (void) quick_rm_table(thd, new_db_type,
                          alter_ctx.new_db, alter_ctx.new_alias, 0);
    // Restore the backup of the original table to the old name.
    (void) mysql_rename_table(old_db_type, alter_ctx.db, backup_name,
                              alter_ctx.db, alter_ctx.alias, 
                              FN_FROM_IS_TMP | NO_FK_CHECKS);
    goto err_with_mdl;
  }

  // ALTER TABLE succeeded, delete the backup of the old table.
  if (quick_rm_table(thd, old_db_type, alter_ctx.db, backup_name, FN_IS_TMP))
  {
    /*
      The fact that deletion of the backup failed is not a critical
      error, but it is still worth reporting as it might indicate a
      serious problem with the server.
    */
    goto err_with_mdl;
  }

end_inplace:

  if (thd->locked_tables_list.reopen_tables(thd))
    goto err_with_mdl;

  THD_STAGE_INFO(thd, stage_end);

  DBUG_EXECUTE_IF("sleep_alter_before_main_binlog", my_sleep(6000000););
  DEBUG_SYNC(thd, "alter_table_before_main_binlog");

  ha_binlog_log_query(thd, create_info->db_type, LOGCOM_ALTER_TABLE,
                      thd->query(), thd->query_length(),
                      alter_ctx.db, alter_ctx.table_name);

  DBUG_ASSERT(!(mysql_bin_log.is_open() &&
                thd->is_current_stmt_binlog_format_row() &&
                (create_info->options & HA_LEX_CREATE_TMP_TABLE)));
  if (write_bin_log(thd, true, thd->query(), thd->query_length()))
    DBUG_RETURN(true);

  if (ha_check_storage_engine_flag(old_db_type, HTON_FLUSH_AFTER_RENAME))
  {
    /*
      For the alter table to be properly flushed to the logs, we
      have to open the new table.  If not, we get a problem on server
      shutdown. But we do not need to attach MERGE children.
    */
    TABLE *t_table;
    t_table= open_table_uncached(thd, alter_ctx.get_new_path(),
                                 alter_ctx.new_db, alter_ctx.new_name,
                                 false, true);
    if (t_table)
      intern_close_table(t_table);
    else
      sql_print_warning("Could not open table %s.%s after rename\n",
                        alter_ctx.new_db, alter_ctx.table_name);
    ha_flush_logs(old_db_type);
  }
  table_list->table= NULL;			// For query cache
  query_cache_invalidate3(thd, table_list, false);

  if (thd->locked_tables_mode == LTM_LOCK_TABLES ||
      thd->locked_tables_mode == LTM_PRELOCKED_UNDER_LOCK_TABLES)
  {
    if (alter_ctx.is_table_renamed())
      thd->mdl_context.release_all_locks_for_name(mdl_ticket);
    else
      mdl_ticket->downgrade_lock(MDL_SHARED_NO_READ_WRITE);
  }

end_temporary:
  my_snprintf(alter_ctx.tmp_name, sizeof(alter_ctx.tmp_name),
              ER(ER_INSERT_INFO),
	      (ulong) (copied + deleted), (ulong) deleted,
	      (ulong) thd->get_stmt_da()->current_statement_warn_count());
  my_ok(thd, copied + deleted, 0L, alter_ctx.tmp_name);
  DBUG_RETURN(false);

err_new_table_cleanup:
  if (new_table)
  {
    /* close_temporary_table() frees the new_table pointer. */
    close_temporary_table(thd, new_table, true, true);
  }
  else
    (void) quick_rm_table(thd, new_db_type,
                          alter_ctx.new_db, alter_ctx.tmp_name,
                          (FN_IS_TMP | (no_ha_table ? NO_HA_TABLE : 0)));

  /*
    No default value was provided for a DATE/DATETIME field, the
    current sql_mode doesn't allow the '0000-00-00' value and
    the table to be altered isn't empty.
    Report error here.
  */
  if (alter_ctx.error_if_not_empty &&
      thd->get_stmt_da()->current_row_for_warning())
  {
    uint f_length;
    enum enum_mysql_timestamp_type t_type= MYSQL_TIMESTAMP_DATE;
    switch (alter_ctx.datetime_field->sql_type)
    {
      case MYSQL_TYPE_DATE:
      case MYSQL_TYPE_NEWDATE:
        f_length= MAX_DATE_WIDTH; // "0000-00-00";
        t_type= MYSQL_TIMESTAMP_DATE;
        break;
      case MYSQL_TYPE_DATETIME:
      case MYSQL_TYPE_DATETIME2:
        f_length= MAX_DATETIME_WIDTH; // "0000-00-00 00:00:00";
        t_type= MYSQL_TIMESTAMP_DATETIME;
        break;
      default:
        /* Shouldn't get here. */
        f_length= 0;
        DBUG_ASSERT(0);
    }
    bool save_abort_on_warning= thd->abort_on_warning;
    thd->abort_on_warning= true;
    make_truncated_value_warning(thd, Sql_condition::WARN_LEVEL_WARN,
                                 ErrConvString(my_zero_datetime6, f_length),
                                 t_type,
                                 alter_ctx.datetime_field->field_name);
    thd->abort_on_warning= save_abort_on_warning;
  }

  DBUG_RETURN(true);

err_with_mdl:
  /*
    An error happened while we were holding exclusive name metadata lock
    on table being altered. To be safe under LOCK TABLES we should
    remove all references to the altered table from the list of locked
    tables and release the exclusive metadata lock.
  */
  thd->locked_tables_list.unlink_all_closed_tables(thd, NULL, 0);
  thd->mdl_context.release_all_locks_for_name(mdl_ticket);
  DBUG_RETURN(true);
}