static int open_binary_frm()

in sql/table.cc [1053:2358]


static int open_binary_frm(THD *thd, TABLE_SHARE *share, uchar *head,
                           File file)
{
  int error, errarg= 0;
  uint new_frm_ver, field_pack_length, new_field_pack_flag;
  uint interval_count, interval_parts, read_length, int_length;
  uint db_create_options, keys, key_parts, n_length;
  uint key_info_length, com_length, null_bit_pos;
  uint extra_rec_buf_length;
  uint i,j;
  bool use_extended_sk;   // Supported extending of secondary keys with PK parts
  bool use_hash;
  char *keynames, *names, *comment_pos;
  uchar forminfo[288];
  uchar *record;
  uchar *disk_buff, *strpos, *null_flags, *null_pos;
  ulong pos, record_offset, *rec_per_key, rec_buff_length;
  handler *handler_file= 0;
  KEY	*keyinfo;
  KEY_PART_INFO *key_part;
  SQL_CRYPT *crypted=0;
  Field  **field_ptr, *reg_field;
  const char **interval_array;
  enum legacy_db_type legacy_db_type;
  my_bitmap_map *bitmaps;
  uchar *extra_segment_buff= 0;
  const uint format_section_header_size= 8;
  uchar *format_section_fields= 0;
  uint document_path_key_parts = 0;
  DBUG_ENTER("open_binary_frm");

  new_field_pack_flag= head[27];
  new_frm_ver= (head[2] - FRM_VER);
  field_pack_length= new_frm_ver < 2 ? 11 : 17;
  disk_buff= 0;

  error= 3;
  /* Position of the form in the form file. */
  if (!(pos= get_form_pos(file, head)))
    goto err;                                   /* purecov: inspected */

  mysql_file_seek(file,pos,MY_SEEK_SET,MYF(0));
  if (mysql_file_read(file, forminfo,288,MYF(MY_NABP)))
    goto err;
  share->frm_version= head[2];
  /*
    Check if .frm file created by MySQL 5.0. In this case we want to
    display CHAR fields as CHAR and not as VARCHAR.
    We do it this way as we want to keep the old frm version to enable
    MySQL 4.1 to read these files.
  */
  if (share->frm_version == FRM_VER_TRUE_VARCHAR -1 && head[33] == 5)
    share->frm_version= FRM_VER_TRUE_VARCHAR;

#ifdef WITH_PARTITION_STORAGE_ENGINE
  if (*(head+61) &&
      !(share->default_part_db_type= 
        ha_checktype(thd, (enum legacy_db_type) (uint) *(head+61), 1, 0)))
    goto err;
  DBUG_PRINT("info", ("default_part_db_type = %u", head[61]));
#endif
  legacy_db_type= (enum legacy_db_type) (uint) *(head+3);
  DBUG_ASSERT(share->db_plugin == NULL);
  /*
    if the storage engine is dynamic, no point in resolving it by its
    dynamically allocated legacy_db_type. We will resolve it later by name.
  */
  if (legacy_db_type > DB_TYPE_UNKNOWN && 
      legacy_db_type < DB_TYPE_FIRST_DYNAMIC)
    share->db_plugin= ha_lock_engine(NULL, 
                                     ha_checktype(thd, legacy_db_type, 0, 0));
  share->db_create_options= db_create_options= uint2korr(head+30);
  share->db_options_in_use= share->db_create_options;
  share->mysql_version= uint4korr(head+51);
  share->null_field_first= 0;
  if (!head[32])				// New frm file in 3.23
  {
    share->avg_row_length= uint4korr(head+34);
    share->row_type= (row_type) head[40];
    share->table_charset= get_charset((((uint) head[41]) << 8) + 
                                        (uint) head[38],MYF(0));
    share->null_field_first= 1;
    share->stats_sample_pages= uint2korr(head+42);
    share->stats_auto_recalc= static_cast<enum_stats_auto_recalc>(head[44]);
    share->rbr_column_names = head[45] & 0x80 ? true : false;
  }
  if (!share->table_charset)
  {
    /* unknown charset in head[38] or pre-3.23 frm */
    if (use_mb(default_charset_info))
    {
      /* Warn that we may be changing the size of character columns */
      sql_print_warning("'%s' had no or invalid character set, "
                        "and default character set is multi-byte, "
                        "so character column sizes may have changed",
                        share->path.str);
    }
    share->table_charset= default_charset_info;
  }
  share->db_record_offset= 1;
  /* Set temporarily a good value for db_low_byte_first */
  share->db_low_byte_first= MY_TEST(legacy_db_type != DB_TYPE_ISAM);
  error=4;
  share->max_rows= uint4korr(head+18);
  share->min_rows= uint4korr(head+22);

  /* Read key information */
  key_info_length= (uint) uint2korr(head+28);
  mysql_file_seek(file, (ulong) uint2korr(head+6), MY_SEEK_SET, MYF(0));
  if (read_string(file,(uchar**) &disk_buff,key_info_length))
    goto err;                                   /* purecov: inspected */
  if (disk_buff[0] & 0x80)
  {
    share->keys=      keys=      (disk_buff[1] << 7) | (disk_buff[0] & 0x7f);
    share->key_parts= key_parts= uint2korr(disk_buff+2);
  }
  else
  {
    share->keys=      keys=      disk_buff[0];
    share->key_parts= key_parts= disk_buff[1];
  }
  share->keys_for_keyread.init(0);
  share->keys_in_use.init(keys);
  share->document_keys.init(0);

  strpos=disk_buff+6;  

  use_extended_sk=
    ha_check_storage_engine_flag(share->db_type(),
                                 HTON_SUPPORTS_EXTENDED_KEYS);

  uint total_key_parts;
  if (use_extended_sk)
  {
    uint primary_key_parts= keys ?
      (new_frm_ver >= 3) ? (uint) strpos[4] : (uint) strpos[3] : 0;
    total_key_parts= key_parts + primary_key_parts * (keys - 1);
  }
  else
    total_key_parts= key_parts;
  n_length= keys * sizeof(KEY) + total_key_parts * sizeof(KEY_PART_INFO);

  if (!(keyinfo = (KEY*) alloc_root(&share->mem_root,
				    n_length + uint2korr(disk_buff+4))))
    goto err;                                   /* purecov: inspected */
  memset(keyinfo, 0, n_length);
  share->key_info= keyinfo;
  key_part= reinterpret_cast<KEY_PART_INFO*>(keyinfo+keys);

  if (!(rec_per_key= (ulong*) alloc_root(&share->mem_root,
                                         sizeof(ulong) * total_key_parts)))
    goto err;

  for (i=0 ; i < keys ; i++, keyinfo++)
  {
    keyinfo->table= 0;                           // Updated in open_frm
    if (new_frm_ver >= 3)
    {
      keyinfo->flags=	   (uint) uint2korr(strpos) ^ HA_NOSAME;
      keyinfo->key_length= (uint) uint2korr(strpos+2);
      keyinfo->user_defined_key_parts= (uint) strpos[4];
      keyinfo->algorithm=  (enum ha_key_alg) strpos[5];
      keyinfo->block_size= uint2korr(strpos+6);
      strpos+=8;
    }
    else
    {
      keyinfo->flags=	 ((uint) strpos[0]) ^ HA_NOSAME;
      keyinfo->key_length= (uint) uint2korr(strpos+1);
      keyinfo->user_defined_key_parts= (uint) strpos[3];
      keyinfo->algorithm= HA_KEY_ALG_UNDEF;
      strpos+=4;
    }

    keyinfo->key_part=	 key_part;
    keyinfo->rec_per_key= rec_per_key;
    for (j=keyinfo->user_defined_key_parts ; j-- ; key_part++)
    {
      *rec_per_key++=0;
      key_part->fieldnr=	(uint16) (uint2korr(strpos) & FIELD_NR_MASK);
      key_part->offset= (uint) uint2korr(strpos+2)-1;
      key_part->key_type=	(uint) uint2korr(strpos+5);
      // key_part->field=	(Field*) 0;	// Will be fixed later
      if (new_frm_ver >= 1)
      {
	key_part->key_part_flag= *(strpos+4);
	key_part->length=	(uint) uint2korr(strpos+7);
	strpos+=9;

        /* Document path key part */
        if (f_is_document(key_part->key_type))
          document_path_key_parts++;
      }
      else
      {
	key_part->length=	*(strpos+4);
	key_part->key_part_flag=0;
	if (key_part->length > 128)
	{
	  key_part->length&=127;		/* purecov: inspected */
	  key_part->key_part_flag=HA_REVERSE_SORT; /* purecov: inspected */
	}
	strpos+=7;
      }
      key_part->store_length=key_part->length;
    }
    /*
      Add primary key parts if the engine supports primary key extension for
      secondary keys. Here we add the parts of the unique first key to the
      end of the secondary key parts array and increase the actual number of
      key parts. Note that the primary key is always first if it exists.
      Later, if there is no primary key in the table, the number of actual
      key parts is set to the number of user defined key parts.
    */
    keyinfo->actual_key_parts= keyinfo->user_defined_key_parts;
    keyinfo->actual_flags= keyinfo->flags;
    if (use_extended_sk && i && !(keyinfo->flags & HA_NOSAME))
    {
      const uint primary_key_parts= share->key_info->user_defined_key_parts;
      keyinfo->unused_key_parts= primary_key_parts;
      key_part+= primary_key_parts;
      rec_per_key+= primary_key_parts;
      share->key_parts+= primary_key_parts;
    }
  }
  keynames=(char*) key_part;
  strpos+= (strmov(keynames, (char *) strpos) - keynames)+1;

  /* read and parse document path fields for virtual key parts */
  if (document_path_key_parts > 0 && new_frm_ver >= 3)
  {
    /* preparing the key trie since we have some document path keys */
    share->document_key_trie = (Document_key_trie*)
      alloc_root(&share->mem_root, sizeof(Document_key_trie));
    memset(share->document_key_trie, 0, sizeof(Document_key_trie));

    keyinfo = share->key_info;
    for (i=0; i < keys; i++, keyinfo++)
    {
      key_part = keyinfo->key_part;
      for (j=0; j < keyinfo->user_defined_key_parts; j++, key_part++)
      {
        if (!f_is_document(key_part->key_type))
          continue;

        keyinfo->contains_document_key_part = true;

        /* the document path key part structure */
        key_part->document_path_key_part = (DOCUMENT_PATH_KEY_PART_INFO *)
          alloc_root(&share->mem_root, sizeof(DOCUMENT_PATH_KEY_PART_INFO));

        if (!key_part->document_path_key_part)
          goto err;

        memset(key_part->document_path_key_part,
               0, sizeof(DOCUMENT_PATH_KEY_PART_INFO));

        DBUG_ASSERT(*((uchar *)strpos) == (uchar) NAMES_SEP_CHAR);
        strpos++;

        /* the type of the document path key part */
        enum_field_types type;
        switch (*strpos++)
        {
        case 'L':
          type = MYSQL_TYPE_LONGLONG;
          break;
        case 'D':
          type = MYSQL_TYPE_DOUBLE;
          break;
        case 'T':
          type = MYSQL_TYPE_TINY;
          break;
        case 'S':
          type = MYSQL_TYPE_STRING;
          break;
        default:
          type = MYSQL_TYPE_NULL;
          DBUG_ASSERT(0);
        }
        key_part->document_path_key_part->type = type;

        DBUG_ASSERT(*((uchar *)strpos) == (uchar) NAMES_SEP_CHAR);
        strpos++;

        /* the original key part length */
        key_part->document_path_key_part->length=
          (uint) uint2korr(strpos);
        DBUG_ASSERT(key_part->document_path_key_part->length > 0);
        strpos+=2;

        DBUG_ASSERT(*((uchar *)strpos) == (uchar) NAMES_SEP_CHAR);
        strpos++;

        /* the number of document paths for this document path key part */
        key_part->document_path_key_part->number_of_names=
          (uint) uint2korr(strpos);
        DBUG_ASSERT(key_part->document_path_key_part->number_of_names >= 2);
        strpos+=2;

        uint names_size =
          sizeof(char *) * key_part->document_path_key_part->number_of_names;

        key_part->document_path_key_part->names =
          (char **)alloc_root(&share->mem_root, names_size);

        if (!key_part->document_path_key_part->names)
          goto err;

        memset(key_part->document_path_key_part->names, 0, names_size);

        DBUG_ASSERT(*((uchar *)strpos) == (uchar) NAMES_SEP_CHAR);
        strpos++;

        Document_key_trie* trie = share->document_key_trie;
        uint idx = 0;
        for (;idx < key_part->document_path_key_part->number_of_names;idx++)
        {
          char *name_start = (char *)strpos;
          while (*((uchar *)strpos) != (uchar) NAMES_SEP_CHAR)
            strpos++;

          uint name_len = ((char *)strpos - name_start);
          key_part->document_path_key_part->names[idx]=
            (char *)alloc_root(&share->mem_root, name_len + 1);

          if (!key_part->document_path_key_part->names[idx])
            goto err;

          memcpy(key_part->document_path_key_part->names[idx],
                 name_start, name_len);
          *(key_part->document_path_key_part->names[idx] + name_len)= '\0';

          /* recursively insert the key part names into the trie */
          trie = trie->insert(key_part->document_path_key_part->names[idx],
              &share->mem_root);
          DBUG_ASSERT(trie);

          /* the NAMES_SEP_CHAR for this document path */
          DBUG_ASSERT(*((uchar *)strpos) == (uchar) NAMES_SEP_CHAR);
          strpos++;

          /* the next char is also NAMES_SEP_CHAR, so we have seen two
             consecutive NAMES_SEP_CHARs, which means the virtual key for
             this document path key part has ended
          */
          if (*((uchar *)strpos) == (uchar) NAMES_SEP_CHAR)
          {
            strpos++;
            break;
          }
        }
        DBUG_ASSERT(idx ==
                    key_part->document_path_key_part->number_of_names - 1);

        trie->key_type = key_part->document_path_key_part->type;
        trie->key_length = key_part->length;
        /* we have extracted all the document paths of the document path key
           parts for this key so now we can restore the original key_type */
        DBUG_ASSERT(f_is_document(key_part->key_type));

        document_path_key_parts--;

      } /* completed iterating all the key parts for a given key*/

      if (keyinfo->contains_document_key_part)
        share->document_keys.set_bit(i);

    } /* completed iterating all the keys */

    DBUG_ASSERT(document_path_key_parts == 0);
  }
  else
    share->document_key_trie = nullptr;

  //reading index comments
  for (keyinfo= share->key_info, i=0; i < keys; i++, keyinfo++)
  {
    if (keyinfo->flags & HA_USES_COMMENT)
    {
      keyinfo->comment.length= uint2korr(strpos);
      keyinfo->comment.str= strmake_root(&share->mem_root, (char*) strpos+2,
                                         keyinfo->comment.length);
      strpos+= 2 + keyinfo->comment.length;
    } 
    DBUG_ASSERT(MY_TEST(keyinfo->flags & HA_USES_COMMENT) ==
               (keyinfo->comment.length > 0));
  }

  share->reclength = uint2korr((head+16));
  if (*(head+26) == 1)
    share->system= 1;				/* one-record-database */
#ifdef HAVE_CRYPTED_FRM
  else if (*(head+26) == 2)
  {
    crypted= get_crypt_for_frm();
    share->crypted= 1;
  }
#endif

  record_offset= (ulong) (uint2korr(head+6)+
                          ((uint2korr(head+14) == 0xffff ?
                            uint4korr(head+47) : uint2korr(head+14))));
 
  if ((n_length= uint4korr(head+55)))
  {
    /* Read extra data segment */
    uchar *next_chunk, *buff_end;
    DBUG_PRINT("info", ("extra segment size is %u bytes", n_length));
    if (!(extra_segment_buff= (uchar*) my_malloc(n_length, MYF(MY_WME))))
      goto err;
    next_chunk= extra_segment_buff;
    if (mysql_file_pread(file, extra_segment_buff,
                         n_length, record_offset + share->reclength,
                         MYF(MY_NABP)))
    {
      goto err;
    }
    share->connect_string.length= uint2korr(next_chunk);
    if (!(share->connect_string.str= strmake_root(&share->mem_root,
                                                  (char*) next_chunk + 2,
                                                  share->connect_string.
                                                  length)))
    {
      goto err;
    }
    next_chunk+= share->connect_string.length + 2;
    buff_end= extra_segment_buff + n_length;
    if (next_chunk + 2 < buff_end)
    {
      uint str_db_type_length= uint2korr(next_chunk);
      LEX_STRING name;
      name.str= (char*) next_chunk + 2;
      name.length= str_db_type_length;

      plugin_ref tmp_plugin= ha_resolve_by_name(thd, &name, FALSE);
      if (tmp_plugin != NULL && !plugin_equals(tmp_plugin, share->db_plugin))
      {
        if (legacy_db_type > DB_TYPE_UNKNOWN &&
            legacy_db_type < DB_TYPE_FIRST_DYNAMIC &&
            legacy_db_type != ha_legacy_type(
                plugin_data(tmp_plugin, handlerton *)))
        {
          /* bad file, legacy_db_type did not match the name */
          goto err;
        }
        /*
          tmp_plugin is locked with a local lock.
          we unlock the old value of share->db_plugin before
          replacing it with a globally locked version of tmp_plugin
        */
        plugin_unlock(NULL, share->db_plugin);
        share->db_plugin= my_plugin_lock(NULL, &tmp_plugin);
        DBUG_PRINT("info", ("setting dbtype to '%.*s' (%d)",
                            str_db_type_length, next_chunk + 2,
                            ha_legacy_type(share->db_type())));
      }
#ifdef WITH_PARTITION_STORAGE_ENGINE
      else if (str_db_type_length == 9 &&
               !strncmp((char *) next_chunk + 2, "partition", 9))
      {
        /*
          Use partition handler
          tmp_plugin is locked with a local lock.
          we unlock the old value of share->db_plugin before
          replacing it with a globally locked version of tmp_plugin
        */
        /* Check if the partitioning engine is ready */
        if (!plugin_is_ready(&name, MYSQL_STORAGE_ENGINE_PLUGIN))
        {
          error= 8;
          my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0),
                   "--skip-partition", "");
          goto err;
        }
        plugin_unlock(NULL, share->db_plugin);
        share->db_plugin= ha_lock_engine(NULL, partition_hton);
        DBUG_PRINT("info", ("setting dbtype to '%.*s' (%d)",
                            str_db_type_length, next_chunk + 2,
                            ha_legacy_type(share->db_type())));
      }
#endif
      else if (!tmp_plugin)
      {
        /* purecov: begin inspected */
        error= 8;
        name.str[name.length]=0;
        my_error(ER_UNKNOWN_STORAGE_ENGINE, MYF(0), name.str);
        goto err;
        /* purecov: end */
      }
      next_chunk+= str_db_type_length + 2;
    }
    if (next_chunk + 5 < buff_end)
    {
      uint32 partition_info_str_len = uint4korr(next_chunk);
#ifdef WITH_PARTITION_STORAGE_ENGINE
      if ((share->partition_info_buffer_size=
             share->partition_info_str_len= partition_info_str_len))
      {
        if (!(share->partition_info_str= (char*)
              memdup_root(&share->mem_root, next_chunk + 4,
                          partition_info_str_len + 1)))
        {
          goto err;
        }
      }
#else
      if (partition_info_str_len)
      {
        DBUG_PRINT("info", ("WITH_PARTITION_STORAGE_ENGINE is not defined"));
        goto err;
      }
#endif
      next_chunk+= 5 + partition_info_str_len;
    }
