bool add_pk_to_create_table_if_missing()

in modules/util/dump/compatibility.cc [1283:1523]


/**
 * Adds an invisible primary key column to a CREATE TABLE statement which does
 * not define a primary key.
 *
 * The statement is scanned token by token. If it has no primary key, the
 * column definition
 * "`my_row_id` BIGINT UNSIGNED AUTO_INCREMENT INVISIBLE PRIMARY KEY" is
 * inserted as the first element of the create_definition list.
 *
 * @param statement CREATE TABLE statement to be analysed.
 * @param rewritten Output, must not be null. Receives the rewritten statement
 *        when the function returns true, otherwise it is left untouched.
 * @param ignore_pke If false, a primary key equivalent (a unique index which
 *        consists only of NOT NULL columns) is treated like a primary key and
 *        the statement is not rewritten. If true, such indexes are ignored.
 *
 * @returns true if the primary key was added and `rewritten` was set, false
 *          if the table already has a primary key (or, when `ignore_pke` is
 *          false, a primary key equivalent).
 *
 * @throws std::runtime_error if the statement does not begin with
 *         CREATE TABLE, if it is a CREATE TABLE ... LIKE or
 *         CREATE TABLE ... SELECT statement (or has no create_definition at
 *         all), or if it ends in the middle of a definition / an expected
 *         keyword is missing.
 */
