bool create_table_impl()

in sql/sql_table.cc [4731:5247]


/**
  Validate a table definition and create the table.

  The function checks the storage engine and (when compiled with
  WITH_PARTITION_STORAGE_ENGINE) the partitioning clause, prepares the
  column/key definitions, verifies that no table with the same name
  already exists, and finally writes the table definition through
  rea_create_table(). A freshly created temporary table is additionally
  opened with open_table_uncached().

  @param thd                 Thread context.
  @param db                  Database name.
  @param table_name          Table name.
  @param error_table_name    Name handed to mysql_prepare_create_table()
                             (presumably the name to show in its error
                             messages - confirm against that function).
  @param path                Table path without extension; reg_ext is
                             appended to it to obtain the .frm file name.
  @param create_info         Create information. Modified here:
                             table_options, db_type and the
                             data/index_file_name members may be changed.
  @param alter_info          Columns (create_list) and keys (key_list) of
                             the table to create.
  @param internal_tmp_table  Forwarded to mysql_prepare_create_table().
                             When set, the .frm and table definition cache
                             existence checks are skipped.
  @param select_field_count  Forwarded to mysql_prepare_create_table().
  @param no_ha_table         Forwarded to rea_create_table(). When false,
                             the table is also created in the storage
                             engine.
  @param ignore_block_create_no_pk_flag
                             Forwarded to
                             should_check_table_for_primary_key().
  @param[out] is_trans       If not NULL and a temporary table was opened,
                             receives the result of has_transactions() for
                             that table. Left untouched otherwise.
  @param[out] key_info       Handed to mysql_prepare_create_table(); the
                             array it points to is then passed to
                             rea_create_table().
  @param[out] key_count      Same as key_info, for the number of keys.

  @retval FALSE  Success. This includes the case where the table already
                 exists and HA_LEX_CREATE_IF_NOT_EXISTS was given, in
                 which a note is pushed instead of an error.
  @retval TRUE   Failure.
*/
bool create_table_impl(THD *thd,
                       const char *db, const char *table_name,
                       const char *error_table_name,
                       const char *path,
                       HA_CREATE_INFO *create_info,
                       Alter_info *alter_info,
                       bool internal_tmp_table,
                       uint select_field_count,
                       bool no_ha_table,
                       bool ignore_block_create_no_pk_flag,
                       bool *is_trans,
                       KEY **key_info,
                       uint *key_count)
{
  const char *alias;
  uint db_options;
  handler *file;
  bool error= TRUE;
  DBUG_ENTER("create_table_impl");
  DBUG_PRINT("enter", ("db: '%s'  table: '%s'  tmp: %d",
                       db, table_name, internal_tmp_table));


  /* Check for duplicate fields and check type of table to create */
  if (!alter_info->create_list.elements)
  {
    my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
               MYF(0));
    DBUG_RETURN(TRUE);
  }
  if (check_engine(thd, db, table_name, create_info))
    DBUG_RETURN(TRUE);

  set_table_default_charset(thd, create_info, (char*) db);

  db_options= create_info->table_options;
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
    db_options|=HA_OPTION_PACK_RECORD;
  alias= table_case_name(create_info, table_name);
  if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root,
                              create_info->db_type)))
  {
    mem_alloc_error(sizeof(handler));
    DBUG_RETURN(TRUE);
  }
  /*
    From this point on "file" is a live handler object. Every failure path
    must leave through the "err" label so that it is deleted, unless "file"
    is known to be NULL at the point of return.
  */
  bool validate_primary_key_existence =
      should_check_table_for_primary_key(
        thd, create_info, db, ignore_block_create_no_pk_flag);