#if MYSQL_VERSION_ID < 50200
    if (share->mysql_version >= 50106 && share->mysql_version <= 50109)
    {
      /*
         Partition state array was here in version 5.1.6 to 5.1.9, this code
         makes it possible to load a 5.1.6 table in later versions. Can most
         likely be removed at some point in time. Will only be used for
         upgrades within 5.1 series of versions. Upgrade to 5.2 can only be
         done from newer 5.1 versions.
      */
      next_chunk+= 4;
    }
    else
#endif
    if (share->mysql_version >= 50110 && next_chunk < buff_end)
    {
      /* New auto_partitioned indicator introduced in 5.1.11 */
#ifdef WITH_PARTITION_STORAGE_ENGINE
      share->auto_partitioned= *next_chunk;
#endif
      next_chunk++;
    }
    keyinfo= share->key_info;
    for (i= 0; i < keys; i++, keyinfo++)
    {
      if (keyinfo->flags & HA_USES_PARSER)
      {
        LEX_STRING parser_name;
        if (next_chunk >= buff_end)
        {
          DBUG_PRINT("error",
                     ("fulltext key uses parser that is not defined in .frm"));
          goto err;
        }
        parser_name.str= (char*) next_chunk;
        parser_name.length= strlen((char*) next_chunk);
        next_chunk+= parser_name.length + 1;
        keyinfo->parser= my_plugin_lock_by_name(NULL, &parser_name,
                                                MYSQL_FTPARSER_PLUGIN);
        if (! keyinfo->parser)
        {
          my_error(ER_PLUGIN_IS_NOT_LOADED, MYF(0), parser_name.str);
          goto err;
        }
      }
    }
    if (forminfo[46] == (uchar)255)
    {
      //reading long table comment
      if (next_chunk + 2 > buff_end)
      {
          DBUG_PRINT("error",
                     ("long table comment is not defined in .frm"));
          goto err;
      }
      share->comment.length = uint2korr(next_chunk);
      if (! (share->comment.str= strmake_root(&share->mem_root,
             (char*)next_chunk + 2, share->comment.length)))
      {
          goto err;
      }
      next_chunk+= 2 + share->comment.length;
    }

    if (next_chunk + format_section_header_size < buff_end)
    {
      /*
        New extra data segment called "format section" with additional
        table and column properties introduced by MySQL Cluster
        based on 5.1.20

        Table properties:
        TABLESPACE <ts> and STORAGE [DISK|MEMORY]

        Column properties:
        COLUMN_FORMAT [DYNAMIC|FIXED] and STORAGE [DISK|MEMORY]
      */
      DBUG_PRINT("info", ("Found format section"));

      /* header */
      const uint format_section_length= uint2korr(next_chunk);
      const uint format_section_flags= uint4korr(next_chunk+2);
      /* 2 bytes unused */

      if (next_chunk + format_section_length > buff_end)
      {
        DBUG_PRINT("error", ("format section length too long: %u",
                             format_section_length));
        goto err;
      }
      DBUG_PRINT("info", ("format_section_length: %u, format_section_flags: %u",
                          format_section_length, format_section_flags));

      share->default_storage_media=
        (enum ha_storage_media) (format_section_flags & 0x7);

      /* tablespace */
      const char *tablespace=
        (const char*)next_chunk + format_section_header_size;
      const uint tablespace_length= strlen(tablespace);
      if (tablespace_length &&
          !(share->tablespace= strmake_root(&share->mem_root,
                                            tablespace, tablespace_length+1)))
      {
        goto err;
      }
      DBUG_PRINT("info", ("tablespace: '%s'",
                          share->tablespace ? share->tablespace : "<null>"));

      /* pointer to format section for fields */
      format_section_fields=
        next_chunk + format_section_header_size + tablespace_length + 1;

      next_chunk+= format_section_length;
    }
  }
  share->key_block_size= uint2korr(head+62);

  error=4;
  extra_rec_buf_length= uint2korr(head+59);
  rec_buff_length= ALIGN_SIZE(share->reclength + 1 + extra_rec_buf_length);
  share->rec_buff_length= rec_buff_length;
  if (!(record= (uchar *) alloc_root(&share->mem_root,
                                     rec_buff_length)))
    goto err;                                   /* purecov: inspected */
  share->default_values= record;
  if (mysql_file_pread(file, record, (size_t) share->reclength,
                       record_offset, MYF(MY_NABP)))
    goto err;                                   /* purecov: inspected */

  mysql_file_seek(file, pos+288, MY_SEEK_SET, MYF(0));
