in sql/sql_insert.cc [609:1209]
bool mysql_insert(THD *thd,TABLE_LIST *table_list,
List<Item> &fields,
List<List_item> &values_list,
List<Item> &update_fields,
List<Item> &update_values,
enum_duplicates duplic,
bool ignore)
{
int error, res;
bool err= true;
bool transactional_table, joins_freed= FALSE;
bool changed;
bool was_insert_delayed= (table_list->lock_type == TL_WRITE_DELAYED);
bool is_locked= false;
ulong counter = 1;
ulonglong id;
/*
We have three alternative syntax rules for the INSERT statement:
1) "INSERT (columns) VALUES ...", so non-listed columns need a default
2) "INSERT VALUES (), ..." so all columns need a default;
note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
emptiness of the first row is enough
3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
expr_i is "DEFAULT" (in which case the column is set by
Item_default_value::save_in_field()).
*/
const bool manage_defaults=
fields.elements != 0 || // 1)
values_list.head()->elements == 0; // 2)
COPY_INFO info(COPY_INFO::INSERT_OPERATION,
&fields,
manage_defaults,
duplic,
ignore);
COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &update_fields, &update_values);
Name_resolution_context *context;
Name_resolution_context_state ctx_state;
#ifndef EMBEDDED_LIBRARY
char *query= thd->query();
/*
log_on is about delayed inserts only.
By default, both logs are enabled (this won't cause problems if the server
runs without --log-bin).
*/
bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
#endif
Item *unused_conds= 0;
#ifdef WITH_PARTITION_STORAGE_ENGINE
uint num_partitions= 0;
enum partition_info::enum_can_prune can_prune_partitions=
partition_info::PRUNE_NO;
MY_BITMAP used_partitions;
bool prune_needs_default_values;
#endif /* WITH_PARITITION_STORAGE_ENGINE */
DBUG_ENTER("mysql_insert");
/*
Upgrade lock type if the requested lock is incompatible with
the current connection mode or table operation.
*/
upgrade_lock_type(thd, &table_list->lock_type, duplic);
/*
We can't write-delayed into a table locked with LOCK TABLES:
this will lead to a deadlock, since the delayed thread will
never be able to get a lock on the table. QQQ: why not
upgrade the lock here instead?
*/
if (table_list->lock_type == TL_WRITE_DELAYED &&
thd->locked_tables_mode &&
find_locked_table(thd->open_tables, table_list->db,
table_list->table_name))
{
my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
table_list->table_name);
DBUG_RETURN(TRUE);
}
if (table_list->lock_type == TL_WRITE_DELAYED)
{
if (open_and_lock_for_insert_delayed(thd, table_list))
DBUG_RETURN(TRUE);
is_locked= true;
}
else
{
if (open_normal_and_derived_tables(thd, table_list, 0))
DBUG_RETURN(true);
}
const thr_lock_type lock_type= table_list->lock_type;
THD_STAGE_INFO(thd, stage_init);
thd->lex->used_tables=0;
List_iterator_fast<List_item> its(values_list);
List_item *values= its++;
const uint value_count= values->elements;
TABLE *table= NULL;
if (mysql_prepare_insert(thd, table_list, table, fields, values,
update_fields, update_values, duplic, &unused_conds,
FALSE,
(fields.elements || !value_count ||
table_list->view != 0),
!ignore && thd->is_strict_mode()))
goto exit_without_my_ok;
/* mysql_prepare_insert set table_list->table if it was not set */
table= table_list->table;
/* Must be done before can_prune_insert, due to internal initialization. */
if (info.add_function_default_columns(table, table->write_set))
goto exit_without_my_ok;
if (duplic == DUP_UPDATE &&
update.add_function_default_columns(table, table->write_set))
goto exit_without_my_ok;
context= &thd->lex->select_lex.context;
/*
These three asserts test the hypothesis that the resetting of the name
resolution context below is not necessary at all since the list of local
tables for INSERT always consists of one table.
*/
DBUG_ASSERT(!table_list->next_local);
DBUG_ASSERT(!context->table_list->next_local);
DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
/* Save the state of the current name resolution context. */
ctx_state.save_state(context, table_list);
/*
Perform name resolution only in the first table - 'table_list',
which is the table that is inserted into.
*/
table_list->next_local= 0;
context->resolve_in_table_list_only(table_list);
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (!is_locked && table->part_info)
{
if (table->part_info->can_prune_insert(thd,
duplic,
update,
update_fields,
fields,
!MY_TEST(values->elements),
&can_prune_partitions,
&prune_needs_default_values,
&used_partitions))
goto exit_without_my_ok;
if (can_prune_partitions != partition_info::PRUNE_NO)
{
num_partitions= table->part_info->lock_partitions.n_bits;
/*
Pruning probably possible, all partitions is unmarked for read/lock,
and we must now add them on row by row basis.
Check the first INSERT value.
Do not fail here, since that would break MyISAM behavior of inserting
all rows before the failing row.
PRUNE_DEFAULTS means the partitioning fields are only set to DEFAULT
values, so we only need to check the first INSERT value, since all the
rest will be in the same partition.
*/
if (table->part_info->set_used_partition(fields,
*values,
info,
prune_needs_default_values,
&used_partitions))
can_prune_partitions= partition_info::PRUNE_NO;
}
}
#endif /* WITH_PARTITION_STORAGE_ENGINE */
while ((values= its++))
{
counter++;
if (values->elements != value_count)
{
my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
goto exit_without_my_ok;
}
if (setup_fields(thd, Ref_ptr_array(), *values, MARK_COLUMNS_READ, 0, 0))
goto exit_without_my_ok;
#ifdef WITH_PARTITION_STORAGE_ENGINE
/*
To make it possible to increase concurrency on table level locking
engines such as MyISAM, we check pruning for each row until we will use
all partitions, Even if the number of rows is much higher than the
number of partitions.
TODO: Cache the calculated part_id and reuse in
ha_partition::write_row() if possible.
*/
if (can_prune_partitions == partition_info::PRUNE_YES)
{
if (table->part_info->set_used_partition(fields,
*values,
info,
prune_needs_default_values,
&used_partitions))
can_prune_partitions= partition_info::PRUNE_NO;
if (!(counter % num_partitions))
{
/*
Check if we using all partitions in table after adding partition
for current row to the set of used partitions. Do it only from
time to time to avoid overhead from bitmap_is_set_all() call.
*/
if (bitmap_is_set_all(&used_partitions))
can_prune_partitions= partition_info::PRUNE_NO;
}
}
#endif /* WITH_PARTITION_STORAGE_ENGINE */
}
table->auto_increment_field_not_null= false;
its.rewind ();
/* Restore the current context. */
ctx_state.restore_state(context, table_list);
if (thd->lex->describe)
{
/*
Send "No tables used" and stop execution here since
there is no SELECT to explain.
*/
err= explain_no_table(thd, "No tables used");
goto exit_without_my_ok;
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (can_prune_partitions != partition_info::PRUNE_NO)
{
/*
Only lock the partitions we will insert into.
And also only read from those partitions (duplicates etc.).
If explicit partition selection 'INSERT INTO t PARTITION (p1)' is used,
the new set of read/lock partitions is the intersection of read/lock
partitions and used partitions, i.e only the partitions that exists in
both sets will be marked for read/lock.
It is also safe for REPLACE, since all potentially conflicting records
always belong to the same partition as the one which we try to
insert a row. This is because ALL unique/primary keys must
include ALL partitioning columns.
*/
bitmap_intersect(&table->part_info->read_partitions,
&used_partitions);
bitmap_intersect(&table->part_info->lock_partitions,
&used_partitions);
}
#endif /* WITH_PARTITION_STORAGE_ENGINE */
/* Lock the tables now if not delayed/already locked. */
if (!is_locked &&
lock_tables(thd, table_list, thd->lex->table_count, 0))
DBUG_RETURN(true);
/*
Count warnings for all inserts.
For single line insert, generate an error if try to set a NOT NULL field
to NULL.
*/
thd->count_cuted_fields= ((values_list.elements == 1 &&
!ignore) ?
CHECK_FIELD_ERROR_FOR_NULL :
CHECK_FIELD_WARN);
thd->cuted_fields = 0L;
table->next_number_field=table->found_next_number_field;
#ifdef HAVE_REPLICATION
if (thd->slave_thread)
{
DBUG_ASSERT(active_mi != NULL);
if(info.get_duplicate_handling() == DUP_UPDATE &&
table->next_number_field != NULL &&
rpl_master_has_bug(active_mi->rli, 24432, TRUE, NULL, NULL))
goto exit_without_my_ok;
}
#endif
error=0;
THD_STAGE_INFO(thd, stage_update);
if (duplic == DUP_REPLACE &&
(!table->triggers || !table->triggers->has_delete_triggers()))
table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
if (duplic == DUP_UPDATE)
table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
/*
let's *try* to start bulk inserts. It won't necessary
start them as values_list.elements should be greater than
some - handler dependent - threshold.
We should not start bulk inserts if this statement uses
functions or invokes triggers since they may access
to the same table and therefore should not see its
inconsistent state created by this optimization.
So we call start_bulk_insert to perform nesessary checks on
values_list.elements, and - if nothing else - to initialize
the code to make the call of end_bulk_insert() below safe.
*/
#ifndef EMBEDDED_LIBRARY
if (lock_type != TL_WRITE_DELAYED)
#endif /* EMBEDDED_LIBRARY */
{
if (duplic != DUP_ERROR || ignore)
table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
/**
This is a simple check for the case when the table has a trigger
that reads from it, or when the statement invokes a stored function
that reads from the table being inserted to.
Engines can't handle a bulk insert in parallel with a read form the
same table in the same connection.
*/
if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
table->file->ha_start_bulk_insert(values_list.elements);
}
thd->abort_on_warning= (!ignore && thd->is_strict_mode());
table->prepare_triggers_for_insert_stmt_or_event();
table->mark_columns_needed_for_insert();
if (table_list->prepare_where(thd, 0, TRUE) ||
table_list->prepare_check_option(thd))
error= 1;
while ((values= its++))
{
if (fields.elements || !value_count)
{
restore_record(table,s->default_values); // Get empty record
/*
Check whether default values of the fields not specified in column list
are correct or not.
*/
if (validate_default_values_of_unset_fields(thd, table))
{
error= 1;
break;
}
if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
table->triggers,
TRG_EVENT_INSERT))
{
if (values_list.elements != 1 && ! thd->is_error())
{
info.stats.records++;
continue;
}
/*
TODO: set thd->abort_on_warning if values_list.elements == 1
and check that all items return warning in case of problem with
storing field.
*/
error=1;
break;
}
}
else
{
if (thd->lex->used_tables) // Column used in values()
restore_record(table,s->default_values); // Get empty record
else
{
TABLE_SHARE *share= table->s;
/*
Fix delete marker. No need to restore rest of record since it will
be overwritten by fill_record() anyway (and fill_record() does not
use default values in this case).
*/
table->record[0][0]= share->default_values[0];
/* Fix undefined null_bits. */
if (share->null_bytes > 1 && share->last_null_bit_pos)
{
table->record[0][share->null_bytes - 1]=
share->default_values[share->null_bytes - 1];
}
}
if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
table->triggers,
TRG_EVENT_INSERT))
{
if (values_list.elements != 1 && ! thd->is_error())
{
info.stats.records++;
continue;
}
error=1;
break;
}
}
if ((res= table_list->view_check_option(thd,
(values_list.elements == 1 ?
0 :
ignore))) ==
VIEW_CHECK_SKIP)
continue;
else if (res == VIEW_CHECK_ERROR)
{
error= 1;
break;
}
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
LEX_STRING const st_query = { query, thd->query_length() };
DEBUG_SYNC(thd, "before_write_delayed");
error= write_delayed(thd, table, st_query, log_on, &info);
DEBUG_SYNC(thd, "after_write_delayed");
query=0;
}
else
#endif
error= write_record(thd, table, &info, &update);
if (error)
break;
thd->get_stmt_da()->inc_current_row_for_warning();
}
free_underlaid_joins(thd, &thd->lex->select_lex);
joins_freed= TRUE;
/*
Now all rows are inserted. Time to update logs and sends response to
user
*/
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
{
if (!error)
{
info.stats.copied=values_list.elements;
end_delayed_insert(thd);
}
}
else
#endif
{
/*
Do not do this release if this is a delayed insert, it would steal
auto_inc values from the delayed_insert thread as they share TABLE.
*/
table->file->ha_release_auto_increment();
if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
table->file->ha_end_bulk_insert() && !error)
{
table->file->print_error(my_errno,MYF(0));
error=1;
}
if (duplic != DUP_ERROR || ignore)
table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
transactional_table= table->file->has_transactions();
if ((changed= (info.stats.copied || info.stats.deleted || info.stats.updated)))
{
/*
Invalidate the table in the query cache if something changed.
For the transactional algorithm to work the invalidation must be
before binlog writing and ha_autocommit_or_rollback
*/
query_cache_invalidate3(thd, table_list, 1);
}
if (error <= 0 ||
thd->transaction.stmt.cannot_safely_rollback() ||
was_insert_delayed)
{
if (mysql_bin_log.is_open())
{
int errcode= 0;
if (error <= 0)
{
/*
[Guilhem wrote] Temporary errors may have filled
thd->get_net()->last_error/errno. For example if there has
been a disk full error when writing the row, and it was
MyISAM, then thd->get_net()-.last_error/errno will be set to
"disk full"... and the mysql_file_pwrite() will wait until free
space appears, and so when it finishes then the
write_row() was entirely successful
*/
/* todo: consider removing */
thd->clear_error();
}
else
errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
/* bug#22725:
A query which per-row-loop can not be interrupted with
KILLED, like INSERT, and that does not invoke stored
routines can be binlogged with neglecting the KILLED error.
If there was no error (error == zero) until after the end of
inserting loop the KILLED flag that appeared later can be
disregarded since previously possible invocation of stored
routines did not result in any error due to the KILLED. In
such case the flag is ignored for constructing binlog event.
*/
DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
if (was_insert_delayed && table_list->lock_type == TL_WRITE)
{
/* Binlog INSERT DELAYED as INSERT without DELAYED. */
String log_query;
if (create_insert_stmt_from_insert_delayed(thd, &log_query))
{
sql_print_error("Event Error: An error occurred while creating query string"
"for INSERT DELAYED stmt, before writing it into binary log.");
error= 1;
}
else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
log_query.c_ptr(), log_query.length(),
transactional_table, FALSE, FALSE,
errcode))
error= 1;
}
else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
thd->query(), thd->query_length(),
transactional_table, FALSE, FALSE,
errcode))
error= 1;
}
}
DBUG_ASSERT(transactional_table || !changed ||
thd->transaction.stmt.cannot_safely_rollback());
}
THD_STAGE_INFO(thd, stage_end);
/*
We'll report to the client this id:
- if the table contains an autoincrement column and we successfully
inserted an autogenerated value, the autogenerated value.
- if the table contains no autoincrement column and LAST_INSERT_ID(X) was
called, X.
- if the table contains an autoincrement column, and some rows were
inserted, the id of the last "inserted" row (if IGNORE, that value may not
have been really inserted but ignored).
*/
id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
thd->first_successful_insert_id_in_cur_stmt :
(thd->arg_of_last_insert_id_function ?
thd->first_successful_insert_id_in_prev_stmt :
((table->next_number_field && info.stats.copied) ?
table->next_number_field->val_int() : 0));
table->next_number_field=0;
thd->count_cuted_fields= CHECK_FIELD_IGNORE;
table->auto_increment_field_not_null= FALSE;
if (duplic == DUP_REPLACE &&
(!table->triggers || !table->triggers->has_delete_triggers()))
table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
if (error)
goto exit_without_my_ok;
if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
!thd->cuted_fields))
{
my_ok(thd, info.stats.copied + info.stats.deleted +
((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
info.stats.touched : info.stats.updated),
id);
}
else
{
char buff[160];
ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
info.stats.touched : info.stats.updated);
if (ignore)
my_snprintf(buff, sizeof(buff),
ER(ER_INSERT_INFO), (long) info.stats.records,
(lock_type == TL_WRITE_DELAYED) ? (long) 0 :
(long) (info.stats.records - info.stats.copied),
(long) thd->get_stmt_da()->current_statement_warn_count());
else
my_snprintf(buff, sizeof(buff),
ER(ER_INSERT_INFO), (long) info.stats.records,
(long) (info.stats.deleted + updated),
(long) thd->get_stmt_da()->current_statement_warn_count());
my_ok(thd, info.stats.copied + info.stats.deleted + updated, id, buff);
}
thd->abort_on_warning= 0;
DBUG_RETURN(FALSE);
exit_without_my_ok:
#ifndef EMBEDDED_LIBRARY
if (lock_type == TL_WRITE_DELAYED)
end_delayed_insert(thd);
#endif
if (!joins_freed)
free_underlaid_joins(thd, &thd->lex->select_lex);
thd->abort_on_warning= 0;
DBUG_RETURN(err);
}