bool mysql_insert()

in sql/sql_insert.cc [609:1209]


bool mysql_insert(THD *thd,TABLE_LIST *table_list,
                  List<Item> &fields,
                  List<List_item> &values_list,
                  List<Item> &update_fields,
                  List<Item> &update_values,
                  enum_duplicates duplic,
		  bool ignore)
{
  int error, res;
  bool err= true;
  bool transactional_table, joins_freed= FALSE;
  bool changed;
  bool was_insert_delayed= (table_list->lock_type ==  TL_WRITE_DELAYED);
  bool is_locked= false;
  ulong counter = 1;
  ulonglong id;
  /*
    We have three alternative syntax rules for the INSERT statement:
    1) "INSERT (columns) VALUES ...", so non-listed columns need a default
    2) "INSERT VALUES (), ..." so all columns need a default;
    note that "VALUES (),(expr_1, ..., expr_n)" is not allowed, so checking
    emptiness of the first row is enough
    3) "INSERT VALUES (expr_1, ...), ..." so no defaults are needed; even if
    expr_i is "DEFAULT" (in which case the column is set by
    Item_default_value::save_in_field()).
  */
  const bool manage_defaults=
    fields.elements != 0 ||                     // 1)
    values_list.head()->elements == 0;          // 2)
  COPY_INFO info(COPY_INFO::INSERT_OPERATION,
                 &fields,
                 manage_defaults,
                 duplic,
                 ignore);
  COPY_INFO update(COPY_INFO::UPDATE_OPERATION, &update_fields, &update_values);
  Name_resolution_context *context;
  Name_resolution_context_state ctx_state;
#ifndef EMBEDDED_LIBRARY
  char *query= thd->query();
  /*
    log_on is about delayed inserts only.
    By default, both logs are enabled (this won't cause problems if the server
    runs without --log-bin).
  */
  bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
#endif
  Item *unused_conds= 0;
#ifdef WITH_PARTITION_STORAGE_ENGINE
  uint num_partitions= 0;
  enum partition_info::enum_can_prune can_prune_partitions=
                                                  partition_info::PRUNE_NO;
  MY_BITMAP used_partitions;
  bool prune_needs_default_values;
#endif /* WITH_PARITITION_STORAGE_ENGINE */
  DBUG_ENTER("mysql_insert");

  /*
    Upgrade lock type if the requested lock is incompatible with
    the current connection mode or table operation.
  */
  upgrade_lock_type(thd, &table_list->lock_type, duplic);

  /*
    We can't write-delayed into a table locked with LOCK TABLES:
    this will lead to a deadlock, since the delayed thread will
    never be able to get a lock on the table. QQQ: why not
    upgrade the lock here instead?
  */
  if (table_list->lock_type == TL_WRITE_DELAYED &&
      thd->locked_tables_mode &&
      find_locked_table(thd->open_tables, table_list->db,
                        table_list->table_name))
  {
    my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
             table_list->table_name);
    DBUG_RETURN(TRUE);
  }

  if (table_list->lock_type == TL_WRITE_DELAYED)
  {
    if (open_and_lock_for_insert_delayed(thd, table_list))
      DBUG_RETURN(TRUE);
    is_locked= true;
  }
  else
  {
    if (open_normal_and_derived_tables(thd, table_list, 0))
      DBUG_RETURN(true);
  }

  const thr_lock_type lock_type= table_list->lock_type;

  THD_STAGE_INFO(thd, stage_init);
  thd->lex->used_tables=0;

  List_iterator_fast<List_item> its(values_list);
  List_item *values= its++;
  const uint value_count= values->elements;
  TABLE *table= NULL;
  if (mysql_prepare_insert(thd, table_list, table, fields, values,
			   update_fields, update_values, duplic, &unused_conds,
                           FALSE,
                           (fields.elements || !value_count ||
                            table_list->view != 0),
                           !ignore && thd->is_strict_mode()))
    goto exit_without_my_ok;

  /* mysql_prepare_insert set table_list->table if it was not set */
  table= table_list->table;

  /* Must be done before can_prune_insert, due to internal initialization. */
  if (info.add_function_default_columns(table, table->write_set))
    goto exit_without_my_ok;
  if (duplic == DUP_UPDATE &&
      update.add_function_default_columns(table, table->write_set))
    goto exit_without_my_ok;

  context= &thd->lex->select_lex.context;
  /*
    These three asserts test the hypothesis that the resetting of the name
    resolution context below is not necessary at all since the list of local
    tables for INSERT always consists of one table.
  */
  DBUG_ASSERT(!table_list->next_local);
  DBUG_ASSERT(!context->table_list->next_local);
  DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);

  /* Save the state of the current name resolution context. */
  ctx_state.save_state(context, table_list);

  /*
    Perform name resolution only in the first table - 'table_list',
    which is the table that is inserted into.
  */
  table_list->next_local= 0;
  context->resolve_in_table_list_only(table_list);

