in sql/sql_table.cc [4731:5247]
/**
  Validate a table definition, write its .FRM (and .PAR) file and, unless
  @c no_ha_table is set, create the table in the storage engine.

  @param thd                  Thread handle; supplies the mem_root, the
                              work_part_info, sql_mode and the warning area
                              used below.
  @param db                   Database name.
  @param table_name           Table name.
  @param error_table_name     Table name handed to
                              mysql_prepare_create_table() (presumably the
                              name to show in its error messages - confirm
                              against that function).
  @param path                 Table path without extension; reg_ext is
                              appended to it to locate the .frm file.
  @param[in,out] create_info  Create options. May be modified here: db_type
                              can be switched to partition_hton,
                              table_options is overwritten, and
                              data_file_name/index_file_name are cleared
                              under MODE_NO_DIR_IN_CREATE.
  @param[in,out] alter_info   Column list and key list of the new table.
  @param internal_tmp_table   When true the .frm / cached-share existence
                              check is skipped; also forwarded to
                              mysql_prepare_create_table().
  @param select_field_count   Forwarded to mysql_prepare_create_table().
  @param no_ha_table          Forwarded to rea_create_table(); when false
                              the table is also created in the storage
                              engine.
  @param ignore_block_create_no_pk_flag
                              Forwarded to
                              should_check_table_for_primary_key().
  @param[out] is_trans        If not NULL and a temporary table was created
                              and opened, receives
                              table->file->has_transactions().
  @param[out] key_info        Key array filled in by
                              mysql_prepare_create_table().
  @param[out] key_count       Number of keys filled in by
                              mysql_prepare_create_table().

  @retval false  Success, or the table already exists and
                 HA_LEX_CREATE_IF_NOT_EXISTS was given (a note is pushed).
  @retval true   Failure.
*/
bool create_table_impl(THD *thd,
                       const char *db, const char *table_name,
                       const char *error_table_name,
                       const char *path,
                       HA_CREATE_INFO *create_info,
                       Alter_info *alter_info,
                       bool internal_tmp_table,
                       uint select_field_count,
                       bool no_ha_table,
                       bool ignore_block_create_no_pk_flag,
                       bool *is_trans,
                       KEY **key_info,
                       uint *key_count)
{
  const char *alias;
  uint db_options;
  handler *file;
  bool error= TRUE;
  DBUG_ENTER("create_table_impl");
  DBUG_PRINT("enter", ("db: '%s' table: '%s' tmp: %d",
                       db, table_name, internal_tmp_table));

  /* Check for duplicate fields and check type of table to create */
  if (!alter_info->create_list.elements)
  {
    my_message(ER_TABLE_MUST_HAVE_COLUMNS, ER(ER_TABLE_MUST_HAVE_COLUMNS),
               MYF(0));
    /* No handler exists yet, so there is nothing to clean up. */
    DBUG_RETURN(TRUE);
  }
  if (check_engine(thd, db, table_name, create_info))
    DBUG_RETURN(TRUE);

  set_table_default_charset(thd, create_info, (char*) db);

  db_options= create_info->table_options;
  if (create_info->row_type == ROW_TYPE_DYNAMIC)
    db_options|=HA_OPTION_PACK_RECORD;
  alias= table_case_name(create_info, table_name);
  if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root,
                              create_info->db_type)))
  {
    mem_alloc_error(sizeof(handler));
    /* file is NULL here, so a direct return skips no cleanup. */
    DBUG_RETURN(TRUE);
  }

  /*
    From this point on "file" is live: failures must leave through the
    "err" label so that the handler is destroyed.
  */
  bool validate_primary_key_existence =
    should_check_table_for_primary_key(
      thd, create_info, db, ignore_block_create_no_pk_flag);
