bool mysql_delete()

in sql/sql_delete.cc [48:510]


/**
  Execute a single-table DELETE statement (or its EXPLAIN).

  The function opens and locks the target table, validates the WHERE and
  ORDER BY clauses, and then either empties the table in one call through
  handler::ha_delete_all_rows() or reads the candidate rows one by one and
  removes each of them with handler::ha_delete_row(), running the
  BEFORE/AFTER DELETE triggers (when the table has any) around every row.
  Afterwards it invalidates the query cache, writes the statement to the
  binary log when required and calls my_ok().

  When thd->lex->describe is set, only the explain_*() functions are called
  and no rows are deleted.

  Convention used for the local variable 'error' when control reaches the
  'cleanup' label: a negative value means the statement completed normally
  (my_ok() is called and a pending error is cleared before binlogging);
  a non-negative value means that something went wrong or was already
  reported.

  @param thd         Session executing the statement.
  @param table_list  Table to delete from. After opening, table_list->table
                     must be set; otherwise ER_VIEW_DELETE_MERGE_VIEW is
                     reported.
  @param conds       WHERE condition, or NULL when there is none.
  @param order_list  ORDER BY list; may be NULL or empty.
  @param limit       Maximum number of rows to delete; HA_POS_ERROR means
                     that no LIMIT is in effect.
  @param options     Statement option bits; only OPTION_QUICK is examined
                     in this function.

  @retval false  No error is set in the THD and the statement was not
                 killed.
  @retval true   An error was reported or the statement was killed.
*/
bool mysql_delete(THD *thd, TABLE_LIST *table_list, Item *conds,
                  SQL_I_List<ORDER> *order_list, ha_rows limit, ulonglong options)
{
  /* True when start_bulk_delete() succeeded; end_bulk_delete() is then due */
  bool          will_batch;
  int		error, loc_error;
  TABLE		*table;
  SQL_SELECT	*select=0;
  READ_RECORD	info;
  bool          using_limit=limit != HA_POS_ERROR;
  bool		transactional_table, safe_update, const_cond;
  bool          const_cond_result;
  ha_rows	deleted= 0;
  bool          reverse= FALSE;
  bool          read_removal= false;
  bool          skip_record;
  bool          need_sort= FALSE;
  /*
    Result used on the exit_without_my_ok path. It starts as 'true' so that
    every jump to that label which does not set it explicitly reports failure.
  */
  bool          err= true;
  /* First ORDER BY element, or NULL when there is no (non-empty) ORDER BY */
  ORDER *order= (ORDER *) ((order_list && order_list->elements) ?
                           order_list->first : NULL);
  uint usable_index= MAX_KEY;
  SELECT_LEX   *select_lex= &thd->lex->select_lex;
  THD::killed_state killed_status= THD::NOT_KILLED;
  THD::enum_binlog_query_type query_type= THD::ROW_QUERY_TYPE;
  DBUG_ENTER("mysql_delete");

  if (open_normal_and_derived_tables(thd, table_list, 0))
    DBUG_RETURN(TRUE);

  /* No underlying TABLE object: report that this view cannot be deleted from */
  if (!(table= table_list->table))
  {
    my_error(ER_VIEW_DELETE_MERGE_VIEW, MYF(0),
	     table_list->view_db.str, table_list->view_name.str);
    DBUG_RETURN(TRUE);
  }
  THD_STAGE_INFO(thd, stage_init);
  table->map=1;

  if (mysql_prepare_delete(thd, table_list, &conds))
    DBUG_RETURN(TRUE);

  /* check ORDER BY even if it can be ignored */
  if (order)
  {
    /*
      setup_order() is given a zero-filled, temporary TABLE_LIST that carries
      only the table and its alias.
    */
    TABLE_LIST   tables;
    List<Item>   fields;
    List<Item>   all_fields;

    memset(&tables, 0, sizeof(tables));
    tables.table = table;
    tables.alias = table_list->alias;

      if (select_lex->setup_ref_array(thd, order_list->elements) ||
	  setup_order(thd, select_lex->ref_pointer_array, &tables,
                    fields, all_fields, order))
    {
      free_underlaid_joins(thd, &thd->lex->select_lex);
      DBUG_RETURN(TRUE);
    }
  }

  // Parse column usage statistics and store it into THD.
  parse_column_usage_info(thd);

#ifdef WITH_PARTITION_STORAGE_ENGINE
  /*
    Non delete tables are pruned in JOIN::prepare,
    only the delete table needs this.
  */
  if (prune_partitions(thd, table, conds))
    DBUG_RETURN(true);
  if (table->all_partitions_pruned_away)
    goto exit_all_parts_pruned_away;
#endif

  if (lock_tables(thd, table_list, thd->lex->table_count, 0))
    DBUG_RETURN(true);

  /* The condition is "constant" when there is no WHERE or it is a const item */
  const_cond= (!conds || conds->const_item());
  safe_update= MY_TEST(thd->variables.option_bits & OPTION_SAFE_UPDATES);
  /* Safe-updates mode rejects a DELETE whose WHERE is absent or constant */
  if (safe_update && const_cond)
  {
    my_message(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,
               ER(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE), MYF(0));
    DBUG_RETURN(TRUE);
  }

  select_lex->no_error= thd->lex->ignore;

  /*
    True when there is no WHERE clause, or when the WHERE clause is constant
    and evaluates to a non-zero value.
  */
  const_cond_result= const_cond && (!conds || conds->val_int());
  if (thd->is_error())
  {
    /* Error evaluating val_int(). */
    DBUG_RETURN(TRUE);
  }

  /*
    Test if the user wants to delete all rows and deletion doesn't have
    any side-effects (because of triggers), so we can use optimized
    handler::delete_all_rows() method.

    We can use delete_all_rows() if and only if:
    - We allow new functions (not using option --skip-new)
    - There is no limit clause
    - The condition is constant
    - If there is a condition, then it produces a non-zero value
    - If the current command is DELETE FROM with no where clause, then:
      - We should not be binlogging this statement in row-based, and
      - there should be no delete triggers associated with the table.
  */
  if (!using_limit && const_cond_result &&
      !(specialflag & SPECIAL_NO_NEW_FUNC) &&
       (!thd->is_current_stmt_binlog_format_row() &&
        !(table->triggers && table->triggers->has_delete_triggers())))
  {
    /* Update the table->file->stats.records number */
    table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
    /*
      The handler's record count, read before the delete, is what gets
      reported as the number of deleted rows on this path.
    */
    ha_rows const maybe_deleted= table->file->stats.records;

    if (thd->lex->describe)
    {
      err= explain_no_table(thd, "Deleting all rows", maybe_deleted);
      goto exit_without_my_ok;
    }

    DBUG_PRINT("debug", ("Trying to use delete_all_rows()"));
    if (!(error=table->file->ha_delete_all_rows()))
    {
      /*
        If delete_all_rows() is used, it is not possible to log the
        query in row format, so we have to log it in statement format.
      */
      query_type= THD::STMT_QUERY_TYPE;
      /* Negative 'error' tells the cleanup code that the statement succeeded */
      error= -1;
      deleted= maybe_deleted;
      goto cleanup;
    }
    if (error != HA_ERR_WRONG_COMMAND)
    {
      table->file->print_error(error,MYF(0));
      /*
        The error has been reported; a non-negative value keeps the cleanup
        code from treating this as a successful completion.
      */
      error=0;
      goto cleanup;
    }
    /* Handler didn't support fast delete; Delete rows one by one */
  }

  if (conds)
  {
    COND_EQUAL *cond_equal= NULL;
    Item::cond_result result;

    conds= optimize_cond(thd, conds, &cond_equal, select_lex->join_list,
                         true, &result);
    if (result == Item::COND_FALSE)             // Impossible where
    {
      /*
        A zero limit makes the "!limit" test after check_quick() below take
        the "nothing to delete" branch.
      */
      limit= 0;

      if (thd->lex->describe)
      {
        err= explain_no_table(thd, "Impossible WHERE");
        goto exit_without_my_ok;
      }
    }
    if (conds)
    {
      conds= substitute_for_best_equal_field(conds, cond_equal, 0);
      if (conds == nullptr)
      {
        /*
          NOTE(review): 'err' is a bool, so the error code is not preserved;
          the assignment only makes 'err' true (assuming ER_OUTOFMEMORY is
          non-zero). No error is reported to the client in this function on
          this path - confirm that the callee has already reported it.
        */
        err = ER_OUTOFMEMORY;
        goto exit_without_my_ok;
      }
      conds->update_used_tables();
    }
  }

  /* Update the table->file->stats.records number */
  table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);

  /* Reset the key maps before the range analysis done by check_quick() */
  table->covering_keys.clear_all();
  table->quick_keys.clear_all();		// Can't use 'only index'
  table->possible_quick_keys.clear_all();