#ifdef WITH_PARTITION_STORAGE_ENGINE
  if (!is_locked && table->part_info)
  {
    if (table->part_info->can_prune_insert(thd,
                                           duplic,
                                           update,
                                           update_fields,
                                           fields,
                                           !MY_TEST(values->elements),
                                           &can_prune_partitions,
                                           &prune_needs_default_values,
                                           &used_partitions))
      goto exit_without_my_ok;

    if (can_prune_partitions != partition_info::PRUNE_NO)
    {
      num_partitions= table->part_info->lock_partitions.n_bits;
      /*
        Pruning probably possible, all partitions is unmarked for read/lock,
        and we must now add them on row by row basis.

        Check the first INSERT value.
        Do not fail here, since that would break MyISAM behavior of inserting
        all rows before the failing row.

        PRUNE_DEFAULTS means the partitioning fields are only set to DEFAULT
        values, so we only need to check the first INSERT value, since all the
        rest will be in the same partition.
      */
      if (table->part_info->set_used_partition(fields,
                                               *values,
                                               info,
                                               prune_needs_default_values,
                                               &used_partitions))
        can_prune_partitions= partition_info::PRUNE_NO;
    }
  }
#endif /* WITH_PARTITION_STORAGE_ENGINE */

  while ((values= its++))
  {
    counter++;
    if (values->elements != value_count)
    {
      my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
      goto exit_without_my_ok;
    }
    if (setup_fields(thd, Ref_ptr_array(), *values, MARK_COLUMNS_READ, 0, 0))
      goto exit_without_my_ok;

#ifdef WITH_PARTITION_STORAGE_ENGINE
    /*
      To make it possible to increase concurrency on table level locking
      engines such as MyISAM, we check pruning for each row until we will use
      all partitions, Even if the number of rows is much higher than the
      number of partitions.
      TODO: Cache the calculated part_id and reuse in
      ha_partition::write_row() if possible.
    */
    if (can_prune_partitions == partition_info::PRUNE_YES)
    {
      if (table->part_info->set_used_partition(fields,
                                               *values,
                                               info,
                                               prune_needs_default_values,
                                               &used_partitions))
        can_prune_partitions= partition_info::PRUNE_NO;
      if (!(counter % num_partitions))
      {
        /*
          Check if we using all partitions in table after adding partition
          for current row to the set of used partitions. Do it only from
          time to time to avoid overhead from bitmap_is_set_all() call.
        */
        if (bitmap_is_set_all(&used_partitions))
          can_prune_partitions= partition_info::PRUNE_NO;
      }
    }
#endif /* WITH_PARTITION_STORAGE_ENGINE */
  }
  table->auto_increment_field_not_null= false;
  its.rewind ();
 
  /* Restore the current context. */
  ctx_state.restore_state(context, table_list);

  if (thd->lex->describe)
  {
    /*
      Send "No tables used" and stop execution here since
      there is no SELECT to explain.
    */

    err= explain_no_table(thd, "No tables used");
    goto exit_without_my_ok;
  }