#ifdef HAVE_CRYPTED_FRM
  if (crypted)
  {
    crypted->decode((char*) forminfo+256,288-256);
    if (sint2korr(forminfo+284) != 0)		// Should be 0
      goto err;                                 // Wrong password
  }
#endif

  share->fields= uint2korr(forminfo+258);
  pos= uint2korr(forminfo+260);   /* Length of all screens */
  n_length= uint2korr(forminfo+268);
  interval_count= uint2korr(forminfo+270);
  interval_parts= uint2korr(forminfo+272);
  int_length= uint2korr(forminfo+274);
  share->null_fields= uint2korr(forminfo+282);
  com_length= uint2korr(forminfo+284);
  if (forminfo[46] != (uchar)255)
  {
    share->comment.length=  (int) (forminfo[46]);
    share->comment.str= strmake_root(&share->mem_root, (char*) forminfo+47,
                                     share->comment.length);
  }

  DBUG_PRINT("info",("i_count: %d  i_parts: %d  index: %d  n_length: %d  int_length: %d  com_length: %d", interval_count,interval_parts, share->keys,n_length,int_length, com_length));

  if (!(field_ptr = (Field **)
	alloc_root(&share->mem_root,
		   (uint) ((share->fields+1)*sizeof(Field*)+
			   interval_count*sizeof(TYPELIB)+
			   (share->fields+interval_parts+
			    keys+3)*sizeof(char *)+
			   (n_length+int_length+com_length)))))
    goto err;                                   /* purecov: inspected */

  share->field= field_ptr;
  read_length=(uint) (share->fields * field_pack_length +
		      pos+ (uint) (n_length+int_length+com_length));
  if (read_string(file,(uchar**) &disk_buff,read_length))
    goto err;                                   /* purecov: inspected */
