void ConnectionTest::TestMetadataGetObjectsColumns()

in c/validation/adbc_validation_connection.cc [737:860]


// Verifies that AdbcConnectionGetObjects, called at column depth, reports the
// columns of the sample table "bulk_ingest".
//
// Two cases are exercised: no column-name filter (both "int64s" and "strings"
// are expected) and the filter "in%" (only "int64s" is expected). For each
// case the whole result stream is walked, the table is located by
// case-insensitive match on table name and db schema name, and its
// (lower-cased column name, ordinal position) pairs are compared, ignoring
// order, against the expectation.
//
// The test is skipped when the driver quirks report that GetObjects is not
// supported.
void ConnectionTest::TestMetadataGetObjectsColumns() {
  if (!quirks()->supports_get_objects()) {
    GTEST_SKIP();
  }
  // TODO: test could be more robust if we ingested a few tables
  // The request below uses ADBC_OBJECT_DEPTH_COLUMNS; this test relies on that
  // being the same value as ADBC_OBJECT_DEPTH_ALL.
  ASSERT_EQ(ADBC_OBJECT_DEPTH_COLUMNS, ADBC_OBJECT_DEPTH_ALL);

  ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
  ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  struct TestCase {
    // column-name filter pattern passed to GetObjects; nullopt means no filter
    std::optional<std::string> filter;
    // the pair is column name & ordinal position of the column
    std::vector<std::pair<std::string, int32_t>> columns;
  };

  std::vector<TestCase> test_cases;
  test_cases.push_back({std::nullopt, {{"int64s", 1}, {"strings", 2}}});
  test_cases.push_back({"in%", {{"int64s", 1}}});

  const std::string catalog = quirks()->catalog();

  for (const auto& test_case : test_cases) {
    std::string scope = "Filter: ";
    scope += test_case.filter ? *test_case.filter : "(no filter)";
    SCOPED_TRACE(scope);

    StreamReader reader;
    // (lower-cased column name, ordinal position) pairs collected for the
    // expected table
    std::vector<std::pair<std::string, int32_t>> columns;

    ASSERT_THAT(
        AdbcConnectionGetObjects(
            &connection, ADBC_OBJECT_DEPTH_COLUMNS, catalog.c_str(), nullptr, nullptr,
            nullptr, test_case.filter.has_value() ? test_case.filter->c_str() : nullptr,
            &reader.stream.value, &error),
        IsOkStatus(&error));
    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
    ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value));
    ASSERT_NO_FATAL_FAILURE(reader.Next());
    ASSERT_NE(nullptr, reader.array->release);
    ASSERT_GT(reader.array->length, 0);
    bool found_expected_table = false;
    do {
      // The nested child views depend only on reader.array_view, not on the
      // row being visited, so resolve them once per batch.
      // type: list<db_schema_schema>
      struct ArrowArrayView* catalog_db_schemas_list = reader.array_view->children[1];
      // type: db_schema_schema (struct)
      struct ArrowArrayView* catalog_db_schemas = catalog_db_schemas_list->children[0];
      // type: list<table_schema>
      struct ArrowArrayView* db_schema_tables_list = catalog_db_schemas->children[1];
      // type: table_schema (struct)
      struct ArrowArrayView* db_schema_tables = db_schema_tables_list->children[0];
      // type: list<column_schema>
      struct ArrowArrayView* table_columns_list = db_schema_tables->children[2];
      // type: column_schema (struct)
      struct ArrowArrayView* table_columns = table_columns_list->children[0];
      // type: list<usage_schema>
      struct ArrowArrayView* table_constraints_list = db_schema_tables->children[3];

      for (int64_t row = 0; row < reader.array->length; row++) {
        ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row))
            << "Row " << row << " should have non-null catalog_db_schemas";

        // Walk the db schemas belonging to this catalog row.
        for (int64_t db_schemas_index =
                 ArrowArrayViewListChildOffset(catalog_db_schemas_list, row);
             db_schemas_index <
             ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1);
             db_schemas_index++) {
          ASSERT_FALSE(ArrowArrayViewIsNull(db_schema_tables_list, db_schemas_index))
              << "Row " << row << " should have non-null db_schema_tables";

          ArrowStringView db_schema_name = ArrowArrayViewGetStringUnsafe(
              catalog_db_schemas->children[0], db_schemas_index);

          // Walk the tables belonging to this db schema.
          for (int64_t tables_index =
                   ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index);
               tables_index <
               ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index + 1);
               tables_index++) {
            ArrowStringView table_name = ArrowArrayViewGetStringUnsafe(
                db_schema_tables->children[0], tables_index);

            ASSERT_FALSE(ArrowArrayViewIsNull(table_columns_list, tables_index))
                << "Row " << row << " should have non-null table_columns";
            ASSERT_FALSE(ArrowArrayViewIsNull(table_constraints_list, tables_index))
                << "Row " << row << " should have non-null table_constraints";

            if (iequals(std::string(table_name.data, table_name.size_bytes),
                        "bulk_ingest") &&
                iequals(std::string(db_schema_name.data, db_schema_name.size_bytes),
                        quirks()->db_schema())) {
              // The table must appear exactly once across the whole stream.
              ASSERT_FALSE(found_expected_table);
              found_expected_table = true;

              for (int64_t columns_index =
                       ArrowArrayViewListChildOffset(table_columns_list, tables_index);
                   columns_index <
                   ArrowArrayViewListChildOffset(table_columns_list, tables_index + 1);
                   columns_index++) {
                ArrowStringView name = ArrowArrayViewGetStringUnsafe(
                    table_columns->children[0], columns_index);
                // Lower-case the reported name so the comparison against the
                // expected names does not depend on the driver's casing.
                std::string temp(name.data, name.size_bytes);
                std::transform(temp.begin(), temp.end(), temp.begin(),
                               [](unsigned char c) { return std::tolower(c); });
                columns.emplace_back(std::move(temp),
                                     static_cast<int32_t>(ArrowArrayViewGetIntUnsafe(
                                         table_columns->children[1], columns_index)));
              }
            }
          }
        }
      }
      ASSERT_NO_FATAL_FAILURE(reader.Next());
    } while (reader.array->release);

    ASSERT_TRUE(found_expected_table)
        << "Did not find table bulk_ingest in metadata";
    // metadata columns do not guarantee the order they are returned in, just
    // validate all the elements are there.
    ASSERT_THAT(columns, testing::UnorderedElementsAreArray(test_case.columns));
  }
}