Mysql_sql_parser::Parse_result Mysql_sql_parser::process_create_table_statement()

in modules/db.mysql.sqlparser/src/mysql_sql_parser.cpp [1172:1564]


/**
 * Processes the AST of a CREATE TABLE statement and fills in the corresponding table object.
 *
 * The work is done in four stages:
 *   1. resolve the table name, then create the table object or reuse an existing one
 *      (a previously created stub is reused silently, any other same-named object is
 *      reported through blame_existing_obj());
 *   2. apply the table options (engine, row format, charset, merge union list, ...);
 *   3. rebuild the partitioning description (type, expression, counts, definitions);
 *   4. process the field list: columns, inline REFERENCES clauses, indices and foreign keys.
 * Finally the optional _shape_table callback is invoked, the table is inserted into the
 * schema's table list and, unless it is still a stub, its creation is logged.
 *
 * @param tree root AST node of the statement being processed.
 * @return pr_irrelevant when @p tree has no sql::_create2 subitem, pr_processed otherwise.
 *         NOTE(review): the ENSURE macro wraps the index/foreign key processing; what it does
 *         on failure is defined by the macro and is not visible here.
 */
Mysql_sql_parser::Parse_result Mysql_sql_parser::process_create_table_statement(const SqlAstNode *tree) {
  const SqlAstNode *create2_item = tree->subitem(sql::_create2);

  // check if statement is relevant
  if (!create2_item)
    return pr_irrelevant;

  ACTIVE_SCHEMA_KEEPER

  db_mysql_SchemaRef schema;
  db_mysql_TableRef obj;

  // try to find previously created stub object with the same name
  {
    // the identifier follows TABLE directly, or comes after the optional IF NOT EXISTS clause
    const SqlAstNode *table_ident_item = tree->find_subseq(sql::_TABLE_SYM, sql::_table_ident);
    if (!table_ident_item)
      table_ident_item = tree->find_subseq(sql::_TABLE_SYM, sql::_opt_if_not_exists, sql::_table_ident);
    // NOTE(review): table_ident_item may still be NULL here; this relies on
    // process_obj_full_name_item() coping with that - confirm.
    std::string obj_name = process_obj_full_name_item(table_ident_item, &schema);

    step_progress(obj_name);

    _active_schema = schema;

    // check for same-named view
    {
      db_mysql_ViewRef view = find_named_object_in_list(schema->views(), obj_name, _case_sensitive_identifiers);
      if (view.is_valid()) {
        // _reuse_existing_objects is switched off only for the duration of this report
        Val_keeper<bool> reuse_existing_objects_keeper(&_reuse_existing_objects);
        _reuse_existing_objects = false;
        blame_existing_obj(true, view, schema);
      }
    }

    {
      // reuse is forced on for the table lookup so that an existing stub can be picked up
      Val_keeper<bool> reuse_existing_objects_keeper(&_reuse_existing_objects);
      _reuse_existing_objects = true;
      obj = create_or_find_named_obj(schema->tables(), obj_name, _case_sensitive_identifiers, schema);
    }

    if (_reusing_existing_obj) {
      if (obj->isStub()) {
        obj->isStub(0); // reuse equally named stub

        std::string msg_text;
        msg_text.append("Previously created stub for table `")
          .append(*schema->name())
          .append("`.`")
          .append(obj_name)
          .append("` was found. Reusing.");
        add_log_message(msg_text, 1);
      } else
        blame_existing_obj(true, obj, schema);
    } else {
      // name
      std::string name = process_obj_full_name_item(table_ident_item, NULL);
      if (obj.is_valid())
        set_obj_name(obj, name);
    }
  }

  // options
  {
    const SqlAstNode *items =
      create2_item->subitem(sql::_create2a, sql::_opt_create_table_options, sql::_create_table_options);
    if (items && items->subitems()) {
      for (SqlAstNode::SubItemList::const_iterator it = items->subitems()->begin(); it != items->subitems()->end();
           ++it) {
        const SqlAstNode *item = *it;
        if (item->name_equals(sql::_create_table_option)) {
          const SqlAstNode *aux_item = NULL;

          if (item->subseq(sql::_ENGINE_SYM) || item->subseq(sql::_TYPE_SYM)) {
            SET_STR_SI(obj->tableEngine, item, sql::_storage_engines)
            obj->tableEngine(__table_storage_engines.normalize_name(obj->tableEngine()));
          } else if (item->subseq(sql::_ROW_FORMAT_SYM))
            SET_STR_SI(obj->rowFormat, item, sql::_row_types)
          else if (item->subseq(sql::_KEY_BLOCK_SIZE))
            SET_STR_SI(obj->keyBlockSize, item, sql::_ulong_num)
          else if (item->subseq(sql::_AUTO_INC))
            SET_STR_SI(obj->nextAutoInc, item, sql::_ulonglong_num)
          else if ((aux_item = item->subseq(sql::_default_charset)))
            SET_STR_SI(cs_collation_setter(obj, schema, false).charset_name, aux_item, sql::_charset_name_or_default)
          else if ((aux_item = item->subseq(sql::_default_collation)))
            SET_STR_SI(cs_collation_setter(obj, schema, false).collation_name, aux_item,
                       sql::_collation_name_or_default)
          else if (item->subseq(sql::_DELAY_KEY_WRITE_SYM))
            SET_INT_SI(obj->delayKeyWrite, item, sql::_ulong_num)
          else if (item->subseq(sql::_COMMENT_SYM))
            SET_STR_SI(obj->comment, item, sql::_TEXT_STRING_sys)
          else if (item->subseq(sql::_DATA_SYM, sql::_DIRECTORY_SYM))
            SET_STR_SI(obj->tableDataDir, item, sql::_TEXT_STRING_sys)
          else if (item->subseq(sql::_INDEX_SYM, sql::_DIRECTORY_SYM))
            SET_STR_SI(obj->tableIndexDir, item, sql::_TEXT_STRING_sys)
          else if (item->subseq(sql::_PACK_KEYS_SYM)) {
            // the value is looked up both as the DEFAULT keyword and as a number
            SET_STR_SI(obj->packKeys, item, sql::_DEFAULT)
            SET_STR_SI(obj->packKeys, item, sql::_ulong_num)
          } else if (item->subseq(sql::_AVG_ROW_LENGTH))
            SET_STR_SI(obj->avgRowLength, item, sql::_ulong_num)
          else if (item->subseq(sql::_MIN_ROWS))
            SET_STR_SI(obj->minRows, item, sql::_ulonglong_num)
          else if (item->subseq(sql::_MAX_ROWS))
            SET_STR_SI(obj->maxRows, item, sql::_ulonglong_num)
          else if (item->subseq(sql::_CHECKSUM_SYM))
            SET_INT_SI(obj->checksum, item, sql::_ulong_num)
          else if (item->subseq(sql::_INSERT_METHOD))
            SET_STR_SI(obj->mergeInsert, item, sql::_merge_insert_types)
          else if (item->subseq(sql::_UNION_SYM)) {
            /*
              The server strips the schema qualification from table names in the union list when
              producing 'show create' output (only for tables located in the same schema as the
              table being defined).
              To avoid false differences during the merge procedure, all table names are explicitly
              normalized here (fully qualified names only).
            */
            const SqlAstNode *union_items = item->subitem(sql::_opt_table_list, sql::_table_list);
            if (union_items && union_items->subitems()) {
              std::string table_list;
              for (SqlAstNode::SubItemList::const_iterator ut = union_items->subitems()->begin();
                   ut != union_items->subitems()->end(); ++ut) {
                const SqlAstNode *union_item = *ut;
                if (union_item->name_equals(sql::_table_name)) {
                  // separate schema reference: the schema of the table being created must stay untouched
                  db_mysql_SchemaRef union_schema;
                  std::string union_table_name =
                    process_obj_full_name_item(union_item->subitem(sql::_table_ident), &union_schema);
                  if (!table_list.empty())
                    table_list.append(", ");
                  table_list.append(qualify_obj_name(union_table_name, union_schema->name()));
                }
              }
              if (!table_list.empty())
                obj->mergeUnion(table_list);
            }
          }
        }
      }
    }
  }

  // partitioning
  {
    // Local helper that converts a part_definition AST node into a partition definition object.
    class Partition_definition {
      // Copies the recognized options of an option list into part_obj.
      // A missing option list (NULL node or node without subitems) is silently ignored.
      static void parse_options(db_mysql_PartitionDefinitionRef part_obj, const SqlAstNode *part_options) {
        if (!part_options || !part_options->subitems())
          return;

        for (SqlAstNode::SubItemList::const_iterator it = part_options->subitems()->begin(),
                                                     it_end = part_options->subitems()->end();
             it != it_end; ++it) {
          const SqlAstNode *part_option = *it;
          if (!part_option->name_equals(sql::_opt_part_option))
            continue;

          if (part_option->subitem(sql::_MAX_ROWS))
            SET_STR_SI(part_obj->maxRows, part_option, sql::_real_ulonglong_num)
          else if (part_option->subitem(sql::_MIN_ROWS))
            SET_STR_SI(part_obj->minRows, part_option, sql::_real_ulonglong_num)
          else if (part_option->subitem(sql::_DATA_SYM))
            SET_STR_SI(part_obj->dataDirectory, part_option, sql::_TEXT_STRING_sys)
          else if (part_option->subitem(sql::_INDEX_SYM))
            SET_STR_SI(part_obj->indexDirectory, part_option, sql::_TEXT_STRING_sys)
          else if (part_option->subitem(sql::_COMMENT_SYM))
            SET_STR_SI(part_obj->comment, part_option, sql::_TEXT_STRING_sys)
          // TODO: process ENGINE (not supported by server as of 5.1.22)
        }
      }

      // Appends one subpartition definition to part_obj for every sub_part_definition node
      // found in subpart_list. A missing list is silently ignored.
      static void parse_subpartitions(db_mysql_PartitionDefinitionRef part_obj, const SqlAstNode *subpart_list) {
        if (!subpart_list || !subpart_list->subitems())
          return;

        for (SqlAstNode::SubItemList::const_iterator it = subpart_list->subitems()->begin(),
                                                     it_end = subpart_list->subitems()->end();
             it != it_end; ++it) {
          const SqlAstNode *subpart_item = *it;
          if (!subpart_item->name_equals(sql::_sub_part_definition))
            continue;

          db_mysql_PartitionDefinitionRef subpart_obj(grt::Initialized);

          SET_STR_SI(subpart_obj->name, subpart_item, sql::_sub_name);
          // the option list may be absent for a subpartition; parse_options() tolerates NULL
          parse_options(subpart_obj, subpart_item->subitem(sql::_opt_part_options, sql::_opt_part_option_list));

          part_obj->subpartitionDefinitions().insert(subpart_obj);
        }
      }

    public:
      // Builds a partition definition (name, value, options, subpartitions) from part_item.
      // NOTE(review): _sql_statement is not referenced directly; presumably the SET_SQL_I macro
      // picks it up by name - confirm before renaming or removing the parameter.
      static db_mysql_PartitionDefinitionRef parse(const SqlAstNode *part_item, const std::string &_sql_statement) {
        db_mysql_PartitionDefinitionRef part_obj(grt::Initialized);

        const SqlAstNode *part_attr;

        if ((part_attr = part_item->subitem(sql::_part_name)))
          part_obj->name(part_attr->value());

        if (const SqlAstNode *opt_part_values_item = part_item->subitem(sql::_opt_part_values)) {
          // alternative AST paths under which the partition value can be located
          static sql::symbol path1[] = {sql::_part_func_max, sql::_MAX_VALUE_SYM, sql::_};
          static sql::symbol path2[] = {sql::_part_func_max, sql::_part_value_item, sql::_part_value_item_list, sql::_};
          static sql::symbol path3[] = {sql::_part_values_in, sql::_part_value_item, sql::_part_value_item_list,
                                        sql::_};
          static sql::symbol path4[] = {sql::_part_values_in, sql::_part_value_list, sql::_};
          static sql::symbol *paths[] = {path1, path2, path3, path4};

          part_attr = opt_part_values_item->search_by_paths(paths, ARR_CAPACITY(paths));

          SET_SQL_I(part_obj->value, part_attr)
        }

        if ((part_attr = part_item->subitem(sql::_opt_part_options, sql::_opt_part_option_list)))
          parse_options(part_obj, part_attr);

        if ((part_attr = part_item->subitem(sql::_opt_sub_partition, sql::_sub_part_list)))
          parse_subpartitions(part_obj, part_attr);

        return part_obj;
      }
    }; // class Partition_definition

    // clear partition list in case this is a reused table
    obj->partitionDefinitions().remove_all();

    const SqlAstNode *partition =
      create2_item->subitem(sql::_create2a, sql::_opt_create_partitioning, sql::_partitioning, sql::_partition);
    if (partition) {
      bool is_range_or_list_partition = (partition->subitem(sql::_part_type_def, sql::_RANGE_SYM) != NULL) ||
                                        (partition->subitem(sql::_part_type_def, sql::_LIST_SYM) != NULL);

      const SqlAstNode *item;

      // partition type and expression
      if ((item = partition->subitem(sql::_part_type_def))) {
        std::string part_type;
        const SqlAstNode *part_exprt_item = NULL;

        if (item->subitem(sql::_LINEAR_SYM))
          part_type.append("LINEAR ");

        if (item->subitem(sql::_HASH_SYM))
          part_type.append("HASH");
        else if (item->subitem(sql::_KEY_SYM)) {
          part_type.append("KEY");
          // for KEY partitioning the expression is taken from the field list
          part_exprt_item = item->subitem(sql::_part_field_list, sql::_part_field_item_list);
        } else if (item->subitem(sql::_RANGE_SYM))
          part_type.append("RANGE");
        else if (item->subitem(sql::_LIST_SYM))
          part_type.append("LIST");

        obj->partitionType(part_type);

        if (!part_exprt_item)
          part_exprt_item = item->subitem(sql::_part_func, sql::_part_func_expr);

        SET_SQL_I(obj->partitionExpression, part_exprt_item);
      }

      // explicit partition count
      if ((item = partition->subitem(sql::_opt_num_parts))) {
        if (!is_range_or_list_partition) // is not valid for range or list partitions
          SET_INT_SI(obj->partitionCount, item, sql::_real_ulong_num);
      }

      // subpartition type and expression
      if ((item = partition->subitem(sql::_opt_sub_part))) {
        if (is_range_or_list_partition) // valid only for range or list partitions
        {
          std::string subpart_type;

          if (item->subitem(sql::_LINEAR_SYM))
            subpart_type.append("LINEAR ");

          if (item->subitem(sql::_HASH_SYM))
            subpart_type.append("HASH");
          else if (item->subitem(sql::_KEY_SYM))
            subpart_type.append("KEY");

          obj->subpartitionType(subpart_type);

          SET_SQL_I(obj->subpartitionExpression, item->subitem(sql::_sub_part_func, sql::_part_func_expr));
        }
      }

      // partition definitions
      if ((item = partition->subitem(sql::_part_defs))) {
        if (is_range_or_list_partition) // valid only for range or list partitions
        {
          const SqlAstNode *part_def_list = item->subitem(sql::_part_def_list);

          // subitem() may return NULL; skip the definitions instead of dereferencing it
          if (part_def_list && part_def_list->subitems()) {
            for (SqlAstNode::SubItemList::const_iterator jt = part_def_list->subitems()->begin(),
                                                         jt_end = part_def_list->subitems()->end();
                 jt != jt_end; ++jt) {
              const SqlAstNode *part = *jt;
              if (part->name_equals(sql::_part_definition)) {
                db_mysql_PartitionDefinitionRef part_obj = Partition_definition::parse(part, _sql_statement);
                obj->partitionDefinitions().insert(part_obj);
              }
            }
          }
        }
      }

      // derive the counts from the parsed definitions when they were not given explicitly;
      // the subpartition count is taken from the first partition definition
      if (obj->partitionCount() == 0)
        obj->partitionCount((long)obj->partitionDefinitions().count());
      if (obj->partitionDefinitions().count() > 0)
        obj->subpartitionCount((long)obj->partitionDefinitions().get(0)->subpartitionDefinitions().count());
    }
  }

  // columns, indices, foreign keys
  {
    const SqlAstNode *items = create2_item->subitem(sql::_create2a, sql::_create_field_list, sql::_field_list);
    if (items && items->subitems()) {
      for (SqlAstNode::SubItemList::const_iterator it = items->subitems()->begin(); it != items->subitems()->end();
           ++it) {
        const SqlAstNode *item = *it;
        if (item->name_equals(sql::_field_list_item)) {
          const SqlAstNode *aux_item = NULL;

          if ((aux_item = item->subitem(sql::_column_def))) {
            db_mysql_ColumnRef column;

            // field
            if ((aux_item = aux_item->subitem(sql::_field_spec))) {
              bool reusing_column = false;

              // if current table is reusing stub object then try to find existing column with the same name
              if (_reusing_existing_obj) {
                std::string field_name = process_field_name_item(aux_item->subitem(sql::_field_ident));
                column = find_named_object_in_list(obj->columns(), field_name,
                                                   false); // mysql columns are always case-insensitive
              }

              if (column.is_valid())
                reusing_column = true;
              else {
                column = db_mysql_ColumnRef(grt::Initialized);
                column->owner(obj);
              }

              // name
              process_field_name_item(aux_item->subitem(sql::_field_ident), column);

              // type
              process_field_type_item(aux_item->subitem(sql::_type), column);

              // attributes
              process_field_attributes_item(aux_item->subitem(sql::_opt_attribute, sql::_opt_attribute_list), column,
                                            obj);

              // a reused column is already part of the table's column list
              if (!reusing_column)
                obj->columns().insert(column);
            }

            // inline REFERENCES clause attached to the column definition
            if ((aux_item = item->subitem(sql::_column_def, sql::_references))) {
              db_mysql_ForeignKeyRef fk(grt::Initialized);
              fk->owner(obj);
              Fk_ref fk_ref(fk);

              // own columns
              fk->columns().insert(column);

              // name
              if (_gen_fk_names_when_empty) {
                std::string name = bec::TableHelper::generate_foreign_key_name();
                set_obj_name(fk, name);
              }

              // cardinality/mandatory: the reference is mandatory only if every own column is NOT NULL
              fk->referencedMandatory(1);
              grt::ListRef<db_Column> columns = fk->columns();
              for (size_t n = 0, count = columns.count(); n < count; ++n) {
                if (!columns.get(n)->isNotNull()) {
                  fk->referencedMandatory(0);
                  break;
                }
              }
              fk->many(1);

              // references
              process_fk_references_item(aux_item, fk, fk_ref);

              obj->foreignKeys().insert(fk);
              _fk_refs.push_back(fk_ref);
            }
          } else if ((aux_item = item->subitem(sql::_key_def))) {
            if ((aux_item->find_subseq(sql::_FOREIGN, sql::_KEY_SYM)))
              ENSURE(process_fk_item(aux_item, obj), "Table `" + *obj->name() + "` : " + e.what())
            else if ((aux_item->subitem(sql::_key_list)))
              ENSURE(process_index_item(aux_item, obj), "Table `" + *obj->name() + "` : " + e.what())
          }
        }
      }
    }
  }

  if (_shape_table)
    _shape_table(obj);
  do_transactable_list_insert(schema->tables(), obj);
  if (!obj->isStub())
    log_db_obj_created(schema, obj);

  return pr_processed;
}