bool add_pk_to_create_table_if_missing(const std::string &statement,
                                       std::string *rewritten,
                                       bool ignore_pke) {
  assert(rewritten);

  SQL_iterator it(statement, 0, false);

  if (!shcore::str_caseeq(it.next_token(), "CREATE") ||
      !shcore::str_caseeq(it.next_token(), "TABLE")) {
    throw std::runtime_error(
        "Invisible primary key can be only added to CREATE TABLE statements");
  }

  // find opening parenthesis of create_definition
  while (it.valid()) {
    const auto token = it.next_token();

    if (token == "(") {
      break;
    }

    if (shcore::str_caseeq(token, "SELECT")) {
      // CREATE TABLE ... SELECT without a create_definition; the query may
      // contain parentheses, which must not be mistaken for the column list
      throw std::runtime_error("Unsupported CREATE TABLE statement");
    }
  }

  if (!it.valid()) {
    // no create_definition -> CREATE TABLE ... LIKE or CREATE TABLE ... SELECT
    throw std::runtime_error("Unsupported CREATE TABLE statement");
  }

  // we're going to insert the PK as the first column
  const auto position = it.position();
  // nesting depth of parentheses, 1 means directly inside create_definition
  int brace_count = 1;

  // consumes tokens up to and including the comma which ends the current
  // create_definition, or the parenthesis which closes the whole list
  const auto skip_create_definition = [&brace_count, &it]() {
    while (it.valid() && brace_count > 0) {
      const auto token = it.next_token();

      if (shcore::str_caseeq(token, "(")) {
        ++brace_count;
      } else if (shcore::str_caseeq(token, ")")) {
        --brace_count;
      } else if (shcore::str_caseeq(token, ",") && 1 == brace_count) {
        // comma directly inside of create_definition
        return;
      }
    }
  };

  // fetches the next token, used where running out of tokens is an error
  const auto next_token = [&it]() {
    if (!it.valid()) {
      throw std::runtime_error("Malformed CREATE TABLE statement");
    }

    return it.next_token();
  };

  std::string token;
  bool first_token = true;
  // columns of unique indexes which do not use expressions as key parts
  std::vector<std::vector<std::string>> unique_indexes;
  // column name -> NOT NULL
  std::unordered_map<std::string, bool> columns;

  while (it.valid() && brace_count > 0) {
    token = it.next_token();

    if (shcore::str_caseeq(token, "LIKE") && first_token) {
      // CREATE TABLE ... LIKE
      throw std::runtime_error("Unsupported CREATE TABLE statement");
    }

    first_token = false;

    if (shcore::str_caseeq(token, "INDEX", "KEY", "FULLTEXT", "SPATIAL")) {
      // {INDEX | KEY} [index_name] [index_type] (key_part,...)
      //   [index_option] ...
      // or
      // {FULLTEXT | SPATIAL} [INDEX | KEY] [index_name] (key_part,...)
      //   [index_option] ...
      skip_create_definition();
      continue;
    }

    if (shcore::str_caseeq(token, "CONSTRAINT")) {
      token = next_token();

      if (!shcore::str_caseeq(token, "PRIMARY", "UNIQUE", "FOREIGN", "CHECK")) {
        // symbol name, skip it
        token = next_token();
      }
    }

    if (shcore::str_caseeq(token, "PRIMARY")) {
      token = next_token();

      if (!shcore::str_caseeq(token, "KEY")) {
        throw std::runtime_error("Malformed CREATE TABLE statement");
      }

      // we don't care about the rest
      return false;
    } else if (shcore::str_caseeq(token, "UNIQUE")) {
      // [CONSTRAINT [symbol]] UNIQUE [INDEX | KEY]
      //  [index_name] [index_type] (key_part,...)
      //  [index_option] ...

      // move till first key_part
      do {
        token = next_token();
      } while (!shcore::str_caseeq(token, "("));

      ++brace_count;

      std::vector<std::string> unique_index;
      bool ignore_index = false;

      do {
        // key_part: {col_name [(length)] | (expr)} [ASC | DESC]
        token = next_token();

        if (shcore::str_caseeq(token, "(")) {
          // (expr), we don't allow for such indexes
          ignore_index = true;

          const auto current_brace_count = brace_count;
          ++brace_count;

          // consume the whole expression, including nested parentheses
          while (current_brace_count != brace_count) {
            token = next_token();

            if (shcore::str_caseeq(token, "(")) {
              ++brace_count;
            } else if (shcore::str_caseeq(token, ")")) {
              --brace_count;
            }
          }

          token = next_token();
        } else {
          unique_index.emplace_back(shcore::unquote_identifier(token));

          token = next_token();

          if (shcore::str_caseeq(token, "(")) {
            token = next_token();  // length
            token = next_token();  // )

            if (!shcore::str_caseeq(token, ")")) {
              throw std::runtime_error("Malformed CREATE TABLE statement");
            }

            // move past this part
            token = next_token();
          }
        }

        if (shcore::str_caseeq(token, "ASC", "DESC")) {
          // skip this
          token = next_token();
        }

        // we're now either at comma, or at finishing parenthesis
      } while (!shcore::str_caseeq(token, ")"));

      --brace_count;

      if (!ignore_index) {
        unique_indexes.emplace_back(std::move(unique_index));
      }

      // ignore the rest of create_definition
      skip_create_definition();
      continue;
    } else if (shcore::str_caseeq(token, "FOREIGN", "CHECK")) {
      // [CONSTRAINT [symbol]] FOREIGN KEY
      //   [index_name] (col_name,...)
      //   reference_definition
      // or
      // [CONSTRAINT [symbol]] CHECK (expr) [[NOT] ENFORCED]
      skip_create_definition();
      continue;
    } else if (shcore::str_caseeq(token, ")")) {
      --brace_count;
    } else {
      // col_name column_definition
      auto column_name = shcore::unquote_identifier(token);
      // If neither NULL nor NOT NULL is specified, the column is treated as
      // though NULL had been specified.
      bool not_null = false;
      // whether the previous token (directly inside of this column
      // definition) was the given keyword
      bool after_not = false;
      bool after_unique = false;

      while (brace_count > 0) {
        token = next_token();

        const bool follows_not = after_not;
        const bool follows_unique = after_unique;

        after_not = false;
        after_unique = false;

        if (shcore::str_caseeq(token, "(")) {
          ++brace_count;
        } else if (shcore::str_caseeq(token, ")")) {
          --brace_count;
        } else if (1 != brace_count) {
          // inside of a nested expression (DEFAULT (expr), CHECK (expr),
          // GENERATED ALWAYS AS (expr), type arguments, ...); keywords found
          // here are not column attributes, i.e. IS NOT NULL in an expression
          // does not make the column NOT NULL
        } else if (shcore::str_caseeq(token, ",")) {
          // comma directly inside of create_definition
          break;
        } else if (shcore::str_caseeq(token, "NOT")) {
          // NOT can be followed by keywords other than NULL (i.e. ENFORCED,
          // SECONDARY), decide once the next token is known
          after_not = true;
        } else if (shcore::str_caseeq(token, "NULL")) {
          if (follows_not) {
            not_null = true;
          }
        } else if (shcore::str_caseeq(token, "UNIQUE")) {
          after_unique = true;
          unique_indexes.emplace_back(std::vector<std::string>{column_name});
        } else if (shcore::str_caseeq(token, "PRIMARY") ||
                   (!follows_unique && shcore::str_caseeq(token, "KEY"))) {
          // [UNIQUE [KEY]] [[PRIMARY] KEY] - KEY which is not a part of
          // UNIQUE KEY is a synonym for PRIMARY KEY
          // we don't care about the rest
          return false;
        }
      }

      columns.emplace(std::move(column_name), not_null);
    }
  }

  if (!ignore_pke) {
    for (const auto &unique_index : unique_indexes) {
      bool pke = true;

      for (const auto &column : unique_index) {
        // unknown columns are treated as nullable; lookup does not modify
        // the map
        const auto definition = columns.find(column);

        if (columns.end() == definition || !definition->second) {
          pke = false;
          break;
        }
      }

      if (pke) {
        // all columns of an unique index are not null, we have a PK equivalent
        return false;
      }
    }
  }

  // scan whatever follows the create_definition list
  while (it.valid()) {
    if (shcore::str_caseeq(it.next_token(), "SELECT")) {
      // CREATE TABLE ... SELECT
      throw std::runtime_error("Unsupported CREATE TABLE statement");
    }
  }

  *rewritten =
      statement.substr(0, position) +
      "`my_row_id` BIGINT UNSIGNED AUTO_INCREMENT INVISIBLE PRIMARY KEY," +
      statement.substr(position);

  return true;
}