#ifdef HAVE_CRYPTED_FRM
  if (crypted)
  {
    crypted->decode((char*) disk_buff,read_length);
    delete crypted;
    crypted=0;
  }
#endif
  strpos= disk_buff+pos;

  share->intervals= (TYPELIB*) (field_ptr+share->fields+1);
  interval_array= (const char **) (share->intervals+interval_count);
  names= (char*) (interval_array+share->fields+interval_parts+keys+3);
  if (!interval_count)
    share->intervals= 0;			// For better debugging
  memcpy((char*) names, strpos+(share->fields*field_pack_length),
	 (uint) (n_length+int_length));
  comment_pos= names+(n_length+int_length);
  memcpy(comment_pos, disk_buff+read_length-com_length, com_length);

  fix_type_pointers(&interval_array, &share->fieldnames, 1, &names);
  if (share->fieldnames.count != share->fields)
    goto err;
  fix_type_pointers(&interval_array, share->intervals, interval_count,
		    &names);

  {
    /* Set ENUM and SET lengths */
    TYPELIB *interval;
    for (interval= share->intervals;
         interval < share->intervals + interval_count;
         interval++)
    {
      uint count= (uint) (interval->count + 1) * sizeof(uint);
      if (!(interval->type_lengths= (uint *) alloc_root(&share->mem_root,
                                                        count)))
        goto err;
      for (count= 0; count < interval->count; count++)
      {
        char *val= (char*) interval->type_names[count];
        interval->type_lengths[count]= strlen(val);
      }
      interval->type_lengths[count]= 0;
    }
  }

  if (keynames)
    fix_type_pointers(&interval_array, &share->keynames, 1, &keynames);

 /* Allocate handler */
  if (!(handler_file= get_new_handler(share, thd->mem_root,
                                      share->db_type())))
    goto err;

  if (handler_file->set_ha_share_ref(&share->ha_share))
    goto err;

  record= share->default_values-1;              /* Fieldstart = 1 */
  if (share->null_field_first)
  {
    null_flags= null_pos= (uchar*) record+1;
    null_bit_pos= (db_create_options & HA_OPTION_PACK_RECORD) ? 0 : 1;
    /*
      null_bytes below is only correct under the condition that
      there are no bit fields.  The correct value is set below, after the
      table struct is initialized.
    */
    share->null_bytes= (share->null_fields + null_bit_pos + 7) / 8;
  }