#ifdef WITH_PARTITION_STORAGE_ENGINE
  /* Prune a second time to be able to prune on subqueries in WHERE clause. */
  if (prune_partitions(thd, table, conds))
    DBUG_RETURN(true);
  if (table->all_partitions_pruned_away)
    goto exit_all_parts_pruned_away;
#endif

  select=make_select(table, 0, 0, conds, 0, &error);
  if (error)
    DBUG_RETURN(TRUE);

  { // Enter scope for optimizer trace wrapper
    Opt_trace_object wrapper(&thd->opt_trace);
    wrapper.add_utf8_table(table);

    /*
      This branch is taken when check_quick() returns non-zero or when the
      limit is zero (explicit LIMIT 0 or the impossible-WHERE case above).
      Unless an error is pending, the statement then ends with zero rows
      deleted.
    */
    if ((select && select->check_quick(thd, safe_update, limit)) || !limit)
    {
      if (thd->lex->describe && !error && !thd->is_error())
      {
        err= explain_no_table(thd, "Impossible WHERE");
        goto exit_without_my_ok;
      }
      delete select;
      free_underlaid_joins(thd, select_lex);
      /*
         Error was already created by quick select evaluation (check_quick()).
         TODO: Add error code output parameter to Item::val_xxx() methods.
         Currently they rely on the user checking DA for
         errors when unwinding the stack after calling Item::val_xxx().
      */
      if (thd->is_error())
        DBUG_RETURN(true);
      my_ok(thd, 0);
      DBUG_RETURN(false);                       // Nothing to delete
    }
  } // Ends scope for optimizer trace wrapper

  /* If running in safe sql mode, don't allow updates without keys */
  if (table->quick_keys.is_clear_all())
  {
    thd->server_status|=SERVER_QUERY_NO_INDEX_USED;
    /* A LIMIT clause makes a key-less DELETE acceptable in safe mode */
    if (safe_update && !using_limit)
    {
      delete select;
      free_underlaid_joins(thd, select_lex);
      my_message(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE,
                 ER(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE), MYF(0));
      DBUG_RETURN(TRUE);
    }
  }

  if (order)
  {
    table->update_const_key_parts(conds);
    order= simple_remove_const(order, conds);

    /*
      get_index_for_order() returns the index to read in ORDER BY order and
      fills in 'need_sort' and 'reverse' through its output parameters.
    */
    usable_index= get_index_for_order(order, table, select, limit,
                                      &need_sort, &reverse);
  }

  /* EXPLAIN DELETE: the plan is known at this point, nothing is deleted */
  if (thd->lex->describe)
  {
    err= explain_single_table_modification(thd, table, select, usable_index,
                                           limit, false, need_sort, false);
    goto exit_without_my_ok;
  }

  if (options & OPTION_QUICK)
    (void) table->file->extra(HA_EXTRA_QUICK);

  if (need_sort)
  {
    ha_rows examined_rows;
    ha_rows found_rows;

    {
      Filesort fsort(order, HA_POS_ERROR, select);
      DBUG_ASSERT(usable_index == MAX_KEY);
      /* MY_FAE | MY_ZEROFILL: zero-filled buffer for the filesort result */
      table->sort.io_cache= (IO_CACHE *) my_malloc(sizeof(IO_CACHE),
                                                   MYF(MY_FAE | MY_ZEROFILL));

      if ((table->sort.found_records= filesort(thd, table, &fsort, true,
                                               &examined_rows, &found_rows))
	  == HA_POS_ERROR)
      {
        /* 'err' still holds its initial value 'true' here */
        goto exit_without_my_ok;
      }
      thd->inc_examined_row_count(examined_rows);
      /*
        Filesort has already found and selected the rows we want to delete,
        so we don't need the where clause
      */
      delete select;
      free_underlaid_joins(thd, select_lex);
      select= 0;
    }
  }

  /* If quick select is used, initialize it before retrieving rows. */
  if (select && select->quick && (error= select->quick->reset()))
  {
    table->file->print_error(error, MYF(0));
    goto exit_without_my_ok;
  }

  if (select_lex->has_ft_funcs() && init_ftfuncs(thd, select_lex, 1))
    goto exit_without_my_ok;

  /*
    Choose the row reader: the generic reader when no ORDER BY index is
    usable or when a quick select exists, otherwise an index scan over
    'usable_index' in the direction given by 'reverse'.
  */
  if (usable_index==MAX_KEY || (select && select->quick))
    error= init_read_record(&info, thd, table, select, 1, 1, FALSE);
  else
    error= init_read_record_idx(&info, thd, table, 1, usable_index, reverse);

  if (error)
    goto exit_without_my_ok;

  THD_STAGE_INFO(thd, stage_updating);

  /*
    Bulk delete is attempted only when
    prepare_triggers_for_delete_stmt_or_event() returns false; will_batch
    records whether start_bulk_delete() succeeded (returned zero).
  */
  if (table->prepare_triggers_for_delete_stmt_or_event())
  {
    will_batch= FALSE;
  }
  else
    will_batch= !table->file->start_bulk_delete();

  table->mark_columns_needed_for_delete();

  /*
    Read removal is considered only if the handler advertises
    HA_READ_BEFORE_WRITE_REMOVAL, there is no LIMIT, the table has no delete
    triggers and rows are fetched through a quick select on a real index.
  */
  if ((table->file->ha_table_flags() & HA_READ_BEFORE_WRITE_REMOVAL) &&
      !using_limit &&
      !(table->triggers && table->triggers->has_delete_triggers()) &&
      select && select->quick && select->quick->index != MAX_KEY)
    read_removal= table->check_read_removal(select->quick->index);

  /*
    Main loop: fetch rows until the reader returns non-zero, the statement
    is killed or an error is raised.
    NOTE(review): the reader presumably returns a negative value at end of
    data, which the cleanup code treats as success - confirm.
  */
  while (!(error=info.read_record(&info)) && !thd->killed &&
	 ! thd->is_error())
  {
    thd->inc_examined_row_count(1);
    // thd->is_error() is tested to disallow delete row on error
    if (!select || (!select->skip_record(thd, &skip_record) && !skip_record))
    {

      /* BEFORE DELETE triggers; a failure aborts the statement */
      if (table->triggers &&
          table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                            TRG_ACTION_BEFORE, FALSE))
      {
        error= 1;
        break;
      }

      if (!(error= table->file->ha_delete_row(table->record[0])))
      {
	deleted++;
        /* AFTER DELETE triggers; a failure aborts the statement */
        if (table->triggers &&
            table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                              TRG_ACTION_AFTER, FALSE))
        {
          error= 1;
          break;
        }
        /*
          'limit' is decremented for every deleted row, but reaching zero
          ends the loop only when a LIMIT is in effect; -1 marks success.
        */
	if (!--limit && using_limit)
	{
	  error= -1;
	  break;
	}
      }
      else
      {
	table->file->print_error(error,MYF(0));
	/*
	  In < 4.0.14 we set the error number to 0 here, but that
	  was not sensible, because then MySQL would not roll back the
	  failed DELETE, and also wrote it to the binlog. For MyISAM
	  tables a DELETE probably never should fail (?), but for
	  InnoDB it can fail in a FOREIGN KEY error or an
	  out-of-tablespace error.
	*/
 	error= 1;
	break;
      }
    }
    /*
      Don't try unlocking the row if skip_record reported an error since in
      this case the transaction might have been rolled back already.
    */
    else if (!thd->is_error())
      table->file->unlock_row();  // Row failed selection, release lock on it
    else
      break;
  }
  /* Remember the kill state as it was when the loop ended */
  killed_status= thd->killed;
  if (killed_status != THD::NOT_KILLED || thd->is_error())
    error= 1;					// Aborted
  /*
    Finish the bulk delete. Its error is printed only if no failure has been
    recorded before (error != 1); either way the statement is marked failed.
  */
  if (will_batch && (loc_error= table->file->end_bulk_delete()))
  {
    if (error != 1)
      table->file->print_error(loc_error,MYF(0));
    error=1;
  }
  if (read_removal)
  {
    /* Only handler knows how many records were really deleted */
    deleted= table->file->end_read_removal();
  }
  THD_STAGE_INFO(thd, stage_end);
  end_read_record(&info);
  /* Undo the HA_EXTRA_QUICK hint given before the loop */
  if (options & OPTION_QUICK)
    (void) table->file->extra(HA_EXTRA_NORMAL);

  /*
    Common completion path for both the delete_all_rows() shortcut and the
    row-by-row loop. EXPLAIN never gets here.
  */