#ifdef WITH_PARTITION_STORAGE_ENGINE
  partition_info *part_info= thd->work_part_info;

  if (!part_info && create_info->db_type->partition_flags &&
      (create_info->db_type->partition_flags() & HA_USE_AUTO_PARTITION))
  {
    /*
      Table is not defined as a partitioned table but the engine handles
      all tables as partitioned. The handler will set up the partition info
      object with the default settings.
    */
    thd->work_part_info= part_info= new partition_info();
    if (!part_info)
    {
      mem_alloc_error(sizeof(partition_info));
      goto err;
    }
    file->set_auto_partitions(part_info);
    part_info->default_engine_type= create_info->db_type;
    part_info->is_auto_partitioned= TRUE;
  }
  if (part_info)
  {
    /*
      The table has been specified as a partitioned table.
      If this is part of an ALTER TABLE the handler will be the partition
      handler but we need to specify the default handler to use for
      partitions also in the call to check_partition_info. We transport
      this information in the default_db_type variable, it is either
      DB_TYPE_DEFAULT or the engine set in the ALTER TABLE command.
    */
    Key *key;
    handlerton *part_engine_type= create_info->db_type;
    char *part_syntax_buf;
    uint syntax_len;
    handlerton *engine_type;
    List_iterator<partition_element> part_it(part_info->partitions);
    partition_element *part_elem;

    /*
      Validate the comment of every partition and subpartition. The
      comment is terminated at the length handed back by
      validate_comment_length().
    */
    while ((part_elem= part_it++))
    {
      if (part_elem->part_comment)
      {
        size_t comment_len= strlen(part_elem->part_comment);
        if (validate_comment_length(thd, part_elem->part_comment,
                                     &comment_len,
                                     TABLE_PARTITION_COMMENT_MAXLEN,
                                     ER_TOO_LONG_TABLE_PARTITION_COMMENT,
                                     part_elem->partition_name))
          goto err;
        part_elem->part_comment[comment_len]= '\0';
      }
      if (part_elem->subpartitions.elements)
      {
        List_iterator<partition_element> sub_it(part_elem->subpartitions);
        partition_element *subpart_elem;
        while ((subpart_elem= sub_it++))
        {
          if (subpart_elem->part_comment)
          {
            size_t comment_len= strlen(subpart_elem->part_comment);
            if (validate_comment_length(thd, subpart_elem->part_comment,
                                         &comment_len,
                                         TABLE_PARTITION_COMMENT_MAXLEN,
                                         ER_TOO_LONG_TABLE_PARTITION_COMMENT,
                                         subpart_elem->partition_name))
              goto err;
            subpart_elem->part_comment[comment_len]= '\0';
          }
        }
      }
    }
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
    {
      my_error(ER_PARTITION_NO_TEMPORARY, MYF(0));
      goto err;
    }
    if ((part_engine_type == partition_hton) &&
        part_info->default_engine_type)
    {
      /*
        This only happens at ALTER TABLE.
        default_engine_type was assigned from the engine set in the ALTER
        TABLE command.
      */
      ;
    }
    else
    {
      if (create_info->used_fields & HA_CREATE_USED_ENGINE)
      {
        part_info->default_engine_type= create_info->db_type;
      }
      else
      {
        if (part_info->default_engine_type == NULL)
        {
          part_info->default_engine_type= ha_checktype(thd,
                                          DB_TYPE_DEFAULT, 0, 0);
        }
      }
    }
    DBUG_PRINT("info", ("db_type = %s create_info->db_type = %s",
             ha_resolve_storage_engine_name(part_info->default_engine_type),
             ha_resolve_storage_engine_name(create_info->db_type)));
    if (part_info->check_partition_info(thd, &engine_type, file,
                                        create_info, FALSE))
      goto err;
    part_info->default_engine_type= engine_type;

    /*
      We reverse the partitioning parser and generate a standard format
      for syntax stored in frm file.
    */
    if (!(part_syntax_buf= generate_partition_syntax(part_info,
                                                     &syntax_len,
                                                     TRUE, TRUE,
                                                     create_info,
                                                     alter_info,
                                                     NULL)))
      goto err;
    part_info->part_info_string= part_syntax_buf;
    part_info->part_info_len= syntax_len;
    if ((!(engine_type->partition_flags &&
           engine_type->partition_flags() & HA_CAN_PARTITION)) ||
        create_info->db_type == partition_hton)
    {
      /*
        The handler assigned to the table cannot handle partitioning.
        Assign the partition handler as the handler of the table.
      */
      DBUG_PRINT("info", ("db_type: %s",
                        ha_resolve_storage_engine_name(create_info->db_type)));
      delete file;
      create_info->db_type= partition_hton;
      if (!(file= get_ha_partition(part_info)))
      {
        /* The old handler is already deleted and "file" is NULL here. */
        DBUG_RETURN(TRUE);
      }
      /*
        If we have default number of partitions or subpartitions we
        might require to set-up the part_info object such that it
        creates a proper .par file. The current part_info object is
        only used to create the frm-file and .par-file.
      */
      if (part_info->use_default_num_partitions &&
          part_info->num_parts &&
          (int)part_info->num_parts !=
          file->get_default_no_partitions(create_info))
      {
        uint i;
        List_iterator<partition_element> part_it(part_info->partitions);
        /* Skip the first partition; all following ones are marked. */
        part_it++;
        DBUG_ASSERT(thd->lex->sql_command != SQLCOM_CREATE_TABLE);
        for (i= 1; i < part_info->partitions.elements; i++)
          (part_it++)->part_state= PART_TO_BE_DROPPED;
      }
      else if (part_info->is_sub_partitioned() &&
               part_info->use_default_num_subpartitions &&
               part_info->num_subparts &&
               (int)part_info->num_subparts !=
                 file->get_default_no_partitions(create_info))
      {
        DBUG_ASSERT(thd->lex->sql_command != SQLCOM_CREATE_TABLE);
        part_info->num_subparts= file->get_default_no_partitions(create_info);
      }
    }
    else if (create_info->db_type != engine_type)
    {
      /*
        We come here when we don't use a partitioned handler.
        Since we use a partitioned table it must be "native partitioned".
        We have switched engine from defaults, most likely only specified
        engines in partition clauses.
      */
      delete file;
      if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root,
                                  engine_type)))
      {
        /* The old handler is already deleted and "file" is NULL here. */
        mem_alloc_error(sizeof(handler));
        DBUG_RETURN(TRUE);
      }
    }
    /*
      Unless table's storage engine supports partitioning natively
      don't allow foreign keys on partitioned tables (they won't
      work even with InnoDB beneath of partitioning engine).
      If storage engine handles partitioning natively (like NDB)
      foreign keys support is possible, so we let the engine decide.
    */
    if (create_info->db_type == partition_hton)
    {
      List_iterator_fast<Key> key_iterator(alter_info->key_list);
      while ((key= key_iterator++))
      {
        if (key->type == Key::FOREIGN_KEY)
        {
          my_error(ER_FOREIGN_KEY_ON_PARTITIONED, MYF(0));
          goto err;
        }
      }
    }
  }