#ifdef WITH_PARTITION_STORAGE_ENGINE
  if (can_prune_partitions != partition_info::PRUNE_NO)
  {
    /*
      Only lock the partitions we will insert into.
      And also only read from those partitions (duplicates etc.).
      If explicit partition selection 'INSERT INTO t PARTITION (p1)' is used,
      the new set of read/lock partitions is the intersection of read/lock
      partitions and used partitions, i.e only the partitions that exists in
      both sets will be marked for read/lock.
      It is also safe for REPLACE, since all potentially conflicting records
      always belong to the same partition as the one which we try to
      insert a row. This is because ALL unique/primary keys must
      include ALL partitioning columns.
    */
    bitmap_intersect(&table->part_info->read_partitions,
                     &used_partitions);
    bitmap_intersect(&table->part_info->lock_partitions,
                     &used_partitions);
  }
#endif /* WITH_PARTITION_STORAGE_ENGINE */

  /* Lock the tables now if not delayed/already locked. */
  if (!is_locked &&
      lock_tables(thd, table_list, thd->lex->table_count, 0))
    DBUG_RETURN(true);
 
  /*
    Count warnings for all inserts.
    For single line insert, generate an error if try to set a NOT NULL field
    to NULL.
  */
  thd->count_cuted_fields= ((values_list.elements == 1 &&
                             !ignore) ?
			    CHECK_FIELD_ERROR_FOR_NULL :
			    CHECK_FIELD_WARN);
  thd->cuted_fields = 0L;
  table->next_number_field=table->found_next_number_field;

#ifdef HAVE_REPLICATION
  if (thd->slave_thread)
  {
    DBUG_ASSERT(active_mi != NULL);
    if(info.get_duplicate_handling() == DUP_UPDATE &&
       table->next_number_field != NULL &&
       rpl_master_has_bug(active_mi->rli, 24432, TRUE, NULL, NULL))
      goto exit_without_my_ok;
  }
#endif

  error=0;
  THD_STAGE_INFO(thd, stage_update);
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
  if (duplic == DUP_UPDATE)
    table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
  /*
    let's *try* to start bulk inserts. It won't necessary
    start them as values_list.elements should be greater than
    some - handler dependent - threshold.
    We should not start bulk inserts if this statement uses
    functions or invokes triggers since they may access
    to the same table and therefore should not see its
    inconsistent state created by this optimization.
    So we call start_bulk_insert to perform nesessary checks on
    values_list.elements, and - if nothing else - to initialize
    the code to make the call of end_bulk_insert() below safe.
  */
#ifndef EMBEDDED_LIBRARY
  if (lock_type != TL_WRITE_DELAYED)
