void ConnectionTest::TestMetadataGetObjectsForeignKey()

in c/validation/adbc_validation_connection.cc [1033:1180]


// Verifies that AdbcConnectionGetObjects reports the columns and the
// PRIMARY KEY / FOREIGN KEY constraints of a child table that references two
// parent tables (one with a single-column key, one with a composite key).
//
// The test is skipped when the driver quirks report that GetObjects is not
// supported, or when any of the three DDL statements is unavailable.
void ConnectionTest::TestMetadataGetObjectsForeignKey() {
  ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));

  if (!quirks()->supports_get_objects()) {
    GTEST_SKIP();
  }

  // Load DDLs; a missing DDL means the driver cannot run this test.
  std::optional<std::string> maybe_parent_1_ddl =
      quirks()->PrimaryKeyTableDdl("adbc_fkey_parent_1_test");
  if (!maybe_parent_1_ddl.has_value()) {
    GTEST_SKIP();
  }
  std::string parent_1_ddl = std::move(*maybe_parent_1_ddl);

  std::optional<std::string> maybe_parent_2_ddl =
      quirks()->CompositePrimaryKeyTableDdl("adbc_fkey_parent_2_test");
  if (!maybe_parent_2_ddl.has_value()) {
    GTEST_SKIP();
  }
  std::string parent_2_ddl = std::move(*maybe_parent_2_ddl);

  std::optional<std::string> maybe_child_ddl = quirks()->ForeignKeyChildTableDdl(
      "adbc_fkey_child_test", "adbc_fkey_parent_1_test", "adbc_fkey_parent_2_test");
  if (!maybe_child_ddl.has_value()) {
    GTEST_SKIP();
  }
  std::string child_ddl = std::move(*maybe_child_ddl);

  // Empty database
  // Drop the child table first: its foreign keys reference the parent tables.
  ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_child_test", &error),
              IsOkStatus(&error));
  ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_parent_1_test", &error),
              IsOkStatus(&error));
  ASSERT_THAT(quirks()->DropTable(&connection, "adbc_fkey_parent_2_test", &error),
              IsOkStatus(&error));

  // Populate database: parents are created before the child that refers to them.
  {
    Handle<AdbcStatement> statements[3];
    std::string ddls[3] = {parent_1_ddl, parent_2_ddl, child_ddl};

    for (int ddl_index = 0; ddl_index < 3; ddl_index++) {
      int64_t rows_affected = 0;
      ASSERT_THAT(AdbcStatementNew(&connection, &statements[ddl_index].value, &error),
                  IsOkStatus(&error));
      ASSERT_THAT(AdbcStatementSetSqlQuery(&statements[ddl_index].value,
                                           ddls[ddl_index].c_str(), &error),
                  IsOkStatus(&error));
      ASSERT_THAT(AdbcStatementExecuteQuery(&statements[ddl_index].value, nullptr,
                                            &rows_affected, &error),
                  IsOkStatus(&error));
    }
  }

  // Fetch everything (no filters) at full depth and read the first batch.
  adbc_validation::StreamReader reader;
  ASSERT_THAT(
      AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr,
                               nullptr, nullptr, nullptr, &reader.stream.value, &error),
      IsOkStatus(&error));
  ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
  ASSERT_NO_FATAL_FAILURE(reader.Next());
  ASSERT_NE(nullptr, reader.array->release);
  ASSERT_GT(reader.array->length, 0);

  auto get_objects_data = adbc_validation::GetObjectsReader{&reader.array_view.value};
  ASSERT_NE(*get_objects_data, nullptr)
      << "could not initialize the AdbcGetObjectsData object";

  // Test child table
  struct AdbcGetObjectsTable* child_table = AdbcGetObjectsDataGetTableByName(
      *get_objects_data, quirks()->catalog().c_str(), quirks()->db_schema().c_str(),
      "adbc_fkey_child_test");
  ASSERT_NE(child_table, nullptr) << "could not find adbc_fkey_child_test table";

  // The child table has three columns: id_child_col1, id_child_col2, id_child_col3
  ASSERT_EQ(child_table->n_table_columns, 3);

  // Every one of the three columns must be discoverable by name.
  const char* child_column_names[3] = {"id_child_col1", "id_child_col2", "id_child_col3"};
  for (const char* column_name : child_column_names) {
    struct AdbcGetObjectsColumn* child_column = AdbcGetObjectsDataGetColumnByName(
        *get_objects_data, quirks()->catalog().c_str(), quirks()->db_schema().c_str(),
        "adbc_fkey_child_test", column_name);
    ASSERT_NE(child_column, nullptr)
        << "could not find column " << column_name << " on adbc_fkey_child_test table";
  }

  // There are three constraints: PRIMARY KEY, FOREIGN KEY, FOREIGN KEY
  // affecting one, one, and two columns, respectively
  ASSERT_EQ(child_table->n_table_constraints, 3)
      << "expected 3 constraint on adbc_fkey_child_test table, found: "
      << child_table->n_table_constraints;

  // Records which of the three expected constraints has been seen and checked.
  struct ConstraintFlags {
    bool adbc_fkey_child_test_pkey = false;
    bool adbc_fkey_child_test_id_child_col3_fkey = false;
    bool adbc_fkey_child_test_id_child_col1_id_child_col2_fkey = false;
  };
  ConstraintFlags TestedConstraints;

  for (int constraint_index = 0; constraint_index < 3; constraint_index++) {
    struct AdbcGetObjectsConstraint* child_constraint =
        child_table->table_constraints[constraint_index];
    int number_of_column_usages = child_constraint->n_column_usages;

    // The number of column usages identifies the constraint: the constraints
    // may be reported in any order, so dispatch on the usage count rather than
    // on the position in the array.
    //
    // The helper checks are wrapped in ASSERT_NO_FATAL_FAILURE so that a fatal
    // assertion inside a helper stops this test instead of letting it continue
    // and mark the constraint as successfully tested.
    switch (number_of_column_usages) {
      case 0: {
        // adbc_fkey_child_test_pkey
        ASSERT_NO_FATAL_FAILURE(
            ConstraintTest(child_constraint, "PRIMARY KEY", {"id_child_col1"}));

        TestedConstraints.adbc_fkey_child_test_pkey = true;
      } break;
      case 1: {
        // adbc_fkey_child_test_id_child_col3_fkey
        ASSERT_NO_FATAL_FAILURE(
            ConstraintTest(child_constraint, "FOREIGN KEY", {"id_child_col3"}));
        ASSERT_NO_FATAL_FAILURE(ForeignKeyColumnUsagesTest(
            child_constraint, quirks()->catalog(), quirks()->db_schema(), 0,
            "adbc_fkey_parent_1_test", "id"));

        TestedConstraints.adbc_fkey_child_test_id_child_col3_fkey = true;
      } break;
      case 2: {
        // adbc_fkey_child_test_id_child_col1_id_child_col2_fkey
        ASSERT_NO_FATAL_FAILURE(ConstraintTest(child_constraint, "FOREIGN KEY",
                                               {"id_child_col1", "id_child_col2"}));
        ASSERT_NO_FATAL_FAILURE(ForeignKeyColumnUsagesTest(
            child_constraint, quirks()->catalog(), quirks()->db_schema(), 0,
            "adbc_fkey_parent_2_test", "id_primary_col1"));
        ASSERT_NO_FATAL_FAILURE(ForeignKeyColumnUsagesTest(
            child_constraint, quirks()->catalog(), quirks()->db_schema(), 1,
            "adbc_fkey_parent_2_test", "id_primary_col2"));

        TestedConstraints.adbc_fkey_child_test_id_child_col1_id_child_col2_fkey = true;
      } break;
    }
  }

  // Each expected constraint must have been matched exactly by usage count;
  // an unexpected count leaves one of these flags unset.
  ASSERT_TRUE(TestedConstraints.adbc_fkey_child_test_pkey);
  ASSERT_TRUE(TestedConstraints.adbc_fkey_child_test_id_child_col3_fkey);
  ASSERT_TRUE(TestedConstraints.adbc_fkey_child_test_id_child_col1_id_child_col2_fkey);
}