#ifdef WITH_PARTITION_STORAGE_ENGINE
  partition_info *part_info= thd->work_part_info;

  if (!part_info && create_info->db_type->partition_flags &&
      (create_info->db_type->partition_flags() & HA_USE_AUTO_PARTITION))
  {
    /*
      Table is not defined as a partitioned table but the engine handles
      all tables as partitioned. The handler will set up the partition info
      object with the default settings.
    */
    thd->work_part_info= part_info= new partition_info();
    if (!part_info)
    {
      mem_alloc_error(sizeof(partition_info));
      /* Leave via err so the handler obtained above is destroyed. */
      goto err;
    }
    file->set_auto_partitions(part_info);
    part_info->default_engine_type= create_info->db_type;
    part_info->is_auto_partitioned= TRUE;
  }
  if (part_info)
  {
    /*
      The table has been specified as a partitioned table.
      If this is part of an ALTER TABLE the handler will be the partition
      handler but we need to specify the default handler to use for
      partitions also in the call to check_partition_info. We transport
      this information in the default_db_type variable, it is either
      DB_TYPE_DEFAULT or the engine set in the ALTER TABLE command.
    */
    Key *key;
    handlerton *part_engine_type= create_info->db_type;
    char *part_syntax_buf;
    uint syntax_len;
    handlerton *engine_type;
    List_iterator<partition_element> part_it(part_info->partitions);
    partition_element *part_elem;

    /* Validate the comment of every partition and subpartition. */
    while ((part_elem= part_it++))
    {
      if (part_elem->part_comment)
      {
        size_t comment_len= strlen(part_elem->part_comment);
        if (validate_comment_length(thd, part_elem->part_comment,
                                    &comment_len,
                                    TABLE_PARTITION_COMMENT_MAXLEN,
                                    ER_TOO_LONG_TABLE_PARTITION_COMMENT,
                                    part_elem->partition_name))
        {
          /* Leave via err so the handler is destroyed. */
          goto err;
        }
        /* Terminate at the length left in comment_len by the validator. */
        part_elem->part_comment[comment_len]= '\0';
      }
      if (part_elem->subpartitions.elements)
      {
        List_iterator<partition_element> sub_it(part_elem->subpartitions);
        partition_element *subpart_elem;
        while ((subpart_elem= sub_it++))
        {
          if (subpart_elem->part_comment)
          {
            size_t comment_len= strlen(subpart_elem->part_comment);
            if (validate_comment_length(thd, subpart_elem->part_comment,
                                        &comment_len,
                                        TABLE_PARTITION_COMMENT_MAXLEN,
                                        ER_TOO_LONG_TABLE_PARTITION_COMMENT,
                                        subpart_elem->partition_name))
            {
              /* Leave via err so the handler is destroyed. */
              goto err;
            }
            subpart_elem->part_comment[comment_len]= '\0';
          }
        }
      }
    }
    if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
    {
      my_error(ER_PARTITION_NO_TEMPORARY, MYF(0));
      goto err;
    }
    if ((part_engine_type == partition_hton) &&
        part_info->default_engine_type)
    {
      /*
        This only happens at ALTER TABLE.
        default_engine_type was assigned from the engine set in the ALTER
        TABLE command.
      */
      ;
    }
    else
    {
      if (create_info->used_fields & HA_CREATE_USED_ENGINE)
      {
        part_info->default_engine_type= create_info->db_type;
      }
      else
      {
        if (part_info->default_engine_type == NULL)
        {
          part_info->default_engine_type= ha_checktype(thd,
                                                       DB_TYPE_DEFAULT, 0, 0);
        }
      }
    }
    DBUG_PRINT("info", ("db_type = %s create_info->db_type = %s",
             ha_resolve_storage_engine_name(part_info->default_engine_type),
             ha_resolve_storage_engine_name(create_info->db_type)));
    if (part_info->check_partition_info(thd, &engine_type, file,
                                        create_info, FALSE))
      goto err;
    part_info->default_engine_type= engine_type;

    /*
      We reverse the partitioning parser and generate a standard format
      for syntax stored in frm file.
    */
    if (!(part_syntax_buf= generate_partition_syntax(part_info,
                                                     &syntax_len,
                                                     TRUE, TRUE,
                                                     create_info,
                                                     alter_info,
                                                     NULL)))
      goto err;
    part_info->part_info_string= part_syntax_buf;
    part_info->part_info_len= syntax_len;
    if ((!(engine_type->partition_flags &&
           engine_type->partition_flags() & HA_CAN_PARTITION)) ||
        create_info->db_type == partition_hton)
    {
      /*
        The handler assigned to the table cannot handle partitioning.
        Assign the partition handler as the handler of the table.
      */
      DBUG_PRINT("info", ("db_type: %s",
                        ha_resolve_storage_engine_name(create_info->db_type)));
      delete file;
      create_info->db_type= partition_hton;
      if (!(file= get_ha_partition(part_info)))
      {
        /*
          The previous handler is already deleted and file is NULL, so a
          direct return skips no cleanup.
        */
        DBUG_RETURN(TRUE);
      }
      /*
        If we have default number of partitions or subpartitions we
        might require to set-up the part_info object such that it
        creates a proper .par file. The current part_info object is
        only used to create the frm-file and .par-file.
      */
      if (part_info->use_default_num_partitions &&
          part_info->num_parts &&
          (int)part_info->num_parts !=
          file->get_default_no_partitions(create_info))
      {
        uint i;
        List_iterator<partition_element> part_it(part_info->partitions);
        /* Skip the first partition; all following ones are marked. */
        part_it++;
        DBUG_ASSERT(thd->lex->sql_command != SQLCOM_CREATE_TABLE);
        for (i= 1; i < part_info->partitions.elements; i++)
          (part_it++)->part_state= PART_TO_BE_DROPPED;
      }
      else if (part_info->is_sub_partitioned() &&
               part_info->use_default_num_subpartitions &&
               part_info->num_subparts &&
               (int)part_info->num_subparts !=
               file->get_default_no_partitions(create_info))
      {
        DBUG_ASSERT(thd->lex->sql_command != SQLCOM_CREATE_TABLE);
        part_info->num_subparts= file->get_default_no_partitions(create_info);
      }
    }
    else if (create_info->db_type != engine_type)
    {
      /*
        We come here when we don't use a partitioned handler.
        Since we use a partitioned table it must be "native partitioned".
        We have switched engine from defaults, most likely only specified
        engines in partition clauses.
      */
      delete file;
      if (!(file= get_new_handler((TABLE_SHARE*) 0, thd->mem_root,
                                  engine_type)))
      {
        mem_alloc_error(sizeof(handler));
        /* Old handler already deleted, file is NULL: nothing to release. */
        DBUG_RETURN(TRUE);
      }
    }
    /*
      Unless table's storage engine supports partitioning natively
      don't allow foreign keys on partitioned tables (they won't
      work even with InnoDB beneath of partitioning engine).
      If storage engine handles partitioning natively (like NDB)
      foreign keys support is possible, so we let the engine decide.
    */
    if (create_info->db_type == partition_hton)
    {
      List_iterator_fast<Key> key_iterator(alter_info->key_list);
      while ((key= key_iterator++))
      {
        if (key->type == Key::FOREIGN_KEY)
        {
          my_error(ER_FOREIGN_KEY_ON_PARTITIONED, MYF(0));
          goto err;
        }
      }
    }
  }
