bool Sql_cmd_delete::delete_from_single_table()

in sql/sql_delete.cc [127:653]

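Executes a single-table DELETE statement. The function first tries the
optimized handler::ha_delete_all_rows() fast path when all rows are to be
deleted without side effects; otherwise it optimizes the WHERE clause, prunes
partitions, chooses an access method (range scan, index scan, or table scan,
with an optional filesort for ORDER BY), and deletes qualifying rows one by
one, firing BEFORE/AFTER DELETE triggers as needed, before writing the
statement to the binary log.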

bool Sql_cmd_delete::delete_from_single_table(THD *thd)
{
  DBUG_ENTER("Sql_cmd_delete::delete_from_single_table");

  myf           error_flags= MYF(0);            /**< Flag for fatal errors */
  bool          will_batch;
  /*
    Most recent handler error
    =  1: Some non-handler error
    =  0: Success
    = -1: No more rows to process, or reached limit
  */
  int           error= 0;
  READ_RECORD   info;
  ha_rows       deleted_rows= 0;
  bool          reverse= false;
  /// read_removal is only used by the NDB storage engine
  bool          read_removal= false;
  bool          need_sort= false;

  uint usable_index= MAX_KEY;
  SELECT_LEX *const select_lex= lex->select_lex;
  SELECT_LEX_UNIT *const unit= select_lex->master_unit();
  ORDER *order= select_lex->order_list.first;
  TABLE_LIST *const table_list= select_lex->get_table_list();
  THD::killed_state killed_status= THD::NOT_KILLED;
  THD::enum_binlog_query_type query_type= THD::ROW_QUERY_TYPE;

  const bool safe_update= thd->variables.option_bits & OPTION_SAFE_UPDATES;

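  // Resolve the table to modify: for a view, find the updatable base table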
  TABLE_LIST *const delete_table_ref= table_list->updatable_base_table();
  TABLE *const table= delete_table_ref->table;

  const bool transactional_table= table->file->has_transactions();

  const bool has_delete_triggers=
    table->triggers && table->triggers->has_delete_triggers();

  const bool has_before_triggers=
    has_delete_triggers && table->triggers->has_triggers(TRG_EVENT_DELETE,
                                                         TRG_ACTION_BEFORE);
  const bool has_after_triggers=
    has_delete_triggers && table->triggers->has_triggers(TRG_EVENT_DELETE,
                                                         TRG_ACTION_AFTER);
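
  // Compute the effective LIMIT; select_limit_cnt == HA_POS_ERROR means no LIMIT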
  unit->set_limit(thd, select_lex);

  ha_rows limit= unit->select_limit_cnt;
  const bool using_limit= limit != HA_POS_ERROR;

  // Set to true as soon as it is known that no rows need to be read
  bool no_rows= limit == 0;

  Item *conds;
  if (select_lex->get_optimizable_conditions(thd, &conds, NULL))
    DBUG_RETURN(true);                 /* purecov: inspected */

  /*
    See if we can substitute expressions with equivalent generated
    columns in the WHERE and ORDER BY clauses of the DELETE statement.
    It is unclear if this is best to do before or after the other
    substitutions performed by substitute_for_best_equal_field(). Do
    it here for now, to keep it consistent with how multi-table
    deletes are optimized in JOIN::optimize().
  */
  if (conds || order)
    static_cast<void>(substitute_gc(thd, select_lex, conds, NULL, order));

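  // Single-table DELETE is not planned through a JOIN; a standalone QEP_TAB
  // carries the chosen access path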
  QEP_TAB_standalone qep_tab_st;
  QEP_TAB &qep_tab= qep_tab_st.as_QEP_TAB();

  if (table->all_partitions_pruned_away)
  {
    /*
      All partitions were pruned away during preparation. Shortcut further
      processing by treating the statement as matching no rows. If explaining,
      report the plan and bail out.
    */
    no_rows= true;

    if (lex->describe)
    {
      Modification_plan plan(thd, MT_DELETE, table,
                             "No matching rows after partition pruning",
                             true, 0);
      bool err= explain_single_table_modification(thd, &plan, select_lex);
      DBUG_RETURN(err);
    }
  }

  const bool const_cond= (!conds || conds->const_item());
  if (safe_update && const_cond)
  {
    // Safe update mode is a runtime check, so apply it during execution, not preparation
    my_error(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE, MYF(0));
    DBUG_RETURN(true);
  }

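  // Evaluate a constant WHERE condition once, up front; val_int() may itself
  // raise an error, which is checked below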
  const bool const_cond_result= const_cond && (!conds || conds->val_int());
  if (thd->is_error())   // Error during val_int()
    DBUG_RETURN(true);                 /* purecov: inspected */
  /*
    We are passing the HA_EXTRA_IGNORE_DUP_KEY flag here to recreate the query
    with the IGNORE keyword within the federated storage engine. If the
    federated engine is removed in the future, use of the
    HA_EXTRA_IGNORE_DUP_KEY and HA_EXTRA_NO_IGNORE_DUP_KEY flags should be
    removed from delete_from_single_table(), Query_result_delete::optimize()
    and the Query_result_delete destructor.
  */
  if (lex->is_ignore())
    table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);

  /*
    Test if the user wants to delete all rows and deletion doesn't have
    any side effects (because of triggers), so that we can use the optimized
    handler::delete_all_rows() method.

    We can use delete_all_rows() if and only if:
    - We allow new functions (not using option --skip-new)
    - There is no limit clause
    - The condition is constant
    - If there is a condition, then it produces a non-zero value
    - If the current command is DELETE FROM with no WHERE clause, then:
      - We will not be binlogging this statement in row-based format, and
      - there should be no delete triggers associated with the table.
  */
  if (!using_limit && const_cond_result &&
      !(specialflag & SPECIAL_NO_NEW_FUNC) &&
      ((!thd->is_current_stmt_binlog_format_row() ||  // not ROW binlog-format
        thd->is_current_stmt_binlog_disabled()) && // no binlog for this command
       !has_delete_triggers))
  {
    /* Update the table->file->stats.records number */
    table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);
    ha_rows const maybe_deleted= table->file->stats.records;

    Modification_plan plan(thd, MT_DELETE, table,
                           "Deleting all rows", false, maybe_deleted);
    if (lex->describe)
    {
      bool err= explain_single_table_modification(thd, &plan, select_lex);
      DBUG_RETURN(err);
    }

    DBUG_PRINT("debug", ("Trying to use delete_all_rows()"));
    if (!(error= table->file->ha_delete_all_rows()))
    {
      /*
        As delete_all_rows() was used, we have to log it in statement format.
      */
      query_type= THD::STMT_QUERY_TYPE;
      error= -1;
      deleted_rows= maybe_deleted;
      goto cleanup;
    }
    if (error != HA_ERR_WRONG_COMMAND)
    {
      if (table->file->is_fatal_error(error))
        error_flags|= ME_FATALERROR;

      table->file->print_error(error, error_flags);
      goto cleanup;
    }
    /* Handler didn't support fast delete; delete rows one by one */
  }

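  // Simplify the WHERE clause: optimize_cond() folds constants and propagates
  // equalities, and may detect an always-false condition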
  if (conds)
  {
    COND_EQUAL *cond_equal= NULL;
    Item::cond_result result;

    if (optimize_cond(thd, &conds, &cond_equal, select_lex->join_list,
                      &result))
      DBUG_RETURN(true);
    if (result == Item::COND_FALSE)             // Impossible where
    {
      no_rows= true;

      if (lex->describe)
      {
        Modification_plan plan(thd, MT_DELETE, table,
                               "Impossible WHERE", true, 0);
        bool err= explain_single_table_modification(thd, &plan, select_lex);
        DBUG_RETURN(err);
      }
    }
    if (conds)
    {
      conds= substitute_for_best_equal_field(conds, cond_equal, 0);
      if (conds == NULL)
        DBUG_RETURN(true);

      conds->update_used_tables();
    }
  }

  // Initialize the cost model that will be used for this table
  table->init_cost_model(thd->cost_model());

  /* Update the table->file->stats.records number */
  table->file->info(HA_STATUS_VARIABLE | HA_STATUS_NO_LOCK);

  // These have been cleared when binding the TABLE object.
  DBUG_ASSERT(table->quick_keys.is_clear_all() &&
              table->possible_quick_keys.is_clear_all());

  table->covering_keys.clear_all();

  /* Prune a second time to be able to prune on subqueries in WHERE clause. */
  if (prune_partitions(thd, table, conds))
    DBUG_RETURN(true);
  if (table->all_partitions_pruned_away)
  {
    /* No matching records */
    if (lex->describe)
    {
      Modification_plan plan(thd, MT_DELETE, table,
                             "No matching rows after partition pruning",
                             true, 0);
      bool err= explain_single_table_modification(thd, &plan, select_lex);
      DBUG_RETURN(err);
    }
    my_ok(thd, 0);
    DBUG_RETURN(false);
  }

  qep_tab.set_table(table);
  qep_tab.set_condition(conds);

  { // Enter scope for optimizer trace wrapper
    Opt_trace_object wrapper(&thd->opt_trace);
    wrapper.add_utf8_table(delete_table_ref);

    if (!no_rows && conds != NULL)
    {
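      // Range analysis over all usable keys; a return value < 0 means the
      // range optimizer proved that no rows can match the condition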
      Key_map keys_to_use(Key_map::ALL_BITS), needed_reg_dummy;
      QUICK_SELECT_I *qck;
      no_rows= test_quick_select(thd, keys_to_use, 0, limit, safe_update,
                                 ORDER_NOT_RELEVANT, &qep_tab,
                                 conds, &needed_reg_dummy, &qck) < 0;
      qep_tab.set_quick(qck);
    }
    if (thd->is_error()) // test_quick_select() has improper error propagation
      DBUG_RETURN(true);

    if (no_rows)
    {
      if (lex->describe)
      {
        Modification_plan plan(thd, MT_DELETE, table,
                               "Impossible WHERE", true, 0);
        bool err= explain_single_table_modification(thd, &plan, select_lex);
        DBUG_RETURN(err);
      }

      my_ok(thd, 0);
      DBUG_RETURN(false);                       // Nothing to delete
    }
  } // Ends scope for optimizer trace wrapper

  /* In safe update mode, disallow a DELETE that uses no index unless a LIMIT is given */
  if (table->quick_keys.is_clear_all())
  {
    thd->server_status|=SERVER_QUERY_NO_INDEX_USED;
    if (safe_update && !using_limit)
    {
      my_error(ER_UPDATE_WITHOUT_KEY_IN_SAFE_MODE, MYF(0));
      DBUG_RETURN(true);
    }
  }

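  // With ORDER BY, try to find an index that returns rows in the required
  // order so that filesort can be avoided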
  if (order)
  {
    if (table->update_const_key_parts(conds))
      DBUG_RETURN(true);
    order= simple_remove_const(order, conds);
    ORDER_with_src order_src(order, ESC_ORDER_BY);
    usable_index= get_index_for_order(&order_src, &qep_tab, limit,
                                      &need_sort, &reverse);
  }

  // We reach this point only when the table must be accessed
  DBUG_ASSERT(!no_rows);

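  // Estimate how many rows will be examined; the estimate feeds the
  // Modification_plan used for EXPLAIN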
  {
    ha_rows rows;
    if (qep_tab.quick())
      rows= qep_tab.quick()->records;
    else if (!conds && !need_sort && limit != HA_POS_ERROR)
      rows= limit;
    else
    {
      delete_table_ref->fetch_number_of_rows();
      rows= table->file->stats.records;
    }
    qep_tab.set_quick_optim();
    qep_tab.set_condition_optim();
    Modification_plan plan(thd, MT_DELETE, &qep_tab,
                           usable_index, limit, false, need_sort,
                           false, rows);
    DEBUG_SYNC(thd, "planned_single_delete");

    if (lex->describe)
    {
      bool err= explain_single_table_modification(thd, &plan, select_lex);
      DBUG_RETURN(err);
    }

    if (select_lex->active_options() & OPTION_QUICK)
      (void) table->file->extra(HA_EXTRA_QUICK);

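    // No index delivers rows in ORDER BY order: sort the qualifying row IDs
    // with filesort() so rows are deleted in the requested order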
    if (need_sort)
    {
      ha_rows examined_rows, found_rows, returned_rows;

      Filesort fsort(&qep_tab, order, HA_POS_ERROR);
      DBUG_ASSERT(usable_index == MAX_KEY);
      table->sort.io_cache=
        (IO_CACHE *) my_malloc(key_memory_TABLE_sort_io_cache,
                               sizeof(IO_CACHE), MYF(MY_FAE | MY_ZEROFILL));

      if (filesort(thd, &fsort, true,
                   &examined_rows, &found_rows, &returned_rows))
        DBUG_RETURN(true);

      table->sort.found_records= returned_rows;
      thd->inc_examined_row_count(examined_rows);
      /*
        Filesort has already found and selected the rows we want to delete,
        so the WHERE clause and the quick select are no longer needed.
      */
      qep_tab.set_quick(NULL);
      qep_tab.set_condition(NULL);
      table->file->ha_index_or_rnd_end();
    }

    /* If quick select is used, initialize it before retrieving rows. */
    if (qep_tab.quick() && (error= qep_tab.quick()->reset()))
    {
      if (table->file->is_fatal_error(error))
        error_flags|= ME_FATALERROR;

      table->file->print_error(error, error_flags);
      DBUG_RETURN(true);
    }

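    // Set up row retrieval: a quick (range) select or table scan, or else an
    // index scan in usable_index order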
    if (usable_index==MAX_KEY || qep_tab.quick())
      error= init_read_record(&info, thd, NULL, &qep_tab, 1, 1, FALSE);
    else
      error= init_read_record_idx(&info, thd, table, 1, usable_index, reverse);

    if (error)
      DBUG_RETURN(true);                 /* purecov: inspected */

    if (select_lex->has_ft_funcs() && init_ftfuncs(thd, select_lex))
      DBUG_RETURN(true);                 /* purecov: inspected */

    THD_STAGE_INFO(thd, stage_updating);

    if (has_after_triggers)
    {
      /*
        The table has AFTER DELETE triggers that might access the subject
        table and therefore might need the delete to be done immediately.
        So we turn off batching.
      */
      (void) table->file->extra(HA_EXTRA_DELETE_CANNOT_BATCH);
      will_batch= false;
    }
    else
    {
      // No AFTER DELETE triggers; attempt to start bulk delete
      will_batch= !table->file->start_bulk_delete();
    }
    table->mark_columns_needed_for_delete(thd);
    if (thd->is_error())
      DBUG_RETURN(true);

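    // Engines supporting read-before-write removal (e.g. NDB) can delete by
    // key without reading each row first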
    if ((table->file->ha_table_flags() & HA_READ_BEFORE_WRITE_REMOVAL) &&
        !using_limit &&
        !has_delete_triggers &&
        qep_tab.quick() && qep_tab.quick()->index != MAX_KEY)
      read_removal= table->check_read_removal(qep_tab.quick()->index);

    DBUG_ASSERT(limit > 0);

    // The loop that reads rows and deletes those that qualify

    while (!(error=info.read_record(&info)) && !thd->killed)
    {
      DBUG_ASSERT(!thd->is_error());
      thd->inc_examined_row_count(1);

      bool skip_record;
      if (qep_tab.skip_record(thd, &skip_record))
      {
        error= 1;
        break;
      }
      if (skip_record)
      {
        table->file->unlock_row();  // Row failed condition check, release lock
        continue;
      }

      DBUG_ASSERT(!thd->is_error());
      if (has_before_triggers &&
          table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                            TRG_ACTION_BEFORE, FALSE))
      {
        error= 1;
        break;
      }

      if ((error= table->file->ha_delete_row(table->record[0])))
      {
        if (table->file->is_fatal_error(error))
          error_flags|= ME_FATALERROR;

        table->file->print_error(error, error_flags);
        /*
          In MySQL < 4.0.14 we set the error number to 0 here, but that
          was not sensible, because then MySQL would not roll back the
          failed DELETE, yet still wrote it to the binlog. For MyISAM
          tables a DELETE should probably never fail, but for InnoDB
          it can fail with a FOREIGN KEY error or an
          out-of-tablespace error.
        */
        if (thd->is_error()) // Could be downgraded to warning by IGNORE
        {
          error= 1;
          break;
        }
      }

      deleted_rows++;
      if (has_after_triggers &&
          table->triggers->process_triggers(thd, TRG_EVENT_DELETE,
                                            TRG_ACTION_AFTER, FALSE))
      {
        error= 1;
        break;
      }
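      // Stop once LIMIT rows have been deleted; error= -1 signals success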
      if (!--limit && using_limit)
      {
        error= -1;
        break;
      }
    }

    killed_status= thd->killed;
    if (killed_status != THD::NOT_KILLED || thd->is_error())
      error= 1;  // Aborted
    int loc_error;
    if (will_batch && (loc_error= table->file->end_bulk_delete()))
    {
      /* purecov: begin inspected */
      if (error != 1)
      {
        if (table->file->is_fatal_error(loc_error))
          error_flags|= ME_FATALERROR;

        table->file->print_error(loc_error, error_flags);
      }
      error=1;
      /* purecov: end */
    }
    if (read_removal)
    {
      /* Only the handler knows how many rows were really deleted */
      deleted_rows= table->file->end_read_removal();
    }
    end_read_record(&info);
    if (select_lex->active_options() & OPTION_QUICK)
      (void) table->file->extra(HA_EXTRA_NORMAL);
  } // End of scope for Modification_plan

cleanup:
  DBUG_ASSERT(!lex->describe);
  /*
    Invalidate the table in the query cache if something changed. This must
    be done before binlog writing and ha_autocommit_or_rollback().
  */
  if (deleted_rows > 0)
    query_cache.invalidate_single(thd, delete_table_ref, true);

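  // Record that a non-transactional table was modified; this affects both
  // rollback behavior and the binary logging decision below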
  if (!transactional_table && deleted_rows > 0)
    thd->get_transaction()->mark_modified_non_trans_table(
      Transaction_ctx::STMT);
  
  /* See the similar binlogging code in sql_update.cc for comments */
  if ((error < 0) || thd->get_transaction()->cannot_safely_rollback(
      Transaction_ctx::STMT))
  {
    if (mysql_bin_log.is_open())
    {
      int errcode= 0;
      if (error < 0)
        thd->clear_error();
      else
        errcode= query_error_code(thd, killed_status == THD::NOT_KILLED);

      /*
        [binlog]: As we don't allow the use of handler::delete_all_rows() when
        binlog_format == ROW, if handler::delete_all_rows() was called we
        replicate statement-based; otherwise, ha_delete_row() was used to
        delete specific rows, which we may log row-based.
      */
      int log_result= thd->binlog_query(query_type,
                                        thd->query().str, thd->query().length,
                                        transactional_table, FALSE, FALSE,
                                        errcode);

      if (log_result)
      {
        error= 1;
      }
    }
  }
  DBUG_ASSERT(transactional_table ||
              deleted_rows == 0 ||
              thd->get_transaction()->cannot_safely_rollback(
                  Transaction_ctx::STMT));
  if (error < 0)
  {
    my_ok(thd, deleted_rows);
    DBUG_PRINT("info",("%ld records deleted",(long) deleted_rows));
  }
  DBUG_RETURN(error > 0);
}
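
For example, a plain DELETE FROM t1 (no WHERE clause, no LIMIT, no delete
triggers, and no row-based binlogging) takes the ha_delete_all_rows() fast
path and is logged in statement format, while DELETE FROM t1 WHERE a > 10
ORDER BY b LIMIT 5 goes through range analysis, a possible filesort, and the
row-by-row delete loop.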