#endif
  if (mysql_prepare_create_table(thd, db, error_table_name, create_info,
                                 alter_info, internal_tmp_table, &db_options,
                                 file, key_info, key_count, select_field_count,
                                 validate_primary_key_existence))
    goto err;

  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
    create_info->table_options|=HA_CREATE_DELAY_KEY_WRITE;

  /* Check if table already exists */
  if ((create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
      find_temporary_table(thd, db, table_name))
  {
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
      goto warn;
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alias);
    goto err;
  }

  if (!internal_tmp_table && !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
  {
    char frm_name[FN_REFLEN+1];
    strxnmov(frm_name, sizeof(frm_name) - 1, path, reg_ext, NullS);

    if (!access(frm_name, F_OK))
    {
      if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
        goto warn;
      my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
      goto err;
    }
    /*
      We don't assert here, but check the result, because the table could be
      in the table definition cache and in the same time the .frm could be
      missing from the disk, in case of manual intervention which deletes
      the .frm file. The user has to use FLUSH TABLES; to clear the cache.
      Then she could create the table. This case is pretty obscure and
      therefore we don't introduce a new error message only for it.
    */
    mysql_mutex_lock(&LOCK_open);
    if (get_cached_table_share(db, table_name))
    {
      mysql_mutex_unlock(&LOCK_open);
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
      goto err;
    }
    mysql_mutex_unlock(&LOCK_open);
  }

  /*
    Check that table with given name does not already
    exist in any storage engine. In such a case it should
    be discovered and the error ER_TABLE_EXISTS_ERROR be returned
    unless user specified CREATE TABLE IF NOT EXISTS
    An exclusive metadata lock ensures that no
    one else is attempting to discover the table. Since
    it's not on disk as a frm file, no one could be using it!
  */
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
  {
    bool create_if_not_exists =
      create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
    int retcode = ha_table_exists_in_engine(thd, db, table_name);
    DBUG_PRINT("info", ("exists_in_engine: %d",retcode));
    switch (retcode)
    {
      case HA_ERR_NO_SUCH_TABLE:
        /* Normal case, no table exists. we can go and create it */
        break;
      case HA_ERR_TABLE_EXIST:
        DBUG_PRINT("info", ("Table existed in handler"));

        if (create_if_not_exists)
          goto warn;
        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
        goto err;
      default:
        /* Any other code is reported as-is through my_error(). */
        DBUG_PRINT("info", ("error: %d from storage engine", retcode));
        my_error(retcode, MYF(0),table_name);
        goto err;
    }
  }

  THD_STAGE_INFO(thd, stage_creating_table);

  {
    size_t dirlen;
    char   dirpath[FN_REFLEN];

    /*
      data_file_name and index_file_name include the table name without
      extension. Mostly this does not refer to an existing file. When
      comparing data_file_name or index_file_name against the data
      directory, we try to resolve all symbolic links. On some systems,
      we use realpath(3) for the resolution. This returns ENOENT if the
      resolved path does not refer to an existing file. my_realpath()
      does then copy the requested path verbatim, without symlink
      resolution. Thereafter the comparison can fail even if the
      requested path is within the data directory. E.g. if symlinks to
      another file system are used. To make realpath(3) return the
      resolved path, we strip the table name and compare the directory
      path only. If the directory doesn't exist either, table creation
      will fail anyway.
    */
    if (create_info->data_file_name)
    {
      dirname_part(dirpath, create_info->data_file_name, &dirlen);
      if (test_if_data_home_dir(dirpath))
      {
        my_error(ER_WRONG_ARGUMENTS, MYF(0), "DATA DIRECTORY");
        goto err;
      }
    }
    if (create_info->index_file_name)
    {
      dirname_part(dirpath, create_info->index_file_name, &dirlen);
      if (test_if_data_home_dir(dirpath))
      {
        my_error(ER_WRONG_ARGUMENTS, MYF(0), "INDEX DIRECTORY");
        goto err;
      }
    }
  }

#ifdef WITH_PARTITION_STORAGE_ENGINE
  if (check_partition_dirs(thd->lex->part_info))
  {
    goto err;
  }
#endif /* WITH_PARTITION_STORAGE_ENGINE */

  /*
    With MODE_NO_DIR_IN_CREATE the DATA/INDEX DIRECTORY options are
    dropped; a warning is pushed for each one that was given.
  */
  if (thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE)
  {
    if (create_info->data_file_name)
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                          WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                          "DATA DIRECTORY");
    if (create_info->index_file_name)
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                          WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                          "INDEX DIRECTORY");
    create_info->data_file_name= create_info->index_file_name= 0;
  }
  create_info->table_options=db_options;

  /*
    DOCUMENT type is only supported by InnoDB and it is only
    allowed if sys var allow_document_type is true.
  */
  if ((create_info->db_type->db_type != DB_TYPE_INNODB &&
       create_info->db_type->db_type != DB_TYPE_PARTITION_DB) ||
      !allow_document_type)
  {
    Create_field *field = NULL;
    List_iterator<Create_field> it(alter_info->create_list);
    while ((field=it++))
    {
      if (f_is_document(field->pack_flag))
      {
        if (!allow_document_type)
        {
          my_error(ER_DOCUMENT_FIELD_NOT_ALLOWED, MYF(0));
        }
        else
        {
          my_error(ER_DOCUMENT_FIELD_IN_NON_INNODB_TABLE, MYF(0));
        }
        goto err;
      }
    }
  }

  /*
    Create .FRM (and .PAR file for partitioned table).
    If "no_ha_table" is false also create table in storage engine.
  */
  if (rea_create_table(thd, path, db, table_name,
                       create_info, alter_info->create_list,
                       *key_count, *key_info, file, no_ha_table))
    goto err;

  if (!no_ha_table && create_info->options & HA_LEX_CREATE_TMP_TABLE)
  {
    /*
      Open a table (skipping table cache) and add it into
      THD::temporary_tables list.
    */

    TABLE *table= open_table_uncached(thd, path, db, table_name, true, true);

    if (!table)
    {
      (void) rm_temporary_table(create_info->db_type, path);
      goto err;
    }

    if (is_trans != NULL)
      *is_trans= table->file->has_transactions();

    thd->thread_specific_used= TRUE;
  }