#endif

  if (mysql_prepare_create_table(thd, db, error_table_name, create_info,
                                 alter_info, internal_tmp_table, &db_options,
                                 file, key_info, key_count, select_field_count,
                                 validate_primary_key_existence))
    goto err;

  if (create_info->options & HA_LEX_CREATE_TMP_TABLE)
    create_info->table_options|=HA_CREATE_DELAY_KEY_WRITE;

  /* Check if table already exists */
  if ((create_info->options & HA_LEX_CREATE_TMP_TABLE) &&
      find_temporary_table(thd, db, table_name))
  {
    /*
      With IF NOT EXISTS this is not an error: the shared "warn" tail
      pushes the ER_TABLE_EXISTS_ERROR note for alias and returns success.
    */
    if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
      goto warn;
    my_error(ER_TABLE_EXISTS_ERROR, MYF(0), alias);
    goto err;
  }

  if (!internal_tmp_table && !(create_info->options & HA_LEX_CREATE_TMP_TABLE))
  {
    char frm_name[FN_REFLEN+1];
    strxnmov(frm_name, sizeof(frm_name) - 1, path, reg_ext, NullS);

    /* access() returns 0 when the .frm file is present. */
    if (!access(frm_name, F_OK))
    {
      if (create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS)
        goto warn;
      my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
      goto err;
    }
    /*
      We don't assert here, but check the result, because the table could be
      in the table definition cache and in the same time the .frm could be
      missing from the disk, in case of manual intervention which deletes
      the .frm file. The user has to use FLUSH TABLES; to clear the cache.
      Then she could create the table. This case is pretty obscure and
      therefore we don't introduce a new error message only for it.
    */
    mysql_mutex_lock(&LOCK_open);
    if (get_cached_table_share(db, table_name))
    {
      mysql_mutex_unlock(&LOCK_open);
      my_error(ER_TABLE_EXISTS_ERROR, MYF(0), table_name);
      goto err;
    }
    mysql_mutex_unlock(&LOCK_open);
  }

  /*
    Check that table with given name does not already
    exist in any storage engine. In such a case it should
    be discovered and the error ER_TABLE_EXISTS_ERROR be returned
    unless user specified CREATE TABLE IF NOT EXISTS
    An exclusive metadata lock ensures that no
    one else is attempting to discover the table. Since
    it's not on disk as a frm file, no one could be using it!
  */
  if (!(create_info->options & HA_LEX_CREATE_TMP_TABLE))
  {
    bool create_if_not_exists =
      create_info->options & HA_LEX_CREATE_IF_NOT_EXISTS;
    int retcode = ha_table_exists_in_engine(thd, db, table_name);
    DBUG_PRINT("info", ("exists_in_engine: %d",retcode));
    switch (retcode)
    {
      case HA_ERR_NO_SUCH_TABLE:
        /* Normal case, no table exists. we can go and create it */
        break;
      case HA_ERR_TABLE_EXIST:
        DBUG_PRINT("info", ("Table existed in handler"));

        if (create_if_not_exists)
          goto warn;
        my_error(ER_TABLE_EXISTS_ERROR,MYF(0),table_name);
        goto err;
        break;
      default:
        DBUG_PRINT("info", ("error: %d from storage engine", retcode));
        my_error(retcode, MYF(0),table_name);
        goto err;
    }
  }

  THD_STAGE_INFO(thd, stage_creating_table);

  {
    size_t dirlen;
    char dirpath[FN_REFLEN];

    /*
      data_file_name and index_file_name include the table name without
      extension. Mostly this does not refer to an existing file. When
      comparing data_file_name or index_file_name against the data
      directory, we try to resolve all symbolic links. On some systems,
      we use realpath(3) for the resolution. This returns ENOENT if the
      resolved path does not refer to an existing file. my_realpath()
      does then copy the requested path verbatim, without symlink
      resolution. Thereafter the comparison can fail even if the
      requested path is within the data directory. E.g. if symlinks to
      another file system are used. To make realpath(3) return the
      resolved path, we strip the table name and compare the directory
      path only. If the directory doesn't exist either, table creation
      will fail anyway.
    */
    if (create_info->data_file_name)
    {
      dirname_part(dirpath, create_info->data_file_name, &dirlen);
      if (test_if_data_home_dir(dirpath))
      {
        my_error(ER_WRONG_ARGUMENTS, MYF(0), "DATA DIRECTORY");
        goto err;
      }
    }
    if (create_info->index_file_name)
    {
      dirname_part(dirpath, create_info->index_file_name, &dirlen);
      if (test_if_data_home_dir(dirpath))
      {
        my_error(ER_WRONG_ARGUMENTS, MYF(0), "INDEX DIRECTORY");
        goto err;
      }
    }
  }