cleanup:
  DBUG_ASSERT(!thd->lex->describe);
  /*
    Invalidate the table in the query cache if something changed. This must
    be before binlog writing and ha_autocommit_...
  */
  if (deleted)
  {
    query_cache_invalidate3(thd, table_list, 1);
  }

  delete select;
  select= NULL;
  transactional_table= table->file->has_transactions();

  /* Rows removed from a non-transactional table cannot be rolled back */
  if (!transactional_table && deleted > 0)
    thd->transaction.stmt.mark_modified_non_trans_table();

  /* See similar binlogging code in sql_update.cc, for comments */
  if ((error < 0) || thd->transaction.stmt.cannot_safely_rollback())
  {
    if (mysql_bin_log.is_open())
    {
      int errcode= 0;
      /*
        On success any pending error is cleared; otherwise the error code
        of the failed statement is passed on to binlog_query().
      */
      if (error < 0)
        thd->clear_error();
      else
        errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);

      /*
        [binlog]: If 'handler::delete_all_rows()' was called and the
        storage engine does not inject the rows itself, we replicate
        statement-based; otherwise, 'ha_delete_row()' was used to
        delete specific rows which we might log row-based.
      */
      int log_result= thd->binlog_query(query_type,
                                        thd->query(), thd->query_length(),
                                        transactional_table, FALSE, FALSE,
                                        errcode);

      /* A binlog write failure turns the statement into a failure */
      if (log_result)
      {
	error=1;
      }
    }
  }
  DBUG_ASSERT(transactional_table || !deleted || thd->transaction.stmt.cannot_safely_rollback());
  free_underlaid_joins(thd, select_lex);
  /*
    Report success either on normal completion (error < 0) or, for
    DELETE IGNORE, whenever no error or fatal error is set in the THD.
  */
  if (error < 0 ||
      (thd->lex->ignore && !thd->is_error() && !thd->is_fatal_error))
  {
    my_ok(thd, deleted);
    DBUG_PRINT("info",("%ld records deleted",(long) deleted));
  }
  DBUG_RETURN(thd->is_error() || thd->killed);

#ifdef WITH_PARTITION_STORAGE_ENGINE
  /*
    Reached when partition pruning left no partitions to scan. A plain
    DELETE ends here with zero rows; for EXPLAIN, control falls through
    into exit_without_my_ok below.
  */
exit_all_parts_pruned_away:
  /* No matching records */
  if (!thd->lex->describe)
  {
    my_ok(thd, 0);
    DBUG_RETURN(0);
  }
  err= explain_no_table(thd, "No matching rows after partition pruning");
#endif

  /*
    Exit path for EXPLAIN and for failures detected before or during reader
    setup: releases resources without calling my_ok(). The result is
    'err' combined with the THD error and kill state.
  */
exit_without_my_ok:
  delete select;
  free_underlaid_joins(thd, select_lex);
  table->set_keyread(false);
  DBUG_RETURN((err || thd->is_error() || thd->killed) ? 1 : 0);
}