#endif /* EMBEDDED_LIBRARY */
  {
    if (duplic != DUP_ERROR || ignore)
      table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
    /**
      This is a simple check for the case when the table has a trigger
      that reads from it, or when the statement invokes a stored function
      that reads from the table being inserted to.
      Engines can't handle a bulk insert in parallel with a read form the
      same table in the same connection.
    */
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES)
      table->file->ha_start_bulk_insert(values_list.elements);
  }

  thd->abort_on_warning= (!ignore && thd->is_strict_mode());

  table->prepare_triggers_for_insert_stmt_or_event();
  table->mark_columns_needed_for_insert();

  if (table_list->prepare_where(thd, 0, TRUE) ||
      table_list->prepare_check_option(thd))
    error= 1;

  while ((values= its++))
  {
    if (fields.elements || !value_count)
    {
      restore_record(table,s->default_values);	// Get empty record

      /*
        Check whether default values of the fields not specified in column list
        are correct or not.
      */
      if (validate_default_values_of_unset_fields(thd, table))
      {
        error= 1;
        break;
      }

      if (fill_record_n_invoke_before_triggers(thd, fields, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
      {
	if (values_list.elements != 1 && ! thd->is_error())
	{
	  info.stats.records++;
	  continue;
	}
	/*
	  TODO: set thd->abort_on_warning if values_list.elements == 1
	  and check that all items return warning in case of problem with
	  storing field.
        */
	error=1;
	break;
      }
    }
    else
    {
      if (thd->lex->used_tables)               // Column used in values()
        restore_record(table,s->default_values); // Get empty record
      else
      {
        TABLE_SHARE *share= table->s;

        /*
          Fix delete marker. No need to restore rest of record since it will
          be overwritten by fill_record() anyway (and fill_record() does not
          use default values in this case).
        */
        table->record[0][0]= share->default_values[0];

        /* Fix undefined null_bits. */
        if (share->null_bytes > 1 && share->last_null_bit_pos)
        {
          table->record[0][share->null_bytes - 1]= 
            share->default_values[share->null_bytes - 1];
        }
      }
      if (fill_record_n_invoke_before_triggers(thd, table->field, *values, 0,
                                               table->triggers,
                                               TRG_EVENT_INSERT))
      {
	if (values_list.elements != 1 && ! thd->is_error())
	{
	  info.stats.records++;
	  continue;
	}
	error=1;
	break;
      }
    }

    if ((res= table_list->view_check_option(thd,
					    (values_list.elements == 1 ?
					     0 :
					     ignore))) ==
        VIEW_CHECK_SKIP)
      continue;
    else if (res == VIEW_CHECK_ERROR)
    {
      error= 1;
      break;
    }
#ifndef EMBEDDED_LIBRARY
    if (lock_type == TL_WRITE_DELAYED)
    {
      LEX_STRING const st_query = { query, thd->query_length() };
      DEBUG_SYNC(thd, "before_write_delayed");
      error= write_delayed(thd, table, st_query, log_on, &info);
      DEBUG_SYNC(thd, "after_write_delayed");
      query=0;
    }
    else
#endif
      error= write_record(thd, table, &info, &update);
    if (error)
      break;
    thd->get_stmt_da()->inc_current_row_for_warning();
  }

  free_underlaid_joins(thd, &thd->lex->select_lex);
  joins_freed= TRUE;

  /*
    Now all rows are inserted.  Time to update logs and sends response to
    user
  */
#ifndef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
  {
    if (!error)
    {
      info.stats.copied=values_list.elements;
      end_delayed_insert(thd);
    }
  }
  else
#endif
  {
    /*
      Do not do this release if this is a delayed insert, it would steal
      auto_inc values from the delayed_insert thread as they share TABLE.
    */
    table->file->ha_release_auto_increment();
    if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
        table->file->ha_end_bulk_insert() && !error)
    {
      table->file->print_error(my_errno,MYF(0));
      error=1;
    }
    if (duplic != DUP_ERROR || ignore)
      table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);

    transactional_table= table->file->has_transactions();

    if ((changed= (info.stats.copied || info.stats.deleted || info.stats.updated)))
    {
      /*
        Invalidate the table in the query cache if something changed.
        For the transactional algorithm to work the invalidation must be
        before binlog writing and ha_autocommit_or_rollback
      */
      query_cache_invalidate3(thd, table_list, 1);
    }

    if (error <= 0 ||
        thd->transaction.stmt.cannot_safely_rollback() ||
        was_insert_delayed)
    {
      if (mysql_bin_log.is_open())
      {
        int errcode= 0;
	if (error <= 0)
        {
	  /*
	    [Guilhem wrote] Temporary errors may have filled
	    thd->get_net()->last_error/errno.  For example if there has
	    been a disk full error when writing the row, and it was
	    MyISAM, then thd->get_net()-.last_error/errno will be set to
            "disk full"... and the mysql_file_pwrite() will wait until free
	    space appears, and so when it finishes then the
	    write_row() was entirely successful
	  */
	  /* todo: consider removing */
	  thd->clear_error();
	}
        else
          errcode= query_error_code(thd, thd->killed == THD::NOT_KILLED);
        
	/* bug#22725:

	A query which per-row-loop can not be interrupted with
	KILLED, like INSERT, and that does not invoke stored
	routines can be binlogged with neglecting the KILLED error.
        
	If there was no error (error == zero) until after the end of
	inserting loop the KILLED flag that appeared later can be
	disregarded since previously possible invocation of stored
	routines did not result in any error due to the KILLED.  In
	such case the flag is ignored for constructing binlog event.
	*/
	DBUG_ASSERT(thd->killed != THD::KILL_BAD_DATA || error > 0);
        if (was_insert_delayed && table_list->lock_type ==  TL_WRITE)
        {
          /* Binlog INSERT DELAYED as INSERT without DELAYED. */
          String log_query;
          if (create_insert_stmt_from_insert_delayed(thd, &log_query))
          {
            sql_print_error("Event Error: An error occurred while creating query string"
                            "for INSERT DELAYED stmt, before writing it into binary log.");

            error= 1;
          }
          else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
                                     log_query.c_ptr(), log_query.length(),
                                     transactional_table, FALSE, FALSE,
                                     errcode))
            error= 1;
        }
        else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
			           thd->query(), thd->query_length(),
			           transactional_table, FALSE, FALSE,
                                   errcode))
	  error= 1;
      }
    }
    DBUG_ASSERT(transactional_table || !changed || 
                thd->transaction.stmt.cannot_safely_rollback());
  }
  THD_STAGE_INFO(thd, stage_end);
  /*
    We'll report to the client this id:
    - if the table contains an autoincrement column and we successfully
    inserted an autogenerated value, the autogenerated value.
    - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
    called, X.
    - if the table contains an autoincrement column, and some rows were
    inserted, the id of the last "inserted" row (if IGNORE, that value may not
    have been really inserted but ignored).
  */
  id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
    thd->first_successful_insert_id_in_cur_stmt :
    (thd->arg_of_last_insert_id_function ?
     thd->first_successful_insert_id_in_prev_stmt :
     ((table->next_number_field && info.stats.copied) ?
     table->next_number_field->val_int() : 0));
  table->next_number_field=0;
  thd->count_cuted_fields= CHECK_FIELD_IGNORE;
  table->auto_increment_field_not_null= FALSE;
  if (duplic == DUP_REPLACE &&
      (!table->triggers || !table->triggers->has_delete_triggers()))
    table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);

  if (error)
    goto exit_without_my_ok;
  if (values_list.elements == 1 && (!(thd->variables.option_bits & OPTION_WARNINGS) ||
				    !thd->cuted_fields))
  {
    my_ok(thd, info.stats.copied + info.stats.deleted +
               ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                info.stats.touched : info.stats.updated),
          id);
  }
  else
  {
    char buff[160];
    ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
                     info.stats.touched : info.stats.updated);
    if (ignore)
      my_snprintf(buff, sizeof(buff),
                  ER(ER_INSERT_INFO), (long) info.stats.records,
                  (lock_type == TL_WRITE_DELAYED) ? (long) 0 :
                  (long) (info.stats.records - info.stats.copied),
                  (long) thd->get_stmt_da()->current_statement_warn_count());
    else
      my_snprintf(buff, sizeof(buff),
                  ER(ER_INSERT_INFO), (long) info.stats.records,
                  (long) (info.stats.deleted + updated),
                  (long) thd->get_stmt_da()->current_statement_warn_count());
    my_ok(thd, info.stats.copied + info.stats.deleted + updated, id, buff);
  }
  thd->abort_on_warning= 0;
  DBUG_RETURN(FALSE);

exit_without_my_ok:
#ifndef EMBEDDED_LIBRARY
  if (lock_type == TL_WRITE_DELAYED)
    end_delayed_insert(thd);
#endif
  if (!joins_freed)
    free_underlaid_joins(thd, &thd->lex->select_lex);
  thd->abort_on_warning= 0;
  DBUG_RETURN(err);
}