#ifdef WITH_PARTITION_STORAGE_ENGINE
  if (check_partition_dirs(thd->lex->part_info))
  {
    goto err;
  }
#endif /* WITH_PARTITION_STORAGE_ENGINE */

  if (thd->variables.sql_mode & MODE_NO_DIR_IN_CREATE)
  {
    /* The directory options are dropped; warn for each one that was given. */
    if (create_info->data_file_name)
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                          WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                          "DATA DIRECTORY");
    if (create_info->index_file_name)
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                          WARN_OPTION_IGNORED, ER(WARN_OPTION_IGNORED),
                          "INDEX DIRECTORY");
    create_info->data_file_name= create_info->index_file_name= 0;
  }
  create_info->table_options=db_options;

  /*
    DOCUMENT type is only supported by InnoDB and it is only
    allowed if sys var allow_document_type is true.
  */
  if ((create_info->db_type->db_type != DB_TYPE_INNODB &&
       create_info->db_type->db_type != DB_TYPE_PARTITION_DB) ||
      !allow_document_type)
  {
    Create_field *field = NULL;
    List_iterator<Create_field> it(alter_info->create_list);
    while ((field=it++))
    {
      if (f_is_document(field->pack_flag))
      {
        if (!allow_document_type)
        {
          my_error(ER_DOCUMENT_FIELD_NOT_ALLOWED, MYF(0));
        }
        else
        {
          my_error(ER_DOCUMENT_FIELD_IN_NON_INNODB_TABLE, MYF(0));
        }
        goto err;
      }
    }
  }

  /*
    Create .FRM (and .PAR file for partitioned table).
    If "no_ha_table" is false also create table in storage engine.
  */
  if (rea_create_table(thd, path, db, table_name,
                       create_info, alter_info->create_list,
                       *key_count, *key_info, file, no_ha_table))
    goto err;

  if (!no_ha_table && create_info->options & HA_LEX_CREATE_TMP_TABLE)
  {
    /*
      Open a table (skipping table cache) and add it into
      THD::temporary_tables list.
    */
    TABLE *table= open_table_uncached(thd, path, db, table_name, true, true);

    if (!table)
    {
      /* Undo the creation above; the result of the removal is ignored. */
      (void) rm_temporary_table(create_info->db_type, path);
      goto err;
    }

    if (is_trans != NULL)
      *is_trans= table->file->has_transactions();

    thd->thread_specific_used= TRUE;
  }