#ifndef WE_WANT_TO_SUPPORT_VERY_OLD_FRM_FILES
  else
  {
    share->null_bytes= (share->null_fields+7)/8;
    null_flags= null_pos= (uchar*) (record + 1 +share->reclength -
                                    share->null_bytes);
    null_bit_pos= 0;
  }
#endif

  use_hash= share->fields >= MAX_FIELDS_BEFORE_HASH;
  if (use_hash)
    use_hash= !my_hash_init(&share->name_hash,
                            system_charset_info,
                            share->fields,0,0,
                            (my_hash_get_key) get_field_name,0,0);

  for (i=0 ; i < share->fields; i++, strpos+=field_pack_length, field_ptr++)
  {
    uint pack_flag, interval_nr, unireg_type, recpos, field_length;
    enum_field_types field_type;
    const CHARSET_INFO *charset=NULL;
    Field::geometry_type geom_type= Field::GEOM_GEOMETRY;
    LEX_STRING comment;

    if (new_frm_ver >= 3)
    {
      /* new frm file in 4.1 */
      field_length= uint2korr(strpos+3);
      recpos=	    uint3korr(strpos+5);
      pack_flag=    uint2korr(strpos+8);
      unireg_type=  (uint) strpos[10];
      interval_nr=  (uint) strpos[12];
      uint comment_length=uint2korr(strpos+15);
      field_type=(enum_field_types) (uint) strpos[13];

      /* charset and geometry_type share the same byte in frm */
      if (field_type == MYSQL_TYPE_GEOMETRY)
      {
#ifdef HAVE_SPATIAL
	geom_type= (Field::geometry_type) strpos[14];
	charset= &my_charset_bin;
#else
	error= 4;  // unsupported field type
	goto err;
#endif
      }
      else
      {
        uint csid= strpos[14] + (((uint) strpos[11]) << 8);
        if (!csid)
          charset= &my_charset_bin;
        else if (!(charset= get_charset(csid, MYF(0))))
        {
          error= 5; // Unknown or unavailable charset
          errarg= (int) csid;
          goto err;
        }
      }
      if (!comment_length)
      {
        comment.str= (char*) "";
        comment.length=0;
      }
      else
      {
        comment.str=    (char*) comment_pos;
        comment.length= comment_length;
        comment_pos+=   comment_length;
      }
    }
    else
    {
      field_length= (uint) strpos[3];
      recpos=	    uint2korr(strpos+4),
      pack_flag=    uint2korr(strpos+6);
      pack_flag&=   ~FIELDFLAG_NO_DEFAULT;     // Safety for old files
      unireg_type=  (uint) strpos[8];
      interval_nr=  (uint) strpos[10];

      /* old frm file */
      field_type= (enum_field_types) f_packtype(pack_flag);
      if (f_is_binary(pack_flag))
      {
        /*
          Try to choose the best 4.1 type:
          - for 4.0 "CHAR(N) BINARY" or "VARCHAR(N) BINARY" 
            try to find a binary collation for character set.
          - for other types (e.g. BLOB) just use my_charset_bin. 
        */
        if (!f_is_blob(pack_flag))
        {
          // 3.23 or 4.0 string
          if (!(charset= get_charset_by_csname(share->table_charset->csname,
                                               MY_CS_BINSORT, MYF(0))))
            charset= &my_charset_bin;
        }
        else
          charset= &my_charset_bin;
      }
      else
        charset= share->table_charset;
      memset(&comment, 0, sizeof(comment));
    }

    if (interval_nr && charset->mbminlen > 1)
    {
      /* Unescape UCS2 intervals from HEX notation */
      TYPELIB *interval= share->intervals + interval_nr - 1;
      unhex_type2(interval);
    }
    
#ifndef TO_BE_DELETED_ON_PRODUCTION
    if (field_type == MYSQL_TYPE_NEWDECIMAL && !share->mysql_version)
    {
      /*
        Fix pack length of old decimal values from 5.0.3 -> 5.0.4
        The difference is that in the old version we stored precision
        in the .frm table while we now store the display_length
      */
      uint decimals= f_decimals(pack_flag);
      field_length= my_decimal_precision_to_length(field_length,
                                                   decimals,
                                                   f_is_dec(pack_flag) == 0);
      sql_print_error("Found incompatible DECIMAL field '%s' in %s; "
                      "Please do \"ALTER TABLE `%s` FORCE\" to fix it!",
                      share->fieldnames.type_names[i], share->table_name.str,
                      share->table_name.str);
      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                          ER_CRASHED_ON_USAGE,
                          "Found incompatible DECIMAL field '%s' in %s; "
                          "Please do \"ALTER TABLE `%s` FORCE\" to fix it!",
                          share->fieldnames.type_names[i],
                          share->table_name.str,
                          share->table_name.str);
      share->crashed= 1;                        // Marker for CHECK TABLE
    }