#ifdef WITH_PARTITION_STORAGE_ENGINE
  else if (part_info && no_ha_table)
  {
    /*
      For partitioned tables we can't find some problems with table
      until table is opened. Therefore in order to disallow creation
      of corrupted tables we have to try to open table as the part
      of its creation process.
      In cases when both .FRM and SE part of table are created table
      is implicitly open in ha_create_table() call.
      In cases when we create .FRM without SE part we have to open
      table explicitly.
    */
    TABLE table;
    TABLE_SHARE share;

    init_tmp_table_share(thd, &share, db, 0, table_name, path);

    bool result= (open_table_def(thd, &share, 0) ||
                  open_table_from_share(thd, &share, "", 0, (uint) READ_ALL,
                                        0, &table, true));
    /*
      Assert that the change list is empty as no partition function currently
      needs to modify item tree. May need call THD::rollback_item_tree_changes
      later before calling closefrm if the change list is not empty.
    */
    DBUG_ASSERT(thd->change_list.is_empty());
    if (!result)
      (void) closefrm(&table, 0);

    free_table_share(&share);

    if (result)
    {
      /* The trial open failed: remove the files created above. */
      char frm_name[FN_REFLEN + 1];
      strxnmov(frm_name, sizeof(frm_name) - 1, path, reg_ext, NullS);
      (void) mysql_file_delete(key_file_frm, frm_name, MYF(0));
      (void) file->ha_create_handler_files(path, NULL, CHF_DELETE_FLAG,
                                           create_info);
      goto err;
    }
  }
#endif

  error= FALSE;
err:
  /* Common exit: reached on success as well as on failure. */
  THD_STAGE_INFO(thd, stage_after_create);
  delete file;
  DBUG_RETURN(error);

warn:
  /* Table exists and IF NOT EXISTS was given: note instead of error. */
  error= FALSE;
  push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
                      ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                      alias);
  goto err;
}