bool store_create_info()

in sql/sql_show.cc [1978:2646]


bool store_create_info(THD *thd, Table_ref *table_list, String *packet,
                       HA_CREATE_INFO *create_info_arg, bool show_database,
                       bool for_show_create_stmt) {
  char tmp[MAX_FIELD_WIDTH], buff[128], def_value_buf[MAX_FIELD_WIDTH];
  const char *alias;
  String type(tmp, sizeof(tmp), system_charset_info);
  String def_value(def_value_buf, sizeof(def_value_buf), system_charset_info);
  Field **ptr, *field;
  uint primary_key;
  KEY *key_info;
  TABLE *table = table_list->table;
  handler *file = table->file;
  TABLE_SHARE *share = table->s;
  HA_CREATE_INFO create_info;
  bool show_table_options = false;
  const bool foreign_db_mode = (thd->variables.sql_mode & MODE_ANSI) != 0;
  my_bitmap_map *old_map;
  bool error = false;
  DBUG_TRACE;
  DBUG_PRINT("enter", ("table: %s", table->s->table_name.str));

  restore_record(table, s->default_values);  // Get empty record

  if (share->tmp_table)
    packet->append(STRING_WITH_LEN("CREATE TEMPORARY TABLE "));
  else
    packet->append(STRING_WITH_LEN("CREATE TABLE "));
  if (create_info_arg &&
      (create_info_arg->options & HA_LEX_CREATE_IF_NOT_EXISTS))
    packet->append(STRING_WITH_LEN("IF NOT EXISTS "));
  if (table_list->schema_table)
    alias = table_list->schema_table->table_name;
  else {
    if (lower_case_table_names == 2)
      alias = table->alias;
    else {
      alias = share->table_name.str;
    }
  }

  /*
    Print the database before the table name if told to do that. The
    database name is only printed in the event that it is different
    from the current database.  The main reason for doing this is to
    avoid having to update gazillions of tests and result files, but
    it also saves a few bytes of the binary log.
   */
  const LEX_CSTRING *const db =
      table_list->schema_table ? &INFORMATION_SCHEMA_NAME : &table->s->db;
  if (show_database) {
    if (!thd->db().str || strcmp(db->str, thd->db().str)) {
      append_identifier(thd, packet, db->str, db->length);
      packet->append(STRING_WITH_LEN("."));
    }
  }

  append_identifier(thd, packet, alias, strlen(alias));
  packet->append(STRING_WITH_LEN(" (\n"));
  /*
    We need this to get default values from the table
    We have to restore the read_set if we are called from insert in case
    of row based replication.
  */
  old_map = tmp_use_all_columns(table, table->read_set);
  auto grd = create_scope_guard(
      [&]() { tmp_restore_column_map(table->read_set, old_map); });
  const dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
  const dd::Table *table_obj = nullptr;
  if (share->tmp_table)
    table_obj = table->s->tmp_table_def;
  else {
    if (thd->dd_client()->acquire(dd::String_type(share->db.str),
                                  dd::String_type(share->table_name.str),
                                  &table_obj))
      return true;
    DBUG_EXECUTE_IF("sim_acq_fail_in_store_ci", {
      my_error(ER_DA_UNKNOWN_ERROR_NUMBER, MYF(0), 42);
      return true;
    });
  }

  /*
    When building CREATE TABLE statement for the SHOW CREATE TABLE (i.e.
    for_show_create_stmt = true), skip generated invisible primary key
    if system variable 'show_gipk_in_create_table_and_information_schema' is set
    to OFF.
  */
  const bool skip_gipk =
      (for_show_create_stmt &&
       table_has_generated_invisible_primary_key(table) &&
       !thd->variables.show_gipk_in_create_table_and_information_schema);

  Field **first_field = table->field;
  /*
    Generated invisible primary key column is placed at the first position.
    So skip first column when skip_gipk is set.
  */
  assert(!table_has_generated_invisible_primary_key(table) ||
         is_generated_invisible_primary_key_column_name(
             (*first_field)->field_name));
  if (skip_gipk) first_field++;

  for (ptr = first_field; (field = *ptr); ptr++) {
    // Skip hidden system fields.
    if (field->is_hidden_by_system()) continue;

    if (ptr != first_field) packet->append(STRING_WITH_LEN(",\n"));

    packet->append(STRING_WITH_LEN("  "));
    append_identifier(thd, packet, field->field_name,
                      strlen(field->field_name));
    packet->append(' ');
    // check for surprises from the previous call to Field::sql_type()
    if (type.ptr() != tmp)
      type.set(tmp, sizeof(tmp), system_charset_info);
    else
      type.set_charset(system_charset_info);

    field->sql_type(type);
    packet->append(type.ptr(), type.length(), system_charset_info);

    bool column_has_explicit_collation = false;
    /* We may not have a table_obj for schema_tables. */
    if (table_obj)
      column_has_explicit_collation =
          table_obj->get_column(field->field_name)->is_explicit_collation();

    if (field->has_charset()) {
      /*
        For string types dump charset name only if field charset is same as
        table charset or was explicitly assigned.
      */
      if (field->charset() != share->table_charset ||
          column_has_explicit_collation) {
        packet->append(STRING_WITH_LEN(" CHARACTER SET "));
        packet->append(field->charset()->csname);
      }
      /*
        For string types dump collation name only if
        collation is not primary for the given charset
        or was explicitly assigned.
      */
      if (!(field->charset()->state & MY_CS_PRIMARY) ||
          column_has_explicit_collation ||
          (field->charset() == &my_charset_utf8mb4_0900_ai_ci &&
           share->table_charset != &my_charset_utf8mb4_0900_ai_ci)) {
        packet->append(STRING_WITH_LEN(" COLLATE "));
        packet->append(field->charset()->m_coll_name);
      }
    }

    if (field->gcol_info) {
      packet->append(STRING_WITH_LEN(" GENERATED ALWAYS"));
      packet->append(STRING_WITH_LEN(" AS ("));
      char buffer[128];
      String s(buffer, sizeof(buffer), system_charset_info);
      field->gcol_info->print_expr(thd, &s);
      packet->append(s);
      packet->append(STRING_WITH_LEN(")"));
      if (field->stored_in_db)
        packet->append(STRING_WITH_LEN(" STORED"));
      else
        packet->append(STRING_WITH_LEN(" VIRTUAL"));
    }

    if (field->is_flag_set(NOT_NULL_FLAG))
      packet->append(STRING_WITH_LEN(" NOT NULL"));
    else if (field->type() == MYSQL_TYPE_TIMESTAMP) {
      /*
        TIMESTAMP field require explicit NULL flag, because unlike
        all other fields they are treated as NOT NULL by default.
      */
      packet->append(STRING_WITH_LEN(" NULL"));
    }

    if (field->is_flag_set(NOT_SECONDARY_FLAG))
      packet->append(STRING_WITH_LEN(" NOT SECONDARY"));

    if (field->type() == MYSQL_TYPE_GEOMETRY) {
      const Field_geom *field_geom = down_cast<const Field_geom *>(field);
      if (field_geom->get_srid().has_value()) {
        packet->append(STRING_WITH_LEN(" /*!80003 SRID "));
        packet->append_ulonglong(field_geom->get_srid().value());
        packet->append(STRING_WITH_LEN(" */"));
      }
    }
    switch (field->field_storage_type()) {
      case HA_SM_DEFAULT:
        break;
      case HA_SM_DISK:
        packet->append(STRING_WITH_LEN(" /*!50606 STORAGE DISK */"));
        break;
      case HA_SM_MEMORY:
        packet->append(STRING_WITH_LEN(" /*!50606 STORAGE MEMORY */"));
        break;
      default:
        assert(0);
        break;
    }

    switch (field->column_format()) {
      case COLUMN_FORMAT_TYPE_DEFAULT:
        break;
      case COLUMN_FORMAT_TYPE_FIXED:
        packet->append(STRING_WITH_LEN(" /*!50606 COLUMN_FORMAT FIXED */"));
        break;
      case COLUMN_FORMAT_TYPE_DYNAMIC:
        packet->append(STRING_WITH_LEN(" /*!50606 COLUMN_FORMAT DYNAMIC */"));
        break;
      default:
        assert(0);
        break;
    }

    bool as_a_comment{false};
    if (print_default_clause(thd, field, &def_value, true, &as_a_comment)) {
      if (as_a_comment) packet->append(STRING_WITH_LEN(" /* "));
      packet->append(STRING_WITH_LEN(" DEFAULT "));
      packet->append(def_value.ptr(), def_value.length(), system_charset_info);
      if (as_a_comment) packet->append(STRING_WITH_LEN(" */ "));
    }

    if (print_on_update_clause(field, &def_value, false)) {
      packet->append(STRING_WITH_LEN(" "));
      packet->append(def_value);
    }

    if (field->auto_flags & Field::NEXT_NUMBER)
      packet->append(STRING_WITH_LEN(" AUTO_INCREMENT"));

    // Column visibility attribute
    if (field->is_hidden_by_user())
      packet->append(STRING_WITH_LEN(" /*!80023 INVISIBLE */"));

    if (field->comment.length) {
      packet->append(STRING_WITH_LEN(" COMMENT "));
      append_unescaped(packet, field->comment.str, field->comment.length);
    }

    // Storage engine specific json attributes
    if (field->m_engine_attribute.length) {
      packet->append(STRING_WITH_LEN(" /*!80021 ENGINE_ATTRIBUTE "));
      // append escaped JSON
      append_unescaped(packet, field->m_engine_attribute.str,
                       field->m_engine_attribute.length);
      packet->append(STRING_WITH_LEN(" */"));
    }
    if (field->m_secondary_engine_attribute.length) {
      packet->append(STRING_WITH_LEN(" /*!80021 SECONDARY_ENGINE_ATTRIBUTE "));
      // escape JSON
      append_unescaped(packet, field->m_secondary_engine_attribute.str,
                       field->m_secondary_engine_attribute.length);
      packet->append(STRING_WITH_LEN(" */"));
    }
  }

  key_info = table->key_info;
  /*
    Primary key is always at the first position in the keys list. Skip printing
    primary key definition when skip_gipk is set.
  */
  assert(!table_has_generated_invisible_primary_key(table) ||
         ((key_info->user_defined_key_parts == 1) &&
          is_generated_invisible_primary_key_column_name(
              key_info->key_part->field->field_name)));
  if (skip_gipk) key_info++;

  /* Allow update_create_info to update row type */
  create_info.row_type = share->row_type;
  file->update_create_info(&create_info);
  primary_key = share->primary_key;

  for (uint i = skip_gipk ? 1 : 0; i < share->keys; i++, key_info++) {
    KEY_PART_INFO *key_part = key_info->key_part;
    bool found_primary = false;
    packet->append(STRING_WITH_LEN(",\n  "));

    if (i == primary_key && !strcmp(key_info->name, primary_key_name)) {
      found_primary = true;
      /*
        No space at end, because a space will be added after where the
        identifier would go, but that is not added for primary key.
      */
      packet->append(STRING_WITH_LEN("PRIMARY KEY"));
    } else if (key_info->flags & HA_NOSAME)
      packet->append(STRING_WITH_LEN("UNIQUE KEY "));
    else if (key_info->flags & HA_FULLTEXT)
      packet->append(STRING_WITH_LEN("FULLTEXT KEY "));
    else if (key_info->flags & HA_SPATIAL)
      packet->append(STRING_WITH_LEN("SPATIAL KEY "));
    else
      packet->append(STRING_WITH_LEN("KEY "));

    if (!found_primary)
      append_identifier(thd, packet, key_info->name, strlen(key_info->name));

    packet->append(STRING_WITH_LEN(" ("));

    for (uint j = 0; j < key_info->user_defined_key_parts; j++, key_part++) {
      if (j) packet->append(',');

      if (key_part->field) {
        // If this fields represents a functional index, print the expression
        // instead of the column name.
        if (key_part->field->is_field_for_functional_index()) {
          assert(key_part->field->gcol_info);

          StringBuffer<STRING_BUFFER_USUAL_SIZE> s;
          s.set_charset(system_charset_info);
          key_part->field->gcol_info->print_expr(thd, &s);
          packet->append("(");
          packet->append(s);
          packet->append(")");
        } else {
          append_identifier(thd, packet, key_part->field->field_name,
                            strlen(key_part->field->field_name));
        }
      }

      if (key_part->field &&
          (key_part->length !=
               table->field[key_part->fieldnr - 1]->key_length() &&
           !(key_info->flags & (HA_FULLTEXT | HA_SPATIAL)))) {
        packet->append_parenthesized((long)key_part->length /
                                     key_part->field->charset()->mbmaxlen);
      }
      if (key_part->key_part_flag & HA_REVERSE_SORT)
        packet->append(STRING_WITH_LEN(" DESC"));
    }
    packet->append(')');
    store_key_options(thd, packet, table, key_info);
    if (key_info->parser) {
      LEX_CSTRING *parser_name = plugin_name(key_info->parser);
      packet->append(STRING_WITH_LEN(" /*!50100 WITH PARSER "));
      append_identifier(thd, packet, parser_name->str, parser_name->length);
      packet->append(STRING_WITH_LEN(" */ "));
    }
  }

  // Append foreign key constraint definitions to the CREATE TABLE statement.
  print_foreign_key_info(thd, db, table_obj, packet);

  /*
    Append check constraints to the CREATE TABLE statement. All check
    constraints are listed in table check constraint form.
  */
  if (table->table_check_constraint_list != nullptr) {
    for (auto &cc : *table->table_check_constraint_list) {
      packet->append(STRING_WITH_LEN(",\n  CONSTRAINT "));
      append_identifier(thd, packet, cc.name().str, cc.name().length);

      packet->append(STRING_WITH_LEN(" CHECK ("));
      packet->append(cc.expr_str().str, cc.expr_str().length,
                     system_charset_info);
      packet->append(STRING_WITH_LEN(")"));

      /*
        If check constraint is not-enforced then it is listed with the comment
        "NOT ENFORCED".
      */
      if (!cc.is_enforced()) {
        packet->append(STRING_WITH_LEN(" /*!80016 NOT ENFORCED */"));
      }
    }
  }

  packet->append(STRING_WITH_LEN("\n)"));

  /**
    Append START TRANSACTION for CREATE SELECT on SE supporting atomic DDL.
    This is done only while binlogging CREATE TABLE AS SELECT.
  */
  if (!thd->lex->query_block->field_list_is_empty() &&
      (create_info_arg->db_type->flags & HTON_SUPPORTS_ATOMIC_DDL)) {
    packet->append(STRING_WITH_LEN(" START TRANSACTION"));
  }

  bool show_tablespace = false;
  if (!foreign_db_mode) {
    show_table_options = true;

    // Show tablespace name only if it is explicitly provided by user.
    if (share->tmp_table) {
      // Innodb allows temporary tables in be in system temporary tablespace.
      show_tablespace = share->tablespace;
    } else if (share->tablespace && table_obj) {
      show_tablespace = table_obj->is_explicit_tablespace();
    }

    /* TABLESPACE and STORAGE */
    if (show_tablespace || share->default_storage_media != HA_SM_DEFAULT) {
      packet->append(STRING_WITH_LEN(" /*!50100"));
      if (show_tablespace) {
        packet->append(STRING_WITH_LEN(" TABLESPACE "));
        append_identifier(thd, packet, share->tablespace,
                          strlen(share->tablespace));
      }

      if (share->default_storage_media == HA_SM_DISK)
        packet->append(STRING_WITH_LEN(" STORAGE DISK"));
      if (share->default_storage_media == HA_SM_MEMORY)
        packet->append(STRING_WITH_LEN(" STORAGE MEMORY"));

      packet->append(STRING_WITH_LEN(" */"));
    }

    /* Get Autoextend_size attribute for file_per_table tablespaces. */

    ulonglong autoextend_size{};

    if (create_info_arg != nullptr) {
      if ((create_info_arg->used_fields & HA_CREATE_USED_AUTOEXTEND_SIZE) !=
          0) {
        autoextend_size =
            create_info_arg->m_implicit_tablespace_autoextend_size;
      }
    } else if (!share->tmp_table && table_obj &&
               table_obj->engine() == "InnoDB") {
      /* Get the AUTOEXTEND_SIZE if the tablespace is an implicit tablespace. */
      dd::get_implicit_tablespace_options(thd, table_obj, &autoextend_size);
    }

    /* Print autoextend_size attribute if it is set to a non-zero value */
    if (autoextend_size > 0) {
      char buf[std::numeric_limits<decltype(autoextend_size)>::digits10 + 2];
      const int len = snprintf(buf, sizeof(buf), "%llu", autoextend_size);
      assert(len < static_cast<int>(sizeof(buf)));
      packet->append(STRING_WITH_LEN(" /*!80023 AUTOEXTEND_SIZE="));
      packet->append(buf, len);
      packet->append(STRING_WITH_LEN(" */"));
    }

    /*
      IF   check_create_info
      THEN add ENGINE only if it was used when creating the table
    */
    if (!create_info_arg ||
        (create_info_arg->used_fields & HA_CREATE_USED_ENGINE)) {
      packet->append(STRING_WITH_LEN(" ENGINE="));
      /*
        TODO: Replace this if with the else branch. Not done yet since
        NDB handlerton says "ndbcluster" and ha_ndbcluster says "NDBCLUSTER".
      */
      if (table->part_info) {
        packet->append(ha_resolve_storage_engine_name(
            table->part_info->default_engine_type));
      } else {
        packet->append(file->table_type());
      }
    }

    /*
      Add AUTO_INCREMENT=... if there is an AUTO_INCREMENT column,
      and NEXT_ID > 1 (the default).  We must not print the clause
      for engines that do not support this as it would break the
      import of dumps, but as of this writing, the test for whether
      AUTO_INCREMENT columns are allowed and whether AUTO_INCREMENT=...
      is supported is identical, !(file->table_flags() & HA_NO_AUTO_INCREMENT))
      Because of that, we do not explicitly test for the feature,
      but may extrapolate its existence from that of an AUTO_INCREMENT column.

      If table has a generated invisible primary key and skip_gipk is set,
      then we should not print the AUTO_INCREMENT as AUTO_INCREMENT column
      (generated invisible primary key column) is skipped with this setting.
    */

    if (create_info.auto_increment_value > 1 && !skip_gipk) {
      char *end;
      packet->append(STRING_WITH_LEN(" AUTO_INCREMENT="));
      end = longlong10_to_str(create_info.auto_increment_value, buff, 10);
      packet->append(buff, (uint)(end - buff));
    }

    if (share->table_charset) {
      /*
        IF   check_create_info
        THEN add DEFAULT CHARSET only if it was used when creating the table
      */
      if (!create_info_arg ||
          (create_info_arg->used_fields & HA_CREATE_USED_DEFAULT_CHARSET)) {
        packet->append(STRING_WITH_LEN(" DEFAULT CHARSET="));
        packet->append(share->table_charset->csname);
        if (!(share->table_charset->state & MY_CS_PRIMARY) ||
            share->table_charset == &my_charset_utf8mb4_0900_ai_ci) {
          packet->append(STRING_WITH_LEN(" COLLATE="));
          packet->append(table->s->table_charset->m_coll_name);
        }
      }
    }

    if (share->min_rows) {
      char *end;
      packet->append(STRING_WITH_LEN(" MIN_ROWS="));
      end = longlong10_to_str(share->min_rows, buff, 10);
      packet->append(buff, (uint)(end - buff));
    }

    if (share->max_rows && !table_list->schema_table) {
      char *end;
      packet->append(STRING_WITH_LEN(" MAX_ROWS="));
      end = longlong10_to_str(share->max_rows, buff, 10);
      packet->append(buff, (uint)(end - buff));
    }

    if (share->avg_row_length) {
      char *end;
      packet->append(STRING_WITH_LEN(" AVG_ROW_LENGTH="));
      end = longlong10_to_str(share->avg_row_length, buff, 10);
      packet->append(buff, (uint)(end - buff));
    }

    if (share->db_create_options & HA_OPTION_PACK_KEYS)
      packet->append(STRING_WITH_LEN(" PACK_KEYS=1"));
    if (share->db_create_options & HA_OPTION_NO_PACK_KEYS)
      packet->append(STRING_WITH_LEN(" PACK_KEYS=0"));
    if (share->db_create_options & HA_OPTION_STATS_PERSISTENT)
      packet->append(STRING_WITH_LEN(" STATS_PERSISTENT=1"));
    if (share->db_create_options & HA_OPTION_NO_STATS_PERSISTENT)
      packet->append(STRING_WITH_LEN(" STATS_PERSISTENT=0"));
    if (share->stats_auto_recalc == HA_STATS_AUTO_RECALC_ON)
      packet->append(STRING_WITH_LEN(" STATS_AUTO_RECALC=1"));
    else if (share->stats_auto_recalc == HA_STATS_AUTO_RECALC_OFF)
      packet->append(STRING_WITH_LEN(" STATS_AUTO_RECALC=0"));
    if (share->stats_sample_pages != 0) {
      char *end;
      packet->append(STRING_WITH_LEN(" STATS_SAMPLE_PAGES="));
      end = longlong10_to_str(share->stats_sample_pages, buff, 10);
      packet->append(buff, (uint)(end - buff));
    }
    /* We use CHECKSUM, instead of TABLE_CHECKSUM, for backward compatibility */
    if (share->db_create_options & HA_OPTION_CHECKSUM)
      packet->append(STRING_WITH_LEN(" CHECKSUM=1"));
    if (share->db_create_options & HA_OPTION_DELAY_KEY_WRITE)
      packet->append(STRING_WITH_LEN(" DELAY_KEY_WRITE=1"));

    /*
      If 'show_create_table_verbosity' is enabled, the row format would
      be displayed in the output of SHOW CREATE TABLE even if default
      row format is used. Otherwise only the explicitly mentioned
      row format would be displayed.
    */
    if (thd->variables.show_create_table_verbosity) {
      packet->append(STRING_WITH_LEN(" ROW_FORMAT="));
      packet->append(ha_row_type[(uint)share->real_row_type]);
    } else if (create_info.row_type != ROW_TYPE_DEFAULT) {
      packet->append(STRING_WITH_LEN(" ROW_FORMAT="));
      packet->append(ha_row_type[(uint)create_info.row_type]);
    }
    if (table->s->key_block_size) {
      char *end;
      packet->append(STRING_WITH_LEN(" KEY_BLOCK_SIZE="));
      end = longlong10_to_str(table->s->key_block_size, buff, 10);
      packet->append(buff, (uint)(end - buff));
    }
    if (table->s->compress.length) {
      packet->append(STRING_WITH_LEN(" COMPRESSION="));
      append_unescaped(packet, share->compress.str, share->compress.length);
    }
    bool print_encryption = false;
    if (should_print_encryption_clause(thd, share, &print_encryption))
      return true;
    if (print_encryption) {
      /*
        Add versioned comment when there is TABLESPACE clause displayed and
        the table uses general tablespace.
      */
      bool uses_general_tablespace = false;
      if (table_obj)
        uses_general_tablespace =
            show_tablespace && dd::uses_general_tablespace(*table_obj);
      if (uses_general_tablespace) packet->append(STRING_WITH_LEN(" /*!80016"));

      packet->append(STRING_WITH_LEN(" ENCRYPTION="));
      if (share->encrypt_type.length) {
        append_unescaped(packet, share->encrypt_type.str,
                         share->encrypt_type.length);
      } else {
        /*
          We print ENCRYPTION='N' only in case user did not explicitly
          provide ENCRYPTION clause and schema has default_encryption 'Y'.
          In other words, if there is no ENCRYPTION clause supplied, then
          it is always unencrypted table. Server always maintains
          ENCRYPTION clause for encrypted tables, even if user did not
          supply the clause explicitly.
        */
        packet->append(STRING_WITH_LEN("\'N\'"));
      }

      if (uses_general_tablespace) packet->append(STRING_WITH_LEN(" */"));
    }
    table->file->append_create_info(packet);
    if (share->comment.length) {
      packet->append(STRING_WITH_LEN(" COMMENT="));
      append_unescaped(packet, share->comment.str, share->comment.length);
    }
    if (share->connect_string.length) {
      packet->append(STRING_WITH_LEN(" CONNECTION="));
      append_unescaped(packet, share->connect_string.str,
                       share->connect_string.length);
    }
    if (share->has_secondary_engine() &&
        !thd->variables.show_create_table_skip_secondary_engine) {
      packet->append(" SECONDARY_ENGINE=");
      packet->append(share->secondary_engine.str,
                     share->secondary_engine.length);
    }

    if (share->engine_attribute.length) {
      packet->append(STRING_WITH_LEN(" /*!80021 ENGINE_ATTRIBUTE="));

      if (for_show_create_stmt &&
          check_table_access(thd, CREATE_ACL, table_list, false, 1, true) &&
          check_table_access(thd, ALTER_ACL, table_list, false, 1, true)) {
        String rlb;
        String original_query_str(share->engine_attribute.str,
                                  share->engine_attribute.length,
                                  system_charset_info);
        redact_par_url(original_query_str, rlb);

        append_unescaped(packet, rlb.ptr(), rlb.length());
        // inform users without privileges that the result of SHOW CREATE
        // TABLE is redacted. Useful for the case of redacted mysqldump
        // result.
        push_warning_printf(
            thd, Sql_condition::SL_WARNING, ER_WARN_REDACTED_PRIVILEGES,
            ER_THD(thd, ER_WARN_REDACTED_PRIVILEGES), share->table_name.str);

      } else {
        append_unescaped(packet, share->engine_attribute.str,
                         share->engine_attribute.length);
      }
      packet->append(STRING_WITH_LEN(" */"));
    }
    if (share->secondary_engine_attribute.length) {
      packet->append(STRING_WITH_LEN(" /*!80021 SECONDARY_ENGINE_ATTRIBUTE="));
      // escape JSON
      append_unescaped(packet, share->secondary_engine_attribute.str,
                       share->secondary_engine_attribute.length);
      packet->append(STRING_WITH_LEN(" */"));
    }
    append_directory(thd, packet, "DATA", create_info.data_file_name);
    append_directory(thd, packet, "INDEX", create_info.index_file_name);
  }
  {
    if (table->part_info &&
        !(table->s->db_type()->partition_flags &&
          (table->s->db_type()->partition_flags() & HA_USE_AUTO_PARTITION) &&
          table->part_info->is_auto_partitioned)) {
      /*
        Partition syntax for CREATE TABLE is at the end of the syntax.
      */
      uint part_syntax_len;
      char *part_syntax;
      String comment_start;
      table->part_info->set_show_version_string(&comment_start);
      if ((part_syntax = generate_partition_syntax(
               table->part_info, &part_syntax_len, false, show_table_options,
               true,  // For proper quoting.
               comment_start.c_ptr()))) {
        packet->append(comment_start);
        if (packet->append(part_syntax, part_syntax_len) ||
            packet->append(STRING_WITH_LEN(" */")))
          error = true;
        my_free(part_syntax);
      }
    }
  }
  return error;
}