#endif

    *field_ptr= reg_field=
      make_field(share, record+recpos,
		 (uint32) field_length,
		 null_pos, null_bit_pos,
		 pack_flag,
		 field_type,
		 charset,
		 geom_type,
		 (Field::utype) MTYP_TYPENR(unireg_type),
		 (interval_nr ?
		  share->intervals+interval_nr-1 :
		  (TYPELIB*) 0),
		 share->fieldnames.type_names[i]);
    if (!reg_field)				// Not supported field type
    {
      error= 4;
      goto err;			/* purecov: inspected */
    }

    reg_field->field_index = i;
    reg_field->comment=comment;
    if (field_type == MYSQL_TYPE_BIT && !f_bit_as_char(pack_flag))
    {
      if ((null_bit_pos+= field_length & 7) > 7)
      {
        null_pos++;
        null_bit_pos-= 8;
      }
    }
    if (!(reg_field->flags & NOT_NULL_FLAG))
    {
      if (!(null_bit_pos= (null_bit_pos + 1) & 7))
        null_pos++;
    }
    if (f_no_default(pack_flag))
      reg_field->flags|= NO_DEFAULT_VALUE_FLAG;

    if (reg_field->unireg_check == Field::NEXT_NUMBER)
      share->found_next_number_field= field_ptr;

    if (use_hash)
      if (my_hash_insert(&share->name_hash, (uchar*) field_ptr) )
      {
        /*
          Set return code 8 here to indicate that an error has
          occurred but that the error message already has been
          sent (OOM).
        */
        error= 8; 
        goto err;
      }

    if (format_section_fields)
    {
      const uchar field_flags= format_section_fields[i];
      const uchar field_storage= (field_flags & STORAGE_TYPE_MASK);
      const uchar field_column_format=
        ((field_flags >> COLUMN_FORMAT_SHIFT)& COLUMN_FORMAT_MASK);
      DBUG_PRINT("debug", ("field flags: %u, storage: %u, column_format: %u",
                           field_flags, field_storage, field_column_format));
      reg_field->set_storage_type((ha_storage_media)field_storage);
      reg_field->set_column_format((column_format_type)field_column_format);
    }
  }
  *field_ptr=0;					// End marker

  /* Fix key->name and key_part->field */
  if (key_parts)
  {
    const int pk_off= find_type(primary_key_name, &share->keynames,
                                  FIND_TYPE_NO_PREFIX);
    uint primary_key= (pk_off > 0 ? pk_off-1 : MAX_KEY);
    /*
      The following if-else is here for MyRocks:
      set share->primary_key as early as possible, because the return value
      of ha_rocksdb::index_flags(key, ...) (HA_KEYREAD_ONLY bit in particular)
      depends on whether the key is the primary key.
    */
    if (primary_key < MAX_KEY && share->keys_in_use.is_set(primary_key))
      share->primary_key= primary_key;
    else
      share->primary_key= MAX_KEY;

    longlong ha_option= handler_file->ha_table_flags();
    keyinfo= share->key_info;
    key_part= keyinfo->key_part;

    for (uint key=0 ; key < share->keys ; key++,keyinfo++)
    {
      uint usable_parts= 0;
      keyinfo->name=(char*) share->keynames.type_names[key];
      /* Fix fulltext keys for old .frm files */
      if (share->key_info[key].flags & HA_FULLTEXT)
	share->key_info[key].algorithm= HA_KEY_ALG_FULLTEXT;

      if (primary_key >= MAX_KEY && (keyinfo->flags & HA_NOSAME))
      {
	/*
	  If the UNIQUE key doesn't have NULL columns and is not a part key
	  declare this as a primary key.
	*/
	primary_key=key;
	for (i=0 ; i < keyinfo->user_defined_key_parts ;i++)
	{
          DBUG_ASSERT(key_part[i].fieldnr > 0);
          // Table field corresponding to the i'th key part.
          Field *table_field= share->field[key_part[i].fieldnr - 1];

          /*
            If the key column is of NOT NULL BLOB type, then it
            will definitely have a key prefix. And if key part prefix size
            is equal to the BLOB column max size, then we can promote
            it to primary key.
          */
          if (!table_field->real_maybe_null() &&
              table_field->type() == MYSQL_TYPE_BLOB &&
              table_field->field_length == key_part[i].length)
            continue;
          /*
            If the key column is of NOT NULL GEOMETRY type, specifically a
            POINT type whose length is known internally (which is 25), and the
            key part prefix size is equal to the POINT column max size, then
            we can promote it to primary key.
          */
          if (!table_field->real_maybe_null() &&
              table_field->type() == MYSQL_TYPE_GEOMETRY &&
              table_field->get_geometry_type() == Field::GEOM_POINT &&
              key_part[i].length == MAX_LEN_GEOM_POINT_FIELD)
            continue;

	  if (table_field->real_maybe_null() ||
	      table_field->key_length() != key_part[i].length)
 	  {
	    primary_key= MAX_KEY;		// Can't be used
	    break;
	  }
	}

        /*
          The following is here for MyRocks. See the comment above
          about "set share->primary_key as early as possible"
        */
        if (primary_key < MAX_KEY && share->keys_in_use.is_set(primary_key))
          share->primary_key= primary_key;
      }

      for (i=0 ; i < keyinfo->user_defined_key_parts ; key_part++,i++)
      {
        Field *field;
	if (new_field_pack_flag <= 1)
	  key_part->fieldnr= (uint16) find_field(share->field,
                                                 share->default_values,
                                                 (uint) key_part->offset,
                                                 (uint) key_part->length);
	if (!key_part->fieldnr)
        {
          error= 4;                             // Wrong file
          goto err;
        }
        field= key_part->field= share->field[key_part->fieldnr-1];
        key_part->type= field->key_type();

        bool is_string_type_document_key = false;
        if (field->type() == MYSQL_TYPE_DOCUMENT)
        {
          /* since a field with document type cannot be a key part,
             this must be a document path key part */
          DBUG_ASSERT(key_part->document_path_key_part &&
            key_part->document_path_key_part->number_of_names >= 2 &&
            strcmp(key_part->document_path_key_part->names[0],
                   field->field_name) == 0);
          /* the document path flag has been removed */
          DBUG_ASSERT(f_is_document(key_part->key_type));

          /* a document field is always nullable */
          DBUG_ASSERT(field->real_maybe_null());

          is_string_type_document_key =
            (key_part->document_path_key_part->type == MYSQL_TYPE_STRING);
        }

        if (field->real_maybe_null())
        {
          key_part->null_offset=field->null_offset(share->default_values);
          key_part->null_bit= field->null_bit;
          key_part->store_length+=HA_KEY_NULL_LENGTH;
          keyinfo->flags|=HA_NULL_PART_KEY;
          keyinfo->key_length+= HA_KEY_NULL_LENGTH;
        }
        if (field->type() == MYSQL_TYPE_BLOB ||
            field->real_type() == MYSQL_TYPE_VARCHAR ||
            field->type() == MYSQL_TYPE_GEOMETRY ||
            is_string_type_document_key)
        {
          key_part->store_length+=HA_KEY_BLOB_LENGTH;
          if (i + 1 <= keyinfo->user_defined_key_parts)
            keyinfo->key_length+= HA_KEY_BLOB_LENGTH;
        }
        key_part->init_flags();

        setup_key_part_field(share, handler_file, primary_key,
                             keyinfo, key, i, &usable_parts);

        field->flags|= PART_KEY_FLAG;
        if (key == primary_key)
        {
          field->flags|= PRI_KEY_FLAG;

          /*
            "if (ha_option & HA_PRIMARY_KEY_IN_READ_INDEX)" ... was moved below
            for MyRocks
          */
        }

        /* key_length() of document fields doesn't provide the length of keys */
        if (field->key_length() != key_part->length &&
            field->type() != MYSQL_TYPE_DOCUMENT)
        {
#ifndef TO_BE_DELETED_ON_PRODUCTION
          if (field->type() == MYSQL_TYPE_NEWDECIMAL)
          {
            /*
              Fix a fatal error in decimal key handling that causes crashes
              on Innodb. We fix it by reducing the key length so that
              InnoDB never gets a too big key when searching.
              This allows the end user to do an ALTER TABLE to fix the
              error.
            */
            keyinfo->key_length-= (key_part->length - field->key_length());
            key_part->store_length-= (uint16)(key_part->length -
                                              field->key_length());
            key_part->length= (uint16)field->key_length();
            sql_print_error("Found wrong key definition in %s; "
                            "Please do \"ALTER TABLE `%s` FORCE \" to fix it!",
                            share->table_name.str,
                            share->table_name.str);
            push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
                                ER_CRASHED_ON_USAGE,
                                "Found wrong key definition in %s; "
                                "Please do \"ALTER TABLE `%s` FORCE\" to fix "
                                "it!",
                                share->table_name.str,
                                share->table_name.str);
            share->crashed= 1;                // Marker for CHECK TABLE
            continue;
          }
#endif
          key_part->key_part_flag|= HA_PART_KEY_SEG;
        }
      }


      if (use_extended_sk && primary_key < MAX_KEY &&
          key && !(keyinfo->flags & HA_NOSAME))
        key_part+= add_pk_parts_to_sk(keyinfo, key, share->key_info, primary_key,
                                      share,  handler_file, &usable_parts);

      /* Skip unused key parts if they exist */
      key_part+= keyinfo->unused_key_parts;

      keyinfo->usable_key_parts= usable_parts; // Filesort

      set_if_bigger(share->max_key_length,keyinfo->key_length+
                    keyinfo->user_defined_key_parts);
      share->total_key_length+= keyinfo->key_length;
      /*
        MERGE tables do not have unique indexes. But every key could be
        a unique index on the underlying MyISAM table. (Bug #10400)
      */
      if ((keyinfo->flags & HA_NOSAME) ||
          (ha_option & HA_ANY_INDEX_MAY_BE_UNIQUE))
        set_if_bigger(share->max_unique_length,keyinfo->key_length);
    }

    /*
      The next call is here for MyRocks:  Now, we have filled in field and key
      definitions, give the storage engine a chance to adjust its properties.

      MyRocks may (and typically does) adjust HA_PRIMARY_KEY_IN_READ_INDEX
      flag in this call.
    */
    if (handler_file->init_with_fields())
      goto err;

    if (primary_key < MAX_KEY && (handler_file->ha_table_flags() &
                                  HA_PRIMARY_KEY_IN_READ_INDEX))
    {
      keyinfo= &share->key_info[primary_key];
      key_part= keyinfo->key_part;
      for (i=0 ; i < keyinfo->user_defined_key_parts ; key_part++,i++)
      {
        Field *field= key_part->field;
        /*
          If this field is part of the primary key and all keys contain
          the primary key, then we can use any key to find this column
        */
        if (field->key_length() == key_part->length &&
            !(field->flags & BLOB_FLAG))
          field->part_of_key= share->keys_in_use;
        if (field->part_of_sortkey.is_set(primary_key))
          field->part_of_sortkey= share->keys_in_use;
      }
    }

    if (share->primary_key != MAX_KEY)
    {
      /*
	If we are using an integer as the primary key then allow the user to
	refer to it as '_rowid'
      */
      if (share->key_info[primary_key].user_defined_key_parts == 1)
      {
	Field *field= share->key_info[primary_key].key_part[0].field;
	if (field && field->result_type() == INT_RESULT)
        {
          /* note that fieldnr here (and rowid_field_offset) starts from 1 */
	  share->rowid_field_offset= (share->key_info[primary_key].key_part[0].
                                      fieldnr);
        }
      }
    }
  }
  else
    share->primary_key= MAX_KEY;

  /* TODO: for now, by default indexes containing document path are not in use
   */
  share->keys_in_use.subtract(share->document_keys);

  my_free(disk_buff);
  disk_buff=0;
  if (new_field_pack_flag <= 1)
  {
    /* Old file format with default as not null */
    uint null_length= (share->null_fields+7)/8;
    memset(share->default_values + (null_flags - (uchar*) record), 255,
           null_length);
  }

  if (share->found_next_number_field)
  {
    reg_field= *share->found_next_number_field;
    if ((int) (share->next_number_index= (uint)
	       find_ref_key(share->key_info, share->keys,
                            share->default_values, reg_field,
			    &share->next_number_key_offset,
                            &share->next_number_keypart)) < 0)
    {
      /* Wrong field definition */
      error= 4;
      goto err;
    }
    else
      reg_field->flags |= AUTO_INCREMENT_FLAG;
  }

  if (share->blob_fields)
  {
    Field **ptr;
    uint k, *save;

    /* Store offsets to blob fields to find them fast */
    if (!(share->blob_field= save=
	  (uint*) alloc_root(&share->mem_root,
                             (uint) (share->blob_fields* sizeof(uint)))))
      goto err;
    for (k=0, ptr= share->field ; *ptr ; ptr++, k++)
    {
      if ((*ptr)->flags & BLOB_FLAG)
	(*save++)= k;
    }
  }

  /*
    the correct null_bytes can now be set, since bitfields have been taken
    into account
  */
  share->null_bytes= (null_pos - (uchar*) null_flags +
                      (null_bit_pos + 7) / 8);
  share->last_null_bit_pos= null_bit_pos;

  share->db_low_byte_first= handler_file->low_byte_first();
  share->column_bitmap_size= bitmap_buffer_size(share->fields);

  if (!(bitmaps= (my_bitmap_map*) alloc_root(&share->mem_root,
                                             share->column_bitmap_size)))
    goto err;
  bitmap_init(&share->all_set, bitmaps, share->fields, FALSE);
  bitmap_set_all(&share->all_set);

  delete handler_file;
#ifndef DBUG_OFF
  if (use_hash)
    (void) my_hash_check(&share->name_hash);
#endif
  my_free(extra_segment_buff);
  DBUG_RETURN (0);

 err:
  share->error= error;
  share->open_errno= my_errno;
  share->errarg= errarg;
  my_free(disk_buff);
  my_free(extra_segment_buff);
  delete crypted;
  delete handler_file;
  my_hash_free(&share->name_hash);

  open_table_error(share, error, share->open_errno, errarg);
  DBUG_RETURN(error);
} /* open_binary_frm */