c/validation/adbc_validation.cc (1,840 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "adbc_validation.h" #include <algorithm> #include <cerrno> #include <cstring> #include <limits> #include <optional> #include <string> #include <string_view> #include <tuple> #include <utility> #include <vector> #include <adbc.h> #include <gmock/gmock.h> #include <gtest/gtest-matchers.h> #include <gtest/gtest.h> #include <nanoarrow/nanoarrow.h> #include "adbc_validation_util.h" namespace adbc_validation { namespace { /// Nanoarrow helpers #define NULLABLE true #define NOT_NULL false /// Assertion helpers #define CHECK_OK(EXPR) \ do { \ if (auto adbc_status = (EXPR); adbc_status != ADBC_STATUS_OK) { \ return adbc_status; \ } \ } while (false) /// case insensitive string compare bool iequals(std::string_view s1, std::string_view s2) { return std::equal(s1.begin(), s1.end(), s2.begin(), s2.end(), [](unsigned char a, unsigned char b) { return std::tolower(a) == std::tolower(b); }); } } // namespace //------------------------------------------------------------ // DriverQuirks AdbcStatusCode DoIngestSampleTable(struct AdbcConnection* connection, const std::string& name, struct AdbcError* error) { Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; CHECK_OK(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}, {"strings", NANOARROW_TYPE_STRING}})); CHECK_OK((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt}, {"foo", std::nullopt, ""}))); Handle<struct AdbcStatement> statement; CHECK_OK(AdbcStatementNew(connection, &statement.value, error)); CHECK_OK(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE, name.c_str(), error)); CHECK_OK(AdbcStatementBind(&statement.value, &array.value, &schema.value, error)); CHECK_OK(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, error)); CHECK_OK(AdbcStatementRelease(&statement.value, error)); return ADBC_STATUS_OK; } void IngestSampleTable(struct AdbcConnection* connection, struct AdbcError* error) { ASSERT_THAT(DoIngestSampleTable(connection, "bulk_ingest", error), IsOkStatus(error)); } AdbcStatusCode DriverQuirks::EnsureSampleTable(struct AdbcConnection* connection, const std::string& name, struct AdbcError* error) const { CHECK_OK(DropTable(connection, name, error)); return CreateSampleTable(connection, name, error); } AdbcStatusCode DriverQuirks::CreateSampleTable(struct AdbcConnection* connection, const std::string& name, struct AdbcError* error) const { if (!supports_bulk_ingest()) { return ADBC_STATUS_NOT_IMPLEMENTED; } return DoIngestSampleTable(connection, name, error); } //------------------------------------------------------------ // Tests of AdbcDatabase void DatabaseTest::SetUpTest() { std::memset(&error, 0, sizeof(error)); std::memset(&database, 0, sizeof(database)); } void DatabaseTest::TearDownTest() { if (database.private_data) { ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error)); } if (error.release) { error.release(&error); } } void DatabaseTest::TestNewInit() { ASSERT_THAT(AdbcDatabaseNew(&database, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->SetupDatabase(&database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcDatabaseInit(&database, &error), IsOkStatus(&error)); ASSERT_NE(nullptr, database.private_data); ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error)); ASSERT_EQ(nullptr, database.private_data); ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); } void DatabaseTest::TestRelease() { ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); ASSERT_THAT(AdbcDatabaseNew(&database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error)); ASSERT_EQ(nullptr, database.private_data); } //------------------------------------------------------------ // Tests of AdbcConnection void ConnectionTest::SetUpTest() { std::memset(&error, 0, sizeof(error)); std::memset(&database, 0, sizeof(database)); std::memset(&connection, 0, sizeof(connection)); ASSERT_THAT(AdbcDatabaseNew(&database, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->SetupDatabase(&database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcDatabaseInit(&database, &error), IsOkStatus(&error)); } void ConnectionTest::TearDownTest() { if (connection.private_data) { ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error)); } ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error)); if (error.release) { error.release(&error); } } void ConnectionTest::TestNewInit() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error)); ASSERT_EQ(NULL, connection.private_data); ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); } void ConnectionTest::TestRelease() { ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error)); ASSERT_EQ(NULL, connection.private_data); // TODO: what should happen if we Release() with open connections? } void ConnectionTest::TestConcurrent() { struct AdbcConnection connection2; memset(&connection2, 0, sizeof(connection2)); ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionNew(&connection2, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection2, &database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionRelease(&connection2, &error), IsOkStatus(&error)); } //------------------------------------------------------------ // Tests of autocommit (without data) void ConnectionTest::TestAutocommitDefault() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); // Even if not supported, the driver should act as if autocommit is // enabled, and return INVALID_STATE if the client tries to commit // or rollback ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); // Invalid option value ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, "invalid", &error), ::testing::Not(IsOkStatus(&error))); } void ConnectionTest::TestAutocommitToggle() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_transactions()) { GTEST_SKIP(); } // It is OK to enable autocommit when it is already enabled ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_ENABLED, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &error), IsOkStatus(&error)); // It is OK to disable autocommit when it is already enabled ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &error), IsOkStatus(&error)); } //------------------------------------------------------------ // Tests of metadata void ConnectionTest::TestMetadataGetInfo() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_get_sql_info()) { GTEST_SKIP(); } StreamReader reader; std::vector<uint32_t> info = { ADBC_INFO_DRIVER_NAME, ADBC_INFO_DRIVER_VERSION, ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, }; ASSERT_THAT(AdbcConnectionGetInfo(&connection, info.data(), info.size(), &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CompareSchema( &reader.schema.value, { {"info_name", NANOARROW_TYPE_UINT32, NOT_NULL}, {"info_value", NANOARROW_TYPE_DENSE_UNION, NULLABLE}, })); ASSERT_NO_FATAL_FAILURE( CompareSchema(reader.schema->children[1], { {"string_value", NANOARROW_TYPE_STRING, NULLABLE}, {"bool_value", NANOARROW_TYPE_BOOL, NULLABLE}, {"int64_value", NANOARROW_TYPE_INT64, NULLABLE}, {"int32_bitmask", NANOARROW_TYPE_INT32, NULLABLE}, {"string_list", NANOARROW_TYPE_LIST, NULLABLE}, {"int32_to_int32_list_map", NANOARROW_TYPE_MAP, NULLABLE}, })); ASSERT_NO_FATAL_FAILURE(CompareSchema(reader.schema->children[1]->children[4], { {"item", NANOARROW_TYPE_STRING, NULLABLE}, })); ASSERT_NO_FATAL_FAILURE(CompareSchema(reader.schema->children[1]->children[5], { {"entries", NANOARROW_TYPE_STRUCT, NOT_NULL}, })); ASSERT_NO_FATAL_FAILURE( CompareSchema(reader.schema->children[1]->children[5]->children[0], { {"key", NANOARROW_TYPE_INT32, NOT_NULL}, {"value", NANOARROW_TYPE_LIST, NULLABLE}, })); ASSERT_NO_FATAL_FAILURE( CompareSchema(reader.schema->children[1]->children[5]->children[0]->children[1], { {"item", NANOARROW_TYPE_INT32, NULLABLE}, })); std::vector<uint32_t> seen; while (true) { ASSERT_NO_FATAL_FAILURE(reader.Next()); if (!reader.array->release) break; for (int64_t row = 0; row < reader.array->length; row++) { ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], row)); const uint32_t code = reader.array_view->children[0]->buffer_views[1].data.as_uint32[row]; seen.push_back(code); switch (code) { case ADBC_INFO_DRIVER_NAME: case ADBC_INFO_DRIVER_VERSION: case ADBC_INFO_VENDOR_NAME: case ADBC_INFO_VENDOR_VERSION: // UTF8 ASSERT_EQ(uint8_t(0), reader.array_view->children[1]->buffer_views[0].data.as_uint8[row]); default: // Ignored break; } } } ASSERT_THAT(seen, ::testing::UnorderedElementsAreArray(info)); } void ConnectionTest::TestMetadataGetTableSchema() { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error)); Handle<ArrowSchema> schema; ASSERT_THAT(AdbcConnectionGetTableSchema(&connection, /*catalog=*/nullptr, /*db_schema=*/nullptr, "bulk_ingest", &schema.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE( CompareSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}, {"strings", NANOARROW_TYPE_STRING, NULLABLE}})); } void ConnectionTest::TestMetadataGetTableTypes() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); StreamReader reader; ASSERT_THAT(AdbcConnectionGetTableTypes(&connection, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CompareSchema( &reader.schema.value, {{"table_type", NANOARROW_TYPE_STRING, NOT_NULL}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); } void CheckGetObjectsSchema(struct ArrowSchema* schema) { ASSERT_NO_FATAL_FAILURE( CompareSchema(schema, { {"catalog_name", NANOARROW_TYPE_STRING, NULLABLE}, {"catalog_db_schemas", NANOARROW_TYPE_LIST, NULLABLE}, })); struct ArrowSchema* db_schema_schema = schema->children[1]->children[0]; ASSERT_NO_FATAL_FAILURE(CompareSchema( db_schema_schema, { {"db_schema_name", NANOARROW_TYPE_STRING, NULLABLE}, {"db_schema_tables", NANOARROW_TYPE_LIST, NULLABLE}, })); struct ArrowSchema* table_schema = db_schema_schema->children[1]->children[0]; ASSERT_NO_FATAL_FAILURE(CompareSchema( table_schema, { {"table_name", NANOARROW_TYPE_STRING, NOT_NULL}, {"table_type", NANOARROW_TYPE_STRING, NOT_NULL}, {"table_columns", NANOARROW_TYPE_LIST, NULLABLE}, {"table_constraints", NANOARROW_TYPE_LIST, NULLABLE}, })); struct ArrowSchema* column_schema = table_schema->children[2]->children[0]; ASSERT_NO_FATAL_FAILURE(CompareSchema( column_schema, { {"column_name", NANOARROW_TYPE_STRING, NOT_NULL}, {"ordinal_position", NANOARROW_TYPE_INT32, NULLABLE}, {"remarks", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_data_type", NANOARROW_TYPE_INT16, NULLABLE}, {"xdbc_type_name", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_column_size", NANOARROW_TYPE_INT32, NULLABLE}, {"xdbc_decimal_digits", NANOARROW_TYPE_INT16, NULLABLE}, {"xdbc_num_prec_radix", NANOARROW_TYPE_INT16, NULLABLE}, {"xdbc_nullable", NANOARROW_TYPE_INT16, NULLABLE}, {"xdbc_column_def", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_sql_data_type", NANOARROW_TYPE_INT16, NULLABLE}, {"xdbc_datetime_sub", NANOARROW_TYPE_INT16, NULLABLE}, {"xdbc_char_octet_length", NANOARROW_TYPE_INT32, NULLABLE}, {"xdbc_is_nullable", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_scope_catalog", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_scope_schema", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_scope_table", NANOARROW_TYPE_STRING, NULLABLE}, {"xdbc_is_autoincrement", NANOARROW_TYPE_BOOL, NULLABLE}, {"xdbc_is_generatedcolumn", NANOARROW_TYPE_BOOL, NULLABLE}, })); struct ArrowSchema* constraint_schema = table_schema->children[3]->children[0]; ASSERT_NO_FATAL_FAILURE(CompareSchema( constraint_schema, { {"constraint_name", NANOARROW_TYPE_STRING, NULLABLE}, {"constraint_type", NANOARROW_TYPE_STRING, NOT_NULL}, {"constraint_column_names", NANOARROW_TYPE_LIST, NOT_NULL}, {"constraint_column_usage", NANOARROW_TYPE_LIST, NULLABLE}, })); ASSERT_NO_FATAL_FAILURE(CompareSchema( constraint_schema->children[2], { {std::nullopt, NANOARROW_TYPE_STRING, NULLABLE}, })); struct ArrowSchema* usage_schema = constraint_schema->children[3]->children[0]; ASSERT_NO_FATAL_FAILURE( CompareSchema(usage_schema, { {"fk_catalog", NANOARROW_TYPE_STRING, NULLABLE}, {"fk_db_schema", NANOARROW_TYPE_STRING, NULLABLE}, {"fk_table", NANOARROW_TYPE_STRING, NOT_NULL}, {"fk_column_name", NANOARROW_TYPE_STRING, NOT_NULL}, })); } void ConnectionTest::TestMetadataGetObjectsCatalogs() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_get_objects()) { GTEST_SKIP(); } { StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, nullptr, nullptr, nullptr, nullptr, nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); // We requested catalogs, so expect at least one catalog, and // 'catalog_db_schemas' should be null ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); do { for (int64_t row = 0; row < reader.array->length; row++) { ASSERT_TRUE(ArrowArrayViewIsNull(reader.array_view->children[1], row)) << "Row " << row << " should have null catalog_db_schemas"; } ASSERT_NO_FATAL_FAILURE(reader.Next()); } while (reader.array->release); } { // Filter with a nonexistent catalog - we should get nothing StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_CATALOGS, "this catalog does not exist", nullptr, nullptr, nullptr, nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); ASSERT_NO_FATAL_FAILURE(reader.Next()); if (reader.array->release) { ASSERT_EQ(0, reader.array->length); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } } } void ConnectionTest::TestMetadataGetObjectsDbSchemas() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_get_objects()) { GTEST_SKIP(); } { // Expect at least one catalog, at least one schema, and tables should be null StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr, nullptr, nullptr, nullptr, nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); do { for (int64_t row = 0; row < reader.array->length; row++) { // type: list<db_schema_schema> struct ArrowArrayView* catalog_db_schemas_list = reader.array_view->children[1]; // type: db_schema_schema (struct) struct ArrowArrayView* catalog_db_schemas = catalog_db_schemas_list->children[0]; // type: list<table_schema> struct ArrowArrayView* db_schema_tables_list = catalog_db_schemas->children[1]; ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row)) << "Row " << row << " should have non-null catalog_db_schemas"; ArrowStringView catalog_name = ArrowArrayViewGetStringUnsafe(reader.array_view->children[0], row); const int64_t start_offset = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row); const int64_t end_offset = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1); ASSERT_GE(end_offset, start_offset) << "Row " << row << " (Catalog " << std::string(catalog_name.data, catalog_name.size_bytes) << ") should have nonempty catalog_db_schemas "; ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row)); for (int64_t list_index = start_offset; list_index < end_offset; list_index++) { ASSERT_TRUE(ArrowArrayViewIsNull(db_schema_tables_list, row + list_index)) << "Row " << row << " should have null db_schema_tables"; } } ASSERT_NO_FATAL_FAILURE(reader.Next()); } while (reader.array->release); } { // Filter with a nonexistent DB schema - we should get nothing StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_DB_SCHEMAS, nullptr, "this schema does not exist", nullptr, nullptr, nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); do { for (int64_t row = 0; row < reader.array->length; row++) { struct ArrowArrayView* catalog_db_schemas_list = reader.array_view->children[1]; ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row)) << "Row " << row << " should have non-null catalog_db_schemas"; const int64_t start_offset = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row); const int64_t end_offset = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1); ASSERT_EQ(start_offset, end_offset); } ASSERT_NO_FATAL_FAILURE(reader.Next()); } while (reader.array->release); } } void ConnectionTest::TestMetadataGetObjectsTables() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_get_objects()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); std::vector<std::pair<const char*, bool>> test_cases = { {nullptr, true}, {"bulk_%", true}, {"asdf%", false}}; for (const auto& expected : test_cases) { std::string scope = "Filter: "; scope += expected.first ? expected.first : "(no filter)"; scope += "; table should exist? "; scope += expected.second ? "true" : "false"; SCOPED_TRACE(scope); StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr, expected.first, nullptr, nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); bool found_expected_table = false; do { for (int64_t row = 0; row < reader.array->length; row++) { // type: list<db_schema_schema> struct ArrowArrayView* catalog_db_schemas_list = reader.array_view->children[1]; // type: db_schema_schema (struct) struct ArrowArrayView* catalog_db_schemas = catalog_db_schemas_list->children[0]; // type: list<table_schema> struct ArrowArrayView* db_schema_tables_list = catalog_db_schemas->children[1]; // type: table_schema (struct) struct ArrowArrayView* db_schema_tables = db_schema_tables_list->children[0]; ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row)) << "Row " << row << " should have non-null catalog_db_schemas"; for (int64_t db_schemas_index = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row); db_schemas_index < ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1); db_schemas_index++) { ASSERT_FALSE(ArrowArrayViewIsNull(db_schema_tables_list, db_schemas_index)) << "Row " << row << " should have non-null db_schema_tables"; for (int64_t tables_index = ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index); tables_index < ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index + 1); tables_index++) { ArrowStringView table_name = ArrowArrayViewGetStringUnsafe( db_schema_tables->children[0], tables_index); if (iequals(std::string(table_name.data, table_name.size_bytes), "bulk_ingest")) { found_expected_table = true; } ASSERT_TRUE(ArrowArrayViewIsNull(db_schema_tables->children[2], tables_index)) << "Row " << row << " should have null table_columns"; ASSERT_TRUE(ArrowArrayViewIsNull(db_schema_tables->children[3], tables_index)) << "Row " << row << " should have null table_constraints"; } } } ASSERT_NO_FATAL_FAILURE(reader.Next()); } while (reader.array->release); ASSERT_EQ(expected.second, found_expected_table) << "Did (not) find table in metadata"; } } void ConnectionTest::TestMetadataGetObjectsTablesTypes() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_get_objects()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); std::vector<const char*> table_types(2); table_types[0] = "this_table_type_does_not_exist"; table_types[1] = nullptr; { StreamReader reader; ASSERT_THAT(AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_TABLES, nullptr, nullptr, nullptr, table_types.data(), nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); bool found_expected_table = false; do { for (int64_t row = 0; row < reader.array->length; row++) { // type: list<db_schema_schema> struct ArrowArrayView* catalog_db_schemas_list = reader.array_view->children[1]; // type: db_schema_schema (struct) struct ArrowArrayView* catalog_db_schemas = catalog_db_schemas_list->children[0]; // type: list<table_schema> struct ArrowArrayView* db_schema_tables_list = catalog_db_schemas->children[1]; // type: table_schema (struct) struct ArrowArrayView* db_schema_tables = db_schema_tables_list->children[0]; ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row)) << "Row " << row << " should have non-null catalog_db_schemas"; for (int64_t db_schemas_index = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row); db_schemas_index < ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1); db_schemas_index++) { ASSERT_FALSE(ArrowArrayViewIsNull(db_schema_tables_list, db_schemas_index)) << "Row " << row << " should have non-null db_schema_tables"; for (int64_t tables_index = ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index); tables_index < ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index + 1); tables_index++) { ArrowStringView table_name = ArrowArrayViewGetStringUnsafe( db_schema_tables->children[0], tables_index); if (std::string_view(table_name.data, table_name.size_bytes) == "bulk_ingest") { found_expected_table = true; } ASSERT_TRUE(ArrowArrayViewIsNull(db_schema_tables->children[2], tables_index)) << "Row " << row << " should have null table_columns"; ASSERT_TRUE(ArrowArrayViewIsNull(db_schema_tables->children[3], tables_index)) << "Row " << row << " should have null table_constraints"; } } } ASSERT_NO_FATAL_FAILURE(reader.Next()); } while (reader.array->release); ASSERT_FALSE(found_expected_table) << "Should not find table in metadata"; } } void ConnectionTest::TestMetadataGetObjectsColumns() { if (!quirks()->supports_get_objects()) { GTEST_SKIP(); } // TODO: test could be more robust if we ingested a few tables ASSERT_EQ(ADBC_OBJECT_DEPTH_COLUMNS, ADBC_OBJECT_DEPTH_ALL); ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); struct TestCase { std::optional<std::string> filter; std::vector<std::string> column_names; std::vector<int32_t> ordinal_positions; }; std::vector<TestCase> test_cases; test_cases.push_back({std::nullopt, {"int64s", "strings"}, {1, 2}}); test_cases.push_back({"in%", {"int64s"}, {1}}); for (const auto& test_case : test_cases) { std::string scope = "Filter: "; scope += test_case.filter ? *test_case.filter : "(no filter)"; SCOPED_TRACE(scope); StreamReader reader; std::vector<std::string> column_names; std::vector<int32_t> ordinal_positions; ASSERT_THAT( AdbcConnectionGetObjects( &connection, ADBC_OBJECT_DEPTH_COLUMNS, nullptr, nullptr, nullptr, nullptr, test_case.filter.has_value() ? test_case.filter->c_str() : nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CheckGetObjectsSchema(&reader.schema.value)); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); bool found_expected_table = false; do { for (int64_t row = 0; row < reader.array->length; row++) { // type: list<db_schema_schema> struct ArrowArrayView* catalog_db_schemas_list = reader.array_view->children[1]; // type: db_schema_schema (struct) struct ArrowArrayView* catalog_db_schemas = catalog_db_schemas_list->children[0]; // type: list<table_schema> struct ArrowArrayView* db_schema_tables_list = catalog_db_schemas->children[1]; // type: table_schema (struct) struct ArrowArrayView* db_schema_tables = db_schema_tables_list->children[0]; // type: list<column_schema> struct ArrowArrayView* table_columns_list = db_schema_tables->children[2]; // type: column_schema (struct) struct ArrowArrayView* table_columns = table_columns_list->children[0]; // type: list<usage_schema> struct ArrowArrayView* table_constraints_list = db_schema_tables->children[3]; ASSERT_FALSE(ArrowArrayViewIsNull(catalog_db_schemas_list, row)) << "Row " << row << " should have non-null catalog_db_schemas"; for (int64_t db_schemas_index = ArrowArrayViewListChildOffset(catalog_db_schemas_list, row); db_schemas_index < ArrowArrayViewListChildOffset(catalog_db_schemas_list, row + 1); db_schemas_index++) { ASSERT_FALSE(ArrowArrayViewIsNull(db_schema_tables_list, db_schemas_index)) << "Row " << row << " should have non-null db_schema_tables"; ArrowStringView db_schema_name = ArrowArrayViewGetStringUnsafe( catalog_db_schemas->children[0], db_schemas_index); for (int64_t tables_index = ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index); tables_index < ArrowArrayViewListChildOffset(db_schema_tables_list, db_schemas_index + 1); tables_index++) { ArrowStringView table_name = ArrowArrayViewGetStringUnsafe( db_schema_tables->children[0], tables_index); ASSERT_FALSE(ArrowArrayViewIsNull(table_columns_list, tables_index)) << "Row " << row << " should have non-null table_columns"; ASSERT_FALSE(ArrowArrayViewIsNull(table_constraints_list, tables_index)) << "Row " << row << " should have non-null table_constraints"; if (iequals(std::string(table_name.data, table_name.size_bytes), "bulk_ingest") && iequals(std::string(db_schema_name.data, db_schema_name.size_bytes), quirks()->db_schema())) { found_expected_table = true; for (int64_t columns_index = ArrowArrayViewListChildOffset(table_columns_list, tables_index); columns_index < ArrowArrayViewListChildOffset(table_columns_list, tables_index + 1); columns_index++) { ArrowStringView name = ArrowArrayViewGetStringUnsafe( table_columns->children[0], columns_index); std::string temp(name.data, name.size_bytes); std::transform(temp.begin(), temp.end(), temp.begin(), [](unsigned char c) { return std::tolower(c); }); column_names.push_back(std::move(temp)); ordinal_positions.push_back( static_cast<int32_t>(ArrowArrayViewGetIntUnsafe( table_columns->children[1], columns_index))); } } } } } ASSERT_NO_FATAL_FAILURE(reader.Next()); } while (reader.array->release); ASSERT_TRUE(found_expected_table) << "Did (not) find table in metadata"; ASSERT_EQ(test_case.column_names, column_names); ASSERT_EQ(test_case.ordinal_positions, ordinal_positions); } } void ConnectionTest::TestMetadataGetObjectsConstraints() { // TODO: can't be done portably (need to create tables with primary keys and such) } void ConnectionTest::TestMetadataGetObjectsPrimaryKey() { ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); if (!quirks()->supports_get_objects()) { GTEST_SKIP(); } std::optional<std::string> maybe_ddl = quirks()->PrimaryKeyTableDdl("adbc_pkey_test"); if (!maybe_ddl.has_value()) { GTEST_SKIP(); } std::string ddl = std::move(*maybe_ddl); ASSERT_THAT(quirks()->DropTable(&connection, "adbc_pkey_test", &error), IsOkStatus(&error)); { Handle<AdbcStatement> statement; ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement.value, ddl.c_str(), &error), IsOkStatus(&error)); int64_t rows_affected = 0; ASSERT_THAT( AdbcStatementExecuteQuery(&statement.value, nullptr, &rows_affected, &error), IsOkStatus(&error)); } adbc_validation::StreamReader reader; ASSERT_THAT( AdbcConnectionGetObjects(&connection, ADBC_OBJECT_DEPTH_ALL, nullptr, nullptr, nullptr, nullptr, nullptr, &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_GT(reader.array->length, 0); auto get_objects_data = adbc_validation::GetObjectsReader{&reader.array_view.value}; ASSERT_NE(*get_objects_data, nullptr) << "could not initialize the AdbcGetObjectsData object"; struct AdbcGetObjectsTable* table = AdbcGetObjectsDataGetTableByName(*get_objects_data, quirks()->catalog().c_str(), quirks()->db_schema().c_str(), "adbc_pkey_test"); ASSERT_NE(table, nullptr) << "could not find adbc_pkey_test table"; ASSERT_EQ(table->n_table_columns, 1); struct AdbcGetObjectsColumn* column = AdbcGetObjectsDataGetColumnByName( *get_objects_data, quirks()->catalog().c_str(), quirks()->db_schema().c_str(), "adbc_pkey_test", "id"); ASSERT_NE(column, nullptr) << "could not find id column on adbc_pkey_test table"; ASSERT_EQ(table->n_table_constraints, 1) << "expected 1 constraint on adbc_pkey_test table, found: " << table->n_table_constraints; struct AdbcGetObjectsConstraint* constraint = table->table_constraints[0]; std::string_view constraint_type(constraint->constraint_type.data, constraint->constraint_type.size_bytes); ASSERT_EQ(constraint_type, "PRIMARY KEY"); ASSERT_EQ(constraint->n_column_names, 1) << "expected constraint adbc_pkey_test_pkey to be applied to 1 column, found: " << constraint->n_column_names; std::string_view constraint_column_name( constraint->constraint_column_names[0].data, constraint->constraint_column_names[0].size_bytes); ASSERT_EQ(constraint_column_name, "id"); } //------------------------------------------------------------ // Tests of AdbcStatement void StatementTest::SetUpTest() { std::memset(&error, 0, sizeof(error)); std::memset(&database, 0, sizeof(database)); std::memset(&connection, 0, sizeof(connection)); std::memset(&statement, 0, sizeof(statement)); ASSERT_THAT(AdbcDatabaseNew(&database, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->SetupDatabase(&database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcDatabaseInit(&database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error)); } void StatementTest::TearDownTest() { if (statement.private_data) { EXPECT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } EXPECT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error)); EXPECT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error)); if (error.release) { error.release(&error); } } void StatementTest::TestNewInit() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); ASSERT_EQ(NULL, statement.private_data); ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); // Cannot execute ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); } void StatementTest::TestRelease() { ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); ASSERT_EQ(NULL, statement.private_data); } template <typename CType> void StatementTest::TestSqlIngestType(ArrowType type, const std::vector<std::optional<CType>>& values) { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; ASSERT_THAT(MakeSchema(&schema.value, {{"col", type}}), IsOkErrno()); ASSERT_THAT(MakeBatch<CType>(&schema.value, &array.value, &na_error, values), IsOkErrno()); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); int64_t rows_affected = 0; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1))); ASSERT_THAT(AdbcStatementSetSqlQuery( &statement, "SELECT * FROM bulk_ingest ORDER BY \"col\" ASC NULLS FIRST", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type); ASSERT_NO_FATAL_FAILURE( CompareSchema(&reader.schema.value, {{"col", round_trip_type, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(values.size(), reader.array->length); ASSERT_EQ(1, reader.array->n_children); if (round_trip_type == type) { // XXX: for now we can't compare values; we would need casting ASSERT_NO_FATAL_FAILURE( CompareArray<CType>(reader.array_view->children[0], values)); } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } template <typename CType> void StatementTest::TestSqlIngestNumericType(ArrowType type) { std::vector<std::optional<CType>> values = { std::nullopt, }; if constexpr (std::is_floating_point_v<CType>) { // XXX: sqlite and others seem to have trouble with extreme // values. Likely a bug on our side, but for now, avoid them. values.push_back(static_cast<CType>(-1.5)); values.push_back(static_cast<CType>(1.5)); } else if (type == ArrowType::NANOARROW_TYPE_DATE32) { // Windows does not seem to support negative date values values.push_back(static_cast<CType>(0)); values.push_back(static_cast<CType>(42)); } else { values.push_back(std::numeric_limits<CType>::lowest()); values.push_back(std::numeric_limits<CType>::max()); } return TestSqlIngestType(type, values); } void StatementTest::TestSqlIngestUInt8() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint8_t>(NANOARROW_TYPE_UINT8)); } void StatementTest::TestSqlIngestUInt16() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint16_t>(NANOARROW_TYPE_UINT16)); } void StatementTest::TestSqlIngestUInt32() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint32_t>(NANOARROW_TYPE_UINT32)); } void StatementTest::TestSqlIngestUInt64() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint64_t>(NANOARROW_TYPE_UINT64)); } void StatementTest::TestSqlIngestInt8() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int8_t>(NANOARROW_TYPE_INT8)); } void StatementTest::TestSqlIngestInt16() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int16_t>(NANOARROW_TYPE_INT16)); } void StatementTest::TestSqlIngestInt32() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int32_t>(NANOARROW_TYPE_INT32)); } void StatementTest::TestSqlIngestInt64() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int64_t>(NANOARROW_TYPE_INT64)); } void StatementTest::TestSqlIngestFloat32() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<float>(NANOARROW_TYPE_FLOAT)); } void StatementTest::TestSqlIngestFloat64() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<double>(NANOARROW_TYPE_DOUBLE)); } void StatementTest::TestSqlIngestString() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>( NANOARROW_TYPE_STRING, {std::nullopt, "", "", "1234", "δΎ‹"})); } void StatementTest::TestSqlIngestBinary() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>( NANOARROW_TYPE_BINARY, {std::nullopt, "", "\x00\x01\x02\x04", "\xFE\xFF"})); } void StatementTest::TestSqlIngestDate32() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int32_t>(NANOARROW_TYPE_DATE32)); } template <enum ArrowTimeUnit TU> void StatementTest::TestSqlIngestTimestampType(const char* timezone) { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; const std::vector<std::optional<int64_t>> values = {std::nullopt, -42, 0, 42}; const ArrowType type = NANOARROW_TYPE_TIMESTAMP; // much of this code is shared with TestSqlIngestType with minor // changes to allow for various time units to be tested ArrowSchemaInit(&schema.value); ArrowSchemaSetTypeStruct(&schema.value, 1); ArrowSchemaSetTypeDateTime(schema->children[0], type, TU, timezone); ArrowSchemaSetName(schema->children[0], "col"); ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values), IsOkErrno()); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); int64_t rows_affected = 0; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1))); ASSERT_THAT(AdbcStatementSetSqlQuery( &statement, "SELECT * FROM bulk_ingest ORDER BY \"col\" ASC NULLS FIRST", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type); ASSERT_NO_FATAL_FAILURE( CompareSchema(&reader.schema.value, {{"col", round_trip_type, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(values.size(), reader.array->length); ASSERT_EQ(1, reader.array->n_children); ValidateIngestedTimestampData(reader.array_view->children[0], TU, timezone); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } void StatementTest::ValidateIngestedTimestampData(struct ArrowArrayView* values, enum ArrowTimeUnit unit, const char* timezone) { FAIL() << "ValidateIngestedTimestampData is not implemented in the base class"; } void StatementTest::TestSqlIngestTimestamp() { ASSERT_NO_FATAL_FAILURE( TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_SECOND>(nullptr)); ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_MILLI>(nullptr)); ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_MICRO>(nullptr)); ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_NANO>(nullptr)); } void StatementTest::TestSqlIngestTimestampTz() { ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_SECOND>("UTC")); ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_MILLI>("UTC")); ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_MICRO>("UTC")); ASSERT_NO_FATAL_FAILURE(TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_NANO>("UTC")); ASSERT_NO_FATAL_FAILURE( TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_SECOND>("America/Los_Angeles")); ASSERT_NO_FATAL_FAILURE( TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_MILLI>("America/Los_Angeles")); ASSERT_NO_FATAL_FAILURE( TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_MICRO>("America/Los_Angeles")); ASSERT_NO_FATAL_FAILURE( TestSqlIngestTimestampType<NANOARROW_TIME_UNIT_NANO>("America/Los_Angeles")); } void StatementTest::TestSqlIngestInterval() { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; const enum ArrowType type = NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO; // values are days, months, ns struct ArrowInterval neg_interval; struct ArrowInterval zero_interval; struct ArrowInterval pos_interval; ArrowIntervalInit(&neg_interval, type); ArrowIntervalInit(&zero_interval, type); ArrowIntervalInit(&pos_interval, type); neg_interval.months = -5; neg_interval.days = -5; neg_interval.ns = -42000; pos_interval.months = 5; pos_interval.days = 5; pos_interval.ns = 42000; const std::vector<std::optional<ArrowInterval*>> values = { std::nullopt, &neg_interval, &zero_interval, &pos_interval}; ASSERT_THAT(MakeSchema(&schema.value, {{"col", type}}), IsOkErrno()); ASSERT_THAT(MakeBatch<ArrowInterval*>(&schema.value, &array.value, &na_error, values), IsOkErrno()); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); int64_t rows_affected = 0; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1))); ASSERT_THAT(AdbcStatementSetSqlQuery( &statement, "SELECT * FROM bulk_ingest ORDER BY \"col\" ASC NULLS FIRST", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type); ASSERT_NO_FATAL_FAILURE( CompareSchema(&reader.schema.value, {{"col", round_trip_type, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(values.size(), reader.array->length); ASSERT_EQ(1, reader.array->n_children); if (round_trip_type == type) { ASSERT_NO_FATAL_FAILURE( CompareArray<ArrowInterval*>(reader.array_view->children[0], values)); } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } void StatementTest::TestSqlIngestTableEscaping() { std::string name = "create_table_escaping"; ASSERT_THAT(quirks()->DropTable(&connection, name, &error), adbc_validation::IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; ASSERT_THAT(MakeSchema(&schema.value, {{"index", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt})), IsOkErrno()); Handle<struct AdbcStatement> statement; ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error), IsOkErrno()); ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE, name.c_str(), &error), IsOkErrno()); ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error), IsOkErrno()); ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error), IsOkErrno()); ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkErrno()); } void StatementTest::TestSqlIngestAppend() { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } // Ingest ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42}), IsOkErrno()); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); int64_t rows_affected = 0; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); // Now append // Re-initialize since Bind() should take ownership of data ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT( MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}), IsOkErrno()); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, ADBC_INGEST_OPTION_MODE_APPEND, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1))); // Read data back ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(3, reader.array->length); ASSERT_EQ(1, reader.array->n_children); ASSERT_NO_FATAL_FAILURE( CompareArray<int64_t>(reader.array_view->children[0], {42, -42, std::nullopt})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } void StatementTest::TestSqlIngestErrors() { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } // Ingest without bind ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); if (error.release) error.release(&error); ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); // Append to nonexistent table Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, ADBC_INGEST_OPTION_MODE_APPEND, &error), IsOkStatus(&error)); ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT( MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}), IsOkErrno(&na_error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), ::testing::Not(IsOkStatus(&error))); if (error.release) error.release(&error); // Ingest... ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, ADBC_INGEST_OPTION_MODE_CREATE, &error), IsOkStatus(&error)); ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT( MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}), IsOkErrno(&na_error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsOkStatus(&error)); // ...then try to overwrite it ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT( MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}), IsOkErrno(&na_error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), ::testing::Not(IsOkStatus(&error))); if (error.release) error.release(&error); // ...then try to append an incompatible schema ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}, {"coltwo", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT( (MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error, {}, {})), IsOkErrno(&na_error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE, ADBC_INGEST_OPTION_MODE_APPEND, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), ::testing::Not(IsOkStatus(&error))); } void StatementTest::TestSqlIngestMultipleConnections() { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt})), IsOkErrno()); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); int64_t rows_affected = 0; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1))); ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); { struct AdbcConnection connection2 = {}; ASSERT_THAT(AdbcConnectionNew(&connection2, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection2, &database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementNew(&connection2, &statement, &error), IsOkStatus(&error)); ASSERT_THAT( AdbcStatementSetSqlQuery( &statement, "SELECT * FROM bulk_ingest ORDER BY \"int64s\" DESC NULLS LAST", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CompareSchema( &reader.schema.value, {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(3, reader.array->length); ASSERT_EQ(1, reader.array->n_children); ASSERT_NO_FATAL_FAILURE( CompareArray<int64_t>(reader.array_view->children[0], {42, -42, std::nullopt})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionRelease(&connection2, &error), IsOkStatus(&error)); } } void StatementTest::TestSqlIngestSample() { if (!quirks()->supports_bulk_ingest()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery( &statement, "SELECT * FROM bulk_ingest ORDER BY int64s ASC NULLS FIRST", &error), IsOkStatus(&error)); StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}, {"strings", NANOARROW_TYPE_STRING, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(3, reader.array->length); ASSERT_EQ(2, reader.array->n_children); ASSERT_NO_FATAL_FAILURE( CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, -42, 42})); ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(reader.array_view->children[1], {"", std::nullopt, "foo"})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } void StatementTest::TestSqlPartitionedInts() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct AdbcPartitions> partitions; int64_t rows_affected = 0; if (!quirks()->supports_partitioned_data()) { ASSERT_THAT(AdbcStatementExecutePartitions(&statement, &schema.value, &partitions.value, &rows_affected, &error), IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &error)); GTEST_SKIP(); } ASSERT_THAT(AdbcStatementExecutePartitions(&statement, &schema.value, &partitions.value, &rows_affected, &error), IsOkStatus(&error)); // Assume only 1 partition ASSERT_EQ(1, partitions->num_partitions); ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); // it's allowed for Executepartitions to return a nil schema if one is not available if (schema->release != nullptr) { ASSERT_EQ(1, schema->n_children); } Handle<struct AdbcConnection> connection2; StreamReader reader; ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionReadPartition(&connection2.value, partitions->partitions[0], partitions->partition_lengths[0], &reader.stream.value, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_EQ(1, reader.schema->n_children); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(1, reader.array->length); ASSERT_EQ(1, reader.array->n_children); switch (reader.fields[0].type) { case NANOARROW_TYPE_INT32: ASSERT_NO_FATAL_FAILURE( CompareArray<int32_t>(reader.array_view->children[0], {42})); break; case NANOARROW_TYPE_INT64: ASSERT_NO_FATAL_FAILURE( CompareArray<int64_t>(reader.array_view->children[0], {42})); break; default: FAIL() << "Unexpected data type: " << reader.fields[0].type; } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } void StatementTest::TestSqlPrepareGetParameterSchema() { if (!quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); std::string query = "SELECT "; query += quirks()->BindParameter(0); query += ", "; query += quirks()->BindParameter(1); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; // if schema cannot be determined we should get NOT IMPLEMENTED returned ASSERT_THAT(AdbcStatementGetParameterSchema(&statement, &schema.value, &error), ::testing::AnyOf(IsOkStatus(&error), IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &error))); if (schema->release != nullptr) { ASSERT_EQ(2, schema->n_children); } // Can't assume anything about names or types here } void StatementTest::TestSqlPrepareSelectNoParams() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 1", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error)); StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); if (quirks()->supports_rows_affected()) { ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); } else { ASSERT_THAT(reader.rows_affected, ::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)))); } ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_EQ(1, reader.schema->n_children); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(1, reader.array->length); ASSERT_EQ(1, reader.array->n_children); switch (reader.fields[0].type) { case NANOARROW_TYPE_INT32: ASSERT_NO_FATAL_FAILURE(CompareArray<int32_t>(reader.array_view->children[0], {1})); break; case NANOARROW_TYPE_INT64: ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0], {1})); break; default: FAIL() << "Unexpected data type: " << reader.fields[0].type; } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } void StatementTest::TestSqlPrepareSelectParams() { if (!quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); std::string query = "SELECT "; query += quirks()->BindParameter(0); query += ", "; query += quirks()->BindParameter(1); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}, {"strings", NANOARROW_TYPE_STRING}}), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt}, {"", std::nullopt, "bar"})), IsOkErrno()); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_EQ(2, reader.schema->n_children); const std::vector<std::optional<int32_t>> expected_int32{42, -42, std::nullopt}; const std::vector<std::optional<int64_t>> expected_int64{42, -42, std::nullopt}; const std::vector<std::optional<std::string>> expected_string{"", std::nullopt, "bar"}; int64_t nrows = 0; while (nrows < 3) { ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(2, reader.array->n_children); auto start = nrows; auto end = nrows + reader.array->length; ASSERT_LT(start, expected_int32.size()); ASSERT_LE(end, expected_int32.size()); switch (reader.fields[0].type) { case NANOARROW_TYPE_INT32: ASSERT_NO_FATAL_FAILURE(CompareArray<int32_t>( reader.array_view->children[0], {expected_int32.begin() + start, expected_int32.begin() + end})); break; case NANOARROW_TYPE_INT64: ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>( reader.array_view->children[0], {expected_int64.begin() + start, expected_int64.begin() + end})); break; default: FAIL() << "Unexpected data type: " << reader.fields[0].type; } ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>( reader.array_view->children[1], {expected_string.begin() + start, expected_string.begin() + end})); nrows += reader.array->length; } ASSERT_EQ(3, nrows); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } void StatementTest::TestSqlPrepareUpdate() { if (!quirks()->supports_bulk_ingest() || !quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; // Create table ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt})), IsOkErrno()); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsOkStatus(&error)); // Prepare std::string query = "INSERT INTO bulk_ingest VALUES (" + quirks()->BindParameter(0) + ")"; ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error)); // Bind and execute ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt})), IsOkErrno()); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsOkStatus(&error)); // Read data back ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(6), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(6, reader.array->length); ASSERT_EQ(1, reader.array->n_children); ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>( reader.array_view->children[0], {42, -42, std::nullopt, 42, -42, std::nullopt})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } } void StatementTest::TestSqlPrepareUpdateNoParams() { // TODO: prepare something like INSERT 1, then execute it and confirm it's executed once // TODO: then bind a table with 0 cols and X rows and confirm it executes multiple times } void StatementTest::TestSqlPrepareUpdateStream() { if (!quirks()->supports_bulk_ingest() || !quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); struct ArrowError na_error; const std::vector<SchemaField> fields = {{"ints", NANOARROW_TYPE_INT64}}; // Create table { Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {})), IsOkErrno(&na_error)); ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsOkStatus(&error)); } // Generate stream Handle<struct ArrowArrayStream> stream; Handle<struct ArrowSchema> schema; std::vector<struct ArrowArray> batches(2); ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &batches[0], &na_error, {1, 2, std::nullopt, 3})), IsOkErrno(&na_error)); ASSERT_THAT( MakeBatch<int64_t>(&schema.value, &batches[1], &na_error, {std::nullopt, 3}), IsOkErrno(&na_error)); MakeStream(&stream.value, &schema.value, std::move(batches)); // Prepare std::string query = "INSERT INTO bulk_ingest VALUES (" + quirks()->BindParameter(0) + ")"; ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error)); // Bind and execute ASSERT_THAT(AdbcStatementBindStream(&statement, &stream.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error), IsOkStatus(&error)); // Read data back ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT * FROM bulk_ingest", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(6), ::testing::Eq(-1))); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_NO_FATAL_FAILURE( CompareSchema(&reader.schema.value, {{"ints", NANOARROW_TYPE_INT64, NULLABLE}})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(6, reader.array->length); ASSERT_EQ(1, reader.array->n_children); ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>( reader.array_view->children[0], {1, 2, std::nullopt, 3, std::nullopt, 3})); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } // TODO: import released stream // TODO: stream that errors on get_schema // TODO: stream that errors on get_next (first call) // TODO: stream that errors on get_next (second call) } void StatementTest::TestSqlPrepareErrorNoQuery() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsStatus(ADBC_STATUS_INVALID_STATE, &error)); if (error.release) error.release(&error); } // TODO: need test of overlapping reads - make sure behavior is as described void StatementTest::TestSqlPrepareErrorParamCountMismatch() { if (!quirks()->supports_dynamic_parameter_binding()) { GTEST_SKIP(); } Handle<struct ArrowSchema> schema; Handle<struct ArrowArray> array; struct ArrowError na_error; StreamReader reader; std::string query = "SELECT "; query += quirks()->BindParameter(0); query += ", "; query += quirks()->BindParameter(1); ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error)); ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno()); ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, -42, std::nullopt})), IsOkErrno()); ASSERT_THAT( ([&]() -> AdbcStatusCode { CHECK_OK(AdbcStatementBind(&statement, &array.value, &schema.value, &error)); CHECK_OK(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error)); return ADBC_STATUS_OK; })(), ::testing::Not(IsOkStatus(&error))); } void StatementTest::TestSqlQueryInts() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); if (quirks()->supports_rows_affected()) { ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); } else { ASSERT_THAT(reader.rows_affected, ::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)))); } ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_EQ(1, reader.schema->n_children); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(1, reader.array->length); ASSERT_EQ(1, reader.array->n_children); switch (reader.fields[0].type) { case NANOARROW_TYPE_INT32: ASSERT_NO_FATAL_FAILURE( CompareArray<int32_t>(reader.array_view->children[0], {42})); break; case NANOARROW_TYPE_INT64: ASSERT_NO_FATAL_FAILURE( CompareArray<int64_t>(reader.array_view->children[0], {42})); break; default: FAIL() << "Unexpected data type: " << reader.fields[0].type; } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } void StatementTest::TestSqlQueryFloats() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT CAST(1.5 AS FLOAT)", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); if (quirks()->supports_rows_affected()) { ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); } else { ASSERT_THAT(reader.rows_affected, ::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)))); } ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_EQ(1, reader.schema->n_children); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(1, reader.array->length); ASSERT_EQ(1, reader.array->n_children); ASSERT_FALSE(ArrowArrayViewIsNull(&reader.array_view.value, 0)); ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], 0)); switch (reader.fields[0].type) { case NANOARROW_TYPE_FLOAT: ASSERT_NO_FATAL_FAILURE( CompareArray<float>(reader.array_view->children[0], {1.5f})); break; case NANOARROW_TYPE_DOUBLE: ASSERT_NO_FATAL_FAILURE( CompareArray<double>(reader.array_view->children[0], {1.5})); break; default: FAIL() << "Unexpected data type: " << reader.fields[0].type; } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } void StatementTest::TestSqlQueryStrings() { ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'SaShiSuSeSo'", &error), IsOkStatus(&error)); { StreamReader reader; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); if (quirks()->supports_rows_affected()) { ASSERT_THAT(reader.rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))); } else { ASSERT_THAT(reader.rows_affected, ::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)))); } ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); ASSERT_EQ(1, reader.schema->n_children); ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_NE(nullptr, reader.array->release); ASSERT_EQ(1, reader.array->length); ASSERT_EQ(1, reader.array->n_children); ASSERT_FALSE(ArrowArrayViewIsNull(&reader.array_view.value, 0)); ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], 0)); switch (reader.fields[0].type) { case NANOARROW_TYPE_LARGE_STRING: case NANOARROW_TYPE_STRING: { ASSERT_NO_FATAL_FAILURE( CompareArray<std::string>(reader.array_view->children[0], {"SaShiSuSeSo"})); break; } default: FAIL() << "Unexpected data type: " << reader.fields[0].type; } ASSERT_NO_FATAL_FAILURE(reader.Next()); ASSERT_EQ(nullptr, reader.array->release); } ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error)); } void StatementTest::TestSqlQueryErrors() { // Invalid query ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); AdbcStatusCode code = AdbcStatementSetSqlQuery(&statement, "this is not a query", &error); if (code == ADBC_STATUS_OK) { code = AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error); } ASSERT_NE(ADBC_STATUS_OK, code); } void StatementTest::TestTransactions() { if (!quirks()->supports_transactions() || quirks()->ddl_implicit_commit_txn()) { GTEST_SKIP(); } ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error), IsOkStatus(&error)); Handle<struct AdbcConnection> connection2; ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, ADBC_OPTION_VALUE_DISABLED, &error), IsOkStatus(&error)); // Uncommitted change ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error)); // Query on first connection should succeed { Handle<struct AdbcStatement> statement; StreamReader reader; ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error), IsOkStatus(&error)); ASSERT_THAT( AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); } if (error.release) error.release(&error); // Query on second connection should fail ASSERT_THAT(([&]() -> AdbcStatusCode { Handle<struct AdbcStatement> statement; StreamReader reader; CHECK_OK(AdbcStatementNew(&connection2.value, &statement.value, &error)); CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error)); CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value, &reader.rows_affected, &error)); return ADBC_STATUS_OK; })(), ::testing::Not(IsOkStatus(&error))); if (error.release) error.release(&error); // Rollback ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error)); // Query on first connection should fail ASSERT_THAT(([&]() -> AdbcStatusCode { Handle<struct AdbcStatement> statement; StreamReader reader; CHECK_OK(AdbcStatementNew(&connection, &statement.value, &error)); CHECK_OK(AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error)); CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value, &reader.rows_affected, &error)); return ADBC_STATUS_OK; })(), ::testing::Not(IsOkStatus(&error))); // Commit ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error)); ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsOkStatus(&error)); // Query on second connection should succeed { Handle<struct AdbcStatement> statement; StreamReader reader; ASSERT_THAT(AdbcStatementNew(&connection2.value, &statement.value, &error), IsOkStatus(&error)); ASSERT_THAT( AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value, &reader.rows_affected, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader.GetSchema()); } } void StatementTest::TestConcurrentStatements() { Handle<struct AdbcStatement> statement1; Handle<struct AdbcStatement> statement2; ASSERT_THAT(AdbcStatementNew(&connection, &statement1.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementNew(&connection, &statement2.value, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement1.value, "SELECT 'SaShiSuSeSo'", &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement2.value, "SELECT 'SaShiSuSeSo'", &error), IsOkStatus(&error)); StreamReader reader1; StreamReader reader2; ASSERT_THAT(AdbcStatementExecuteQuery(&statement1.value, &reader1.stream.value, &reader1.rows_affected, &error), IsOkStatus(&error)); if (quirks()->supports_concurrent_statements()) { ASSERT_THAT(AdbcStatementExecuteQuery(&statement2.value, &reader2.stream.value, &reader2.rows_affected, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader2.GetSchema()); } else { ASSERT_THAT(AdbcStatementExecuteQuery(&statement2.value, &reader2.stream.value, &reader2.rows_affected, &error), ::testing::Not(IsOkStatus(&error))); ASSERT_EQ(nullptr, reader2.stream.value.release); } // Original stream should still be valid ASSERT_NO_FATAL_FAILURE(reader1.GetSchema()); } void StatementTest::TestResultInvalidation() { // Start reading from a statement, then overwrite it ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error)); ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error), IsOkStatus(&error)); StreamReader reader1; StreamReader reader2; ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader1.stream.value, &reader1.rows_affected, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader1.GetSchema()); ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader2.stream.value, &reader2.rows_affected, &error), IsOkStatus(&error)); ASSERT_NO_FATAL_FAILURE(reader2.GetSchema()); // First reader should not fail, but may give no data ASSERT_NO_FATAL_FAILURE(reader1.Next()); } #undef NOT_NULL #undef NULLABLE } // namespace adbc_validation