in c/validation/adbc_validation.cc [2154:2244]
void StatementTest::TestTransactions() {
if (!quirks()->supports_transactions() || quirks()->ddl_implicit_commit_txn()) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
Handle<struct AdbcConnection> connection2;
ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
// Uncommitted change
ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
// Query on first connection should succeed
{
Handle<struct AdbcStatement> statement;
StreamReader reader;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
}
if (error.release) error.release(&error);
// Query on second connection should fail
ASSERT_THAT(([&]() -> AdbcStatusCode {
Handle<struct AdbcStatement> statement;
StreamReader reader;
CHECK_OK(AdbcStatementNew(&connection2.value, &statement.value, &error));
CHECK_OK(AdbcStatementSetSqlQuery(&statement.value,
"SELECT * FROM bulk_ingest", &error));
CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
&reader.rows_affected, &error));
return ADBC_STATUS_OK;
})(),
::testing::Not(IsOkStatus(&error)));
if (error.release) error.release(&error);
// Rollback
ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error));
// Query on first connection should fail
ASSERT_THAT(([&]() -> AdbcStatusCode {
Handle<struct AdbcStatement> statement;
StreamReader reader;
CHECK_OK(AdbcStatementNew(&connection, &statement.value, &error));
CHECK_OK(AdbcStatementSetSqlQuery(&statement.value,
"SELECT * FROM bulk_ingest", &error));
CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
&reader.rows_affected, &error));
return ADBC_STATUS_OK;
})(),
::testing::Not(IsOkStatus(&error)));
// Commit
ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsOkStatus(&error));
// Query on second connection should succeed
{
Handle<struct AdbcStatement> statement;
StreamReader reader;
ASSERT_THAT(AdbcStatementNew(&connection2.value, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
}
}