void StatementTest::TestTransactions()

in c/validation/adbc_validation.cc [2154:2244]


void StatementTest::TestTransactions() {
  if (!quirks()->supports_transactions() || quirks()->ddl_implicit_commit_txn()) {
    GTEST_SKIP();
  }

  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  Handle<struct AdbcConnection> connection2;
  ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error),
              IsOkStatus(&error));

  ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
                                      ADBC_OPTION_VALUE_DISABLED, &error),
              IsOkStatus(&error));

  // Uncommitted change
  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));

  // Query on first connection should succeed
  {
    Handle<struct AdbcStatement> statement;
    StreamReader reader;

    ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
                IsOkStatus(&error));
    ASSERT_THAT(
        AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error),
        IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                          &reader.rows_affected, &error),
                IsOkStatus(&error));
    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
  }

  if (error.release) error.release(&error);

  // Query on second connection should fail
  ASSERT_THAT(([&]() -> AdbcStatusCode {
                Handle<struct AdbcStatement> statement;
                StreamReader reader;

                CHECK_OK(AdbcStatementNew(&connection2.value, &statement.value, &error));
                CHECK_OK(AdbcStatementSetSqlQuery(&statement.value,
                                                  "SELECT * FROM bulk_ingest", &error));
                CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                                   &reader.rows_affected, &error));
                return ADBC_STATUS_OK;
              })(),
              ::testing::Not(IsOkStatus(&error)));

  if (error.release) error.release(&error);

  // Rollback
  ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error));

  // Query on first connection should fail
  ASSERT_THAT(([&]() -> AdbcStatusCode {
                Handle<struct AdbcStatement> statement;
                StreamReader reader;

                CHECK_OK(AdbcStatementNew(&connection, &statement.value, &error));
                CHECK_OK(AdbcStatementSetSqlQuery(&statement.value,
                                                  "SELECT * FROM bulk_ingest", &error));
                CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                                   &reader.rows_affected, &error));
                return ADBC_STATUS_OK;
              })(),
              ::testing::Not(IsOkStatus(&error)));

  // Commit
  ASSERT_NO_FATAL_FAILURE(IngestSampleTable(&connection, &error));
  ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsOkStatus(&error));

  // Query on second connection should succeed
  {
    Handle<struct AdbcStatement> statement;
    StreamReader reader;

    ASSERT_THAT(AdbcStatementNew(&connection2.value, &statement.value, &error),
                IsOkStatus(&error));
    ASSERT_THAT(
        AdbcStatementSetSqlQuery(&statement.value, "SELECT * FROM bulk_ingest", &error),
        IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                          &reader.rows_affected, &error),
                IsOkStatus(&error));
    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
  }
}