#ifdef WITH_PARTITION_STORAGE_ENGINE
  else if (part_info && no_ha_table)
  {
    /*
      For partitioned tables we can't find some problems with table
      until table is opened. Therefore in order to disallow creation
      of corrupted tables we have to try to open table as the part
      of its creation process.
      In cases when both .FRM and SE part of table are created table
      is implicitly open in ha_create_table() call.
      In cases when we create .FRM without SE part we have to open
      table explicitly.
    */
    TABLE table;
    TABLE_SHARE share;

    init_tmp_table_share(thd, &share, db, 0, table_name, path);

    bool result= (open_table_def(thd, &share, 0) ||
                  open_table_from_share(thd, &share, "", 0, (uint) READ_ALL,
                                        0, &table, true));
    /*
      Assert that the change list is empty as no partition function currently
      needs to modify item tree. May need call THD::rollback_item_tree_changes
      later before calling closefrm if the change list is not empty.
    */
    DBUG_ASSERT(thd->change_list.is_empty());
    /* The table is closed only when both open steps succeeded. */
    if (!result)
      (void) closefrm(&table, 0);

    free_table_share(&share);

    if (result)
    {
      /* The trial open failed: remove the .frm and the handler files. */
      char frm_name[FN_REFLEN + 1];
      strxnmov(frm_name, sizeof(frm_name) - 1, path, reg_ext, NullS);
      (void) mysql_file_delete(key_file_frm, frm_name, MYF(0));
      (void) file->ha_create_handler_files(path, NULL, CHF_DELETE_FLAG,
                                           create_info);
      goto err;
    }
  }
#endif

  error= FALSE;
err:
  /* Common exit: used for success and failure once the handler exists. */
  THD_STAGE_INFO(thd, stage_after_create);
  delete file;
  DBUG_RETURN(error);

warn:
  /* Table already exists and IF NOT EXISTS was given: note, not an error. */
  error= FALSE;
  push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
                      ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
                      alias);
  goto err;
}