AdbcStatusCode AppendConstraints()

in c/driver/postgresql/connection.cc [522:681]


  /// Query pg_catalog for constraints and append one entry per constraint to
  /// the constraint builder arrays held by this object.
  ///
  /// Foreign keys are collected by the fk_unnest / fk_names / fkeys CTEs and
  /// CHECK, UNIQUE and PRIMARY KEY constraints by other_constraints; the two
  /// sets are combined with UNION ALL. When column_name_ is set, an extra
  /// `conname LIKE $3` filter is applied to the combined result.
  ///
  /// \param schema_name bound to $1 and matched with LIKE against
  ///   pg_namespace.nspname
  /// \param table_name bound to $2 and matched with LIKE against
  ///   pg_class.relname
  /// \return ADBC_STATUS_OK on success, ADBC_STATUS_INTERNAL if the query text
  ///   cannot be built; failures from Prepare()/Execute() and from the Arrow
  ///   append calls are propagated through RAISE_ADBC / CHECK_NA.
  ///
  /// NOTE(review): other_constraints aggregates column names without an
  /// ORDER BY, so the order of colnames for multi-column constraints is up to
  /// the server; both CTEs also group by constraint name only. Confirm this is
  /// acceptable for callers that pass patterns matching several tables.
  AdbcStatusCode AppendConstraints(std::string schema_name, std::string table_name) {
    struct StringBuilder query;
    std::memset(&query, 0, sizeof(query));
    if (StringBuilderInit(&query, /*initial_size*/ 4096)) {
      return ADBC_STATUS_INTERNAL;
    }

    std::vector<std::string> params = {schema_name, table_name};
    const char* stmt =
        "WITH fk_unnest AS ( "
        "    SELECT "
        "        con.conname, "
        "        'FOREIGN KEY' AS contype, "
        "        conrelid, "
        "        UNNEST(con.conkey) AS conkey, "
        "        confrelid, "
        "        UNNEST(con.confkey) AS confkey "
        "    FROM pg_catalog.pg_constraint AS con "
        "    INNER JOIN pg_catalog.pg_class AS cls ON cls.oid = conrelid "
        "    INNER JOIN pg_catalog.pg_namespace AS nsp ON nsp.oid = cls.relnamespace "
        "    WHERE con.contype = 'f' AND nsp.nspname LIKE $1 "
        "    AND cls.relname LIKE $2 "
        "), "
        "fk_names AS ( "
        "    SELECT "
        "        fk_unnest.conname, "
        "        fk_unnest.contype, "
        "        fk_unnest.conkey, "
        "        fk_unnest.confkey, "
        "        attr.attname, "
        "        fnsp.nspname AS fschema, "
        "        fcls.relname AS ftable, "
        "        fattr.attname AS fattname "
        "    FROM fk_unnest "
        "    INNER JOIN pg_catalog.pg_class AS cls ON cls.oid = fk_unnest.conrelid "
        "    INNER JOIN pg_catalog.pg_class AS fcls ON fcls.oid = fk_unnest.confrelid "
        "    INNER JOIN pg_catalog.pg_namespace AS fnsp ON fnsp.oid = fcls.relnamespace"
        "    INNER JOIN pg_catalog.pg_attribute AS attr ON attr.attnum = "
        "fk_unnest.conkey "
        "        AND attr.attrelid = fk_unnest.conrelid "
        "    LEFT JOIN pg_catalog.pg_attribute AS fattr ON fattr.attnum =  "
        "fk_unnest.confkey "
        "        AND fattr.attrelid = fk_unnest.confrelid "
        "), "
        "fkeys AS ( "
        "    SELECT "
        "        conname, "
        "        contype, "
        "        ARRAY_AGG(attname ORDER BY conkey) AS colnames, "
        "        fschema, "
        "        ftable, "
        "        ARRAY_AGG(fattname ORDER BY confkey) AS fcolnames "
        "    FROM fk_names "
        "    GROUP BY "
        "        conname, "
        "        contype, "
        "        fschema, "
        "        ftable "
        "), "
        "other_constraints AS ( "
        "    SELECT con.conname, CASE con.contype WHEN 'c' THEN 'CHECK' WHEN 'u' THEN  "
        "    'UNIQUE' WHEN 'p' THEN 'PRIMARY KEY' END AS contype, "
        "    ARRAY_AGG(attr.attname) AS colnames "
        "    FROM pg_catalog.pg_constraint AS con  "
        "    CROSS JOIN UNNEST(conkey) AS conkeys  "
        "    INNER JOIN pg_catalog.pg_class AS cls ON cls.oid = con.conrelid  "
        "    INNER JOIN pg_catalog.pg_namespace AS nsp ON nsp.oid = cls.relnamespace  "
        "    INNER JOIN pg_catalog.pg_attribute AS attr ON attr.attnum = conkeys  "
        "    AND cls.oid = attr.attrelid  "
        "    WHERE con.contype IN ('c', 'u', 'p') AND nsp.nspname LIKE $1 "
        "    AND cls.relname LIKE $2 "
        "    GROUP BY conname, contype "
        ") "
        // The UNION ALL is wrapped in a derived table so that the optional
        // WHERE clause appended below filters both arms. Appended directly
        // after the union it would bind only to the last SELECT and foreign
        // keys would never be filtered.
        "SELECT "
        "    conname, contype, colnames, fschema, ftable, fcolnames "
        "FROM ( "
        "    SELECT "
        "        conname, contype, colnames, fschema, ftable, fcolnames "
        "    FROM fkeys "
        "    UNION ALL "
        "    SELECT "
        "        conname, contype, colnames, NULL, NULL, NULL "
        "    FROM other_constraints "
        ") AS all_constraints";

    if (StringBuilderAppend(&query, "%s", stmt)) {
      StringBuilderReset(&query);
      return ADBC_STATUS_INTERNAL;
    }

    if (column_name_ != nullptr) {
      if (StringBuilderAppend(&query, "%s", " WHERE conname LIKE $3")) {
        StringBuilderReset(&query);
        return ADBC_STATUS_INTERNAL;
      }

      params.push_back(std::string(column_name_));
    }

    // The helper is constructed from the buffer before the builder is reset;
    // the builder is not touched again after this point.
    auto result_helper = PqResultHelper{conn_, query.buffer, params, error_};
    StringBuilderReset(&query);

    RAISE_ADBC(result_helper.Prepare());
    RAISE_ADBC(result_helper.Execute());

    // Result columns: 0 conname, 1 contype, 2 colnames, 3 fschema, 4 ftable,
    // 5 fcolnames (columns 3-5 are NULL for non-foreign-key constraints).
    for (PqResultRow row : result_helper) {
      const char* constraint_name = row[0].data;
      const char* constraint_type = row[1].data;

      CHECK_NA(
          INTERNAL,
          ArrowArrayAppendString(constraint_name_col_, ArrowCharView(constraint_name)),
          error_);

      CHECK_NA(
          INTERNAL,
          ArrowArrayAppendString(constraint_type_col_, ArrowCharView(constraint_type)),
          error_);

      // colnames arrives as a text-encoded array; split it into one string
      // per constrained column.
      auto constraint_column_names = PqTextArrayToVector(std::string(row[2].data));
      for (const auto& constraint_column_name : constraint_column_names) {
        CHECK_NA(INTERNAL,
                 ArrowArrayAppendString(constraint_column_name_col_,
                                        ArrowCharView(constraint_column_name.c_str())),
                 error_);
      }
      CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col_), error_);

      if (strcmp(constraint_type, "FOREIGN KEY") == 0) {
        // Foreign-key rows come from the fkeys arm, which selects the
        // referenced schema, table and column list.
        assert(!row[3].is_null);
        assert(!row[4].is_null);
        assert(!row[5].is_null);

        const char* constraint_ftable_schema = row[3].data;
        const char* constraint_ftable_name = row[4].data;
        auto constraint_fcolumn_names = PqTextArrayToVector(std::string(row[5].data));
        // One usage item (catalog, schema, table, column) per referenced column.
        for (const auto& constraint_fcolumn_name : constraint_fcolumn_names) {
          CHECK_NA(INTERNAL,
                   ArrowArrayAppendString(fk_catalog_col_, ArrowCharView(PQdb(conn_))),
                   error_);
          CHECK_NA(INTERNAL,
                   ArrowArrayAppendString(fk_db_schema_col_,
                                          ArrowCharView(constraint_ftable_schema)),
                   error_);
          CHECK_NA(INTERNAL,
                   ArrowArrayAppendString(fk_table_col_,
                                          ArrowCharView(constraint_ftable_name)),
                   error_);
          CHECK_NA(INTERNAL,
                   ArrowArrayAppendString(fk_column_name_col_,
                                          ArrowCharView(constraint_fcolumn_name.c_str())),
                   error_);

          CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_items_),
                   error_);
        }
      }
      // Finished for every constraint, so non-foreign-key rows get an element
      // with no usage items appended by this iteration.
      CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usages_col_), error_);
      CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items_), error_);
    }

    CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_col_), error_);
    return ADBC_STATUS_OK;
  }