c/validation/adbc_validation_statement.cc (2,343 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "adbc_validation.h"
#include <cstring>
#include <string>
#include <utility>
#include <vector>
#include <arrow-adbc/adbc.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <nanoarrow/nanoarrow.h>
#include <nanoarrow/nanoarrow.hpp>
#include "adbc_validation_util.h"
#include "common/options.h"
namespace adbc_validation {
// Prepares the fixture: zeroes every ADBC struct, then creates and initializes
// the database (configured through the driver quirks) and a connection on it.
// No statement is allocated here; each test creates its own.
void StatementTest::SetUpTest() {
  // Start from all-zero structs so TearDownTest() can tell, via private_data /
  // release, which of them were actually populated.
  std::memset(&statement, 0, sizeof(statement));
  std::memset(&connection, 0, sizeof(connection));
  std::memset(&database, 0, sizeof(database));
  std::memset(&error, 0, sizeof(error));

  // Database: allocate, apply driver-specific setup, initialize.
  ASSERT_THAT(AdbcDatabaseNew(&database, &error), IsOkStatus(&error));
  ASSERT_THAT(quirks()->SetupDatabase(&database, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcDatabaseInit(&database, &error), IsOkStatus(&error));

  // Connection: allocate, then bind it to the database.
  ASSERT_THAT(AdbcConnectionNew(&connection, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionInit(&connection, &database, &error), IsOkStatus(&error));
}
void StatementTest::TearDownTest() {
if (statement.private_data) {
EXPECT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
if (connection.private_data) {
EXPECT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error));
}
if (database.private_data) {
EXPECT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error));
}
if (error.release) {
error.release(&error);
}
}
// Checks the basic statement lifecycle: New followed by Release clears the
// handle, releasing it a second time reports INVALID_STATE, and a freshly
// created statement with no query set cannot be executed.
void StatementTest::TestNewInit() {
  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
  ASSERT_EQ(nullptr, statement.private_data);

  // Releasing an already-released statement must fail.
  ASSERT_THAT(AdbcStatementRelease(&statement, &error),
              IsStatus(ADBC_STATUS_INVALID_STATE, &error));
  // Free the expected error before `error` is handed to further calls, so a
  // later failure cannot overwrite (and leak) it.
  if (error.release) error.release(&error);

  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  // Cannot execute
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
              IsStatus(ADBC_STATUS_INVALID_STATE, &error));
  if (error.release) error.release(&error);
}
// Checks that releasing a statement that was never created reports
// INVALID_STATE, and that New followed by Release succeeds and clears the
// handle's private_data.
void StatementTest::TestRelease() {
  ASSERT_THAT(AdbcStatementRelease(&statement, &error),
              IsStatus(ADBC_STATUS_INVALID_STATE, &error));
  // Free the expected error before `error` is reused by the calls below.
  if (error.release) error.release(&error);

  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
  ASSERT_EQ(nullptr, statement.private_data);
}
// Round-trips a single column through bulk ingestion.
//
// Builds a one-column batch from `values`, ingests it into the "bulk_ingest"
// table (default ingest mode), then selects it back ordered by the column with
// nulls first and compares schema and, when possible, values. Callers must
// therefore pass `values` already in that sorted order.
//
// field: type of the column; its name is always replaced by "col".
// values: expected column contents, std::nullopt meaning NULL.
// dictionary_encode: when true, the column is sent as an int32-indexed
//     dictionary whose dictionary is the plain array built from `values`.
//
// Skips when the driver does not support bulk ingest in create mode.
template <typename CType>
void StatementTest::TestSqlIngestType(SchemaField field,
                                      const std::vector<std::optional<CType>>& values,
                                      bool dictionary_encode) {
  // Override the field name
  field.name = "col";

  if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
    GTEST_SKIP();
  }

  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  Handle<struct ArrowSchema> schema;
  Handle<struct ArrowArray> array;
  // Zero-initialized so the message is well-defined if it gets reported.
  struct ArrowError na_error = {};
  ASSERT_THAT(MakeSchema(&schema.value, {field}), IsOkErrno());
  ASSERT_THAT(MakeBatch<CType>(&schema.value, &array.value, &na_error, values),
              IsOkErrno(&na_error));

  if (dictionary_encode) {
    // Create a dictionary-encoded version of the target schema
    Handle<struct ArrowSchema> dict_schema;
    ASSERT_THAT(ArrowSchemaInitFromType(&dict_schema.value, NANOARROW_TYPE_INT32),
                IsOkErrno());
    // The index schema takes over the column name; the dictionary is unnamed.
    ASSERT_THAT(ArrowSchemaSetName(&dict_schema.value, schema.value.children[0]->name),
                IsOkErrno());
    ASSERT_THAT(ArrowSchemaSetName(schema.value.children[0], nullptr), IsOkErrno());

    // Swap it into the target schema
    ASSERT_THAT(ArrowSchemaAllocateDictionary(&dict_schema.value), IsOkErrno());
    ArrowSchemaMove(schema.value.children[0], dict_schema.value.dictionary);
    ArrowSchemaMove(&dict_schema.value, schema.value.children[0]);

    // Create a dictionary-encoded array with easy 0...n indices so that the
    // matched values will be the same.
    Handle<struct ArrowArray> dict_array;
    ASSERT_THAT(ArrowArrayInitFromType(&dict_array.value, NANOARROW_TYPE_INT32),
                IsOkErrno());
    ASSERT_THAT(ArrowArrayStartAppending(&dict_array.value), IsOkErrno());
    for (size_t i = 0; i < values.size(); i++) {
      ASSERT_THAT(ArrowArrayAppendInt(&dict_array.value, static_cast<int64_t>(i)),
                  IsOkErrno());
    }
    ASSERT_THAT(ArrowArrayFinishBuildingDefault(&dict_array.value, &na_error),
                IsOkErrno(&na_error));

    // Swap it into the target batch
    ASSERT_THAT(ArrowArrayAllocateDictionary(&dict_array.value), IsOkErrno());
    ArrowArrayMove(array.value.children[0], dict_array.value.dictionary);
    ArrowArrayMove(&dict_array.value, array.value.children[0]);
  }

  // Ingest the batch.
  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                     "bulk_ingest", &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
              IsOkStatus(&error));

  // -1 is accepted wherever a row count is checked.
  int64_t rows_affected = 0;
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
              IsOkStatus(&error));
  ASSERT_THAT(rows_affected,
              ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));

  // Read the table back with the same statement.
  ASSERT_THAT(
      AdbcStatementSetSqlQuery(
          &statement, "SELECT * FROM \"bulk_ingest\" ORDER BY \"col\" ASC NULLS FIRST",
          &error),
      IsOkStatus(&error));
  {
    StreamReader reader;
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
                                          &reader.rows_affected, &error),
                IsOkStatus(&error));
    ASSERT_THAT(reader.rows_affected,
                ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));

    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
    SchemaField round_trip_field = quirks()->IngestSelectRoundTripType(field);
    ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value, {round_trip_field}));

    ASSERT_NO_FATAL_FAILURE(reader.Next());
    ASSERT_NE(nullptr, reader.array->release);
    ASSERT_EQ(values.size(), reader.array->length);
    ASSERT_EQ(1, reader.array->n_children);

    // XXX: values are only compared when the type round-trips unchanged;
    // comparing across a changed type would need casting.
    if (round_trip_field.type == field.type) {
      ASSERT_NO_FATAL_FAILURE(
          CompareArray<CType>(reader.array_view->children[0], values));
    }

    // The whole result is expected in a single batch.
    ASSERT_NO_FATAL_FAILURE(reader.Next());
    ASSERT_EQ(nullptr, reader.array->release);
  }
  ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
// Round-trips a numeric column of Arrow type `type` whose C representation is
// CType. The test data is always a null followed by two values chosen per
// type category (see the branches below).
template <typename CType>
void StatementTest::TestSqlIngestNumericType(ArrowType type) {
  std::vector<std::optional<CType>> values;
  values.push_back(std::nullopt);

  if constexpr (std::is_floating_point_v<CType>) {
    // XXX: sqlite and others seem to have trouble with extreme
    // values. Likely a bug on our side, but for now, avoid them.
    values.push_back(static_cast<CType>(-1.5));
    values.push_back(static_cast<CType>(1.5));
  } else if (type == ArrowType::NANOARROW_TYPE_DATE32) {
    // Windows does not seem to support negative date values
    values.push_back(static_cast<CType>(0));
    values.push_back(static_cast<CType>(42));
  } else {
    // Every other type is exercised at the two ends of its range.
    values.push_back(std::numeric_limits<CType>::lowest());
    values.push_back(std::numeric_limits<CType>::max());
  }

  TestSqlIngestType<CType>(type, values, /*dictionary_encode*/ false);
}
void StatementTest::TestSqlIngestBool() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<bool>(NANOARROW_TYPE_BOOL));
}
void StatementTest::TestSqlIngestUInt8() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint8_t>(NANOARROW_TYPE_UINT8));
}
void StatementTest::TestSqlIngestUInt16() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint16_t>(NANOARROW_TYPE_UINT16));
}
void StatementTest::TestSqlIngestUInt32() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint32_t>(NANOARROW_TYPE_UINT32));
}
void StatementTest::TestSqlIngestUInt64() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<uint64_t>(NANOARROW_TYPE_UINT64));
}
void StatementTest::TestSqlIngestInt8() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int8_t>(NANOARROW_TYPE_INT8));
}
void StatementTest::TestSqlIngestInt16() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int16_t>(NANOARROW_TYPE_INT16));
}
void StatementTest::TestSqlIngestInt32() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int32_t>(NANOARROW_TYPE_INT32));
}
void StatementTest::TestSqlIngestInt64() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int64_t>(NANOARROW_TYPE_INT64));
}
void StatementTest::TestSqlIngestFloat16() {
if (!quirks()->supports_ingest_float16()) {
GTEST_SKIP();
}
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<float>(NANOARROW_TYPE_HALF_FLOAT));
}
void StatementTest::TestSqlIngestFloat32() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<float>(NANOARROW_TYPE_FLOAT));
}
void StatementTest::TestSqlIngestFloat64() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<double>(NANOARROW_TYPE_DOUBLE));
}
// Bulk-ingest round trip for a STRING column: a null, two empty strings, an
// ASCII value and a non-ASCII value.
void StatementTest::TestSqlIngestString() {
  const std::vector<std::optional<std::string>> values = {std::nullopt, "", "", "1234",
                                                          "例"};
  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(
      NANOARROW_TYPE_STRING, values, /*dictionary_encode*/ false));
}
// Bulk-ingest round trip for a LARGE_STRING column: a null, two empty
// strings, an ASCII value and a non-ASCII value.
void StatementTest::TestSqlIngestLargeString() {
  const std::vector<std::optional<std::string>> values = {std::nullopt, "", "", "1234",
                                                          "例"};
  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(
      NANOARROW_TYPE_LARGE_STRING, values, /*dictionary_encode*/ false));
}
// Bulk-ingest round trip for a STRING_VIEW column. Skipped unless the driver
// quirks report support for ingesting view types.
void StatementTest::TestSqlIngestStringView() {
  const bool view_types_supported = quirks()->supports_ingest_view_types();
  if (!view_types_supported) {
    GTEST_SKIP();
  }

  const std::vector<std::optional<std::string>> values = {
      std::nullopt, "", "", "longer than 12 bytes", "例"};
  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(
      NANOARROW_TYPE_STRING_VIEW, values, /*dictionary_encode*/ false));
}
// Bulk-ingest round trip for a BINARY column: a null, an empty value, and
// three byte strings that include 0x00 and 0xff bytes.
void StatementTest::TestSqlIngestBinary() {
  using Bytes = std::vector<std::byte>;

  const std::vector<std::optional<Bytes>> values = {
      std::nullopt,
      Bytes{},
      Bytes{std::byte{0x00}, std::byte{0x01}},
      Bytes{std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, std::byte{0x04}},
      Bytes{std::byte{0xfe}, std::byte{0xff}},
  };
  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<Bytes>(NANOARROW_TYPE_BINARY, values,
                                                   /*dictionary_encode*/ false));
}
// Bulk-ingest round trip for a LARGE_BINARY column: a null, an empty value,
// and three byte strings that include 0x00 and 0xff bytes.
void StatementTest::TestSqlIngestLargeBinary() {
  using Bytes = std::vector<std::byte>;

  const std::vector<std::optional<Bytes>> values = {
      std::nullopt,
      Bytes{},
      Bytes{std::byte{0x00}, std::byte{0x01}},
      Bytes{std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, std::byte{0x04}},
      Bytes{std::byte{0xfe}, std::byte{0xff}},
  };
  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<Bytes>(NANOARROW_TYPE_LARGE_BINARY, values,
                                                   /*dictionary_encode*/ false));
}
void StatementTest::TestSqlIngestFixedSizeBinary() {
SchemaField field = SchemaField::FixedSize("col", NANOARROW_TYPE_FIXED_SIZE_BINARY, 4);
ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(
field, {std::nullopt, "abcd", "efgh", "ijkl", "mnop"}, false));
}
void StatementTest::TestSqlIngestBinaryView() {
if (!quirks()->supports_ingest_view_types()) {
GTEST_SKIP();
}
ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<std::byte>>(
NANOARROW_TYPE_LARGE_BINARY,
{std::nullopt, std::vector<std::byte>{},
std::vector<std::byte>{std::byte{0x00}, std::byte{0x01}},
std::vector<std::byte>{std::byte{0x01}, std::byte{0x02}, std::byte{0x03},
std::byte{0x04}},
std::vector<std::byte>{std::byte{0xfe}, std::byte{0xff}}},
false));
}
void StatementTest::TestSqlIngestDate32() {
ASSERT_NO_FATAL_FAILURE(TestSqlIngestNumericType<int32_t>(NANOARROW_TYPE_DATE32));
}
// Round-trips a temporal column (Arrow type `type`, time unit `TU`) through
// bulk ingestion.
//
// Ingests {NULL, -42, 0, 42} into "bulk_ingest" as column "col", selects it
// back ordered with nulls first, checks the schema against the driver's
// round-trip type, and hands the result column to
// ValidateIngestedTemporalData() for the value check.
//
// timezone: timezone string for the column type, or nullptr for none.
//
// Skips when the driver does not support bulk ingest in create mode.
template <ArrowType type, enum ArrowTimeUnit TU>
void StatementTest::TestSqlIngestTemporalType(const char* timezone) {
  if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
    GTEST_SKIP();
  }

  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  Handle<struct ArrowSchema> schema;
  Handle<struct ArrowArray> array;
  // Zero-initialized so the message is well-defined if it gets reported.
  struct ArrowError na_error = {};
  const std::vector<std::optional<int64_t>> values = {std::nullopt, -42, 0, 42};

  // much of this code is shared with TestSqlIngestType with minor
  // changes to allow for various time units to be tested
  ArrowSchemaInit(&schema.value);
  // Each schema-building step can fail; stop at the first failure instead of
  // ingesting a half-built schema.
  ASSERT_THAT(ArrowSchemaSetTypeStruct(&schema.value, 1), IsOkErrno());
  ASSERT_THAT(ArrowSchemaSetTypeDateTime(schema->children[0], type, TU, timezone),
              IsOkErrno());
  ASSERT_THAT(ArrowSchemaSetName(schema->children[0], "col"), IsOkErrno());
  ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values),
              IsOkErrno(&na_error));

  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                     "bulk_ingest", &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
              IsOkStatus(&error));

  // -1 is accepted wherever a row count is checked.
  int64_t rows_affected = 0;
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
              IsOkStatus(&error));
  ASSERT_THAT(rows_affected,
              ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));

  ASSERT_THAT(
      AdbcStatementSetSqlQuery(
          &statement, "SELECT * FROM \"bulk_ingest\" ORDER BY \"col\" ASC NULLS FIRST",
          &error),
      IsOkStatus(&error));
  {
    StreamReader reader;
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
                                          &reader.rows_affected, &error),
                IsOkStatus(&error));
    ASSERT_THAT(reader.rows_affected,
                ::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));

    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());

    ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type);
    ASSERT_NO_FATAL_FAILURE(
        CompareSchema(&reader.schema.value, {{"col", round_trip_type, NULLABLE}}));

    ASSERT_NO_FATAL_FAILURE(reader.Next());
    ASSERT_NE(nullptr, reader.array->release);
    ASSERT_EQ(values.size(), reader.array->length);
    ASSERT_EQ(1, reader.array->n_children);

    // A fatal failure inside the validator only returns from the validator;
    // the wrapper makes it abort this test as well.
    ASSERT_NO_FATAL_FAILURE(
        ValidateIngestedTemporalData(reader.array_view->children[0], type, TU, timezone));

    // The whole result is expected in a single batch.
    ASSERT_NO_FATAL_FAILURE(reader.Next());
    ASSERT_EQ(nullptr, reader.array->release);
  }

  ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::ValidateIngestedTemporalData(struct ArrowArrayView* values,
ArrowType type, enum ArrowTimeUnit unit,
const char* timezone) {
FAIL() << "ValidateIngestedTemporalData is not implemented in the base class";
}
void StatementTest::TestSqlIngestDuration() {
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_SECOND>(
nullptr)));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MILLI>(
nullptr)));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_MICRO>(
nullptr)));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_DURATION, NANOARROW_TIME_UNIT_NANO>(
nullptr)));
}
void StatementTest::TestSqlIngestTimestamp() {
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_SECOND>(
nullptr)));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI>(
nullptr)));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO>(
nullptr)));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_NANO>(
nullptr)));
}
void StatementTest::TestSqlIngestTimestampTz() {
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_SECOND>(
"UTC")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI>(
"UTC")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO>(
"UTC")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_NANO>(
"UTC")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_SECOND>(
"America/Los_Angeles")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MILLI>(
"America/Los_Angeles")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_MICRO>(
"America/Los_Angeles")));
ASSERT_NO_FATAL_FAILURE(
(TestSqlIngestTemporalType<NANOARROW_TYPE_TIMESTAMP, NANOARROW_TIME_UNIT_NANO>(
"America/Los_Angeles")));
}
void StatementTest::TestSqlIngestInterval() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
const enum ArrowType type = NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO;
// values are days, months, ns
struct ArrowInterval neg_interval;
struct ArrowInterval zero_interval;
struct ArrowInterval pos_interval;
ArrowIntervalInit(&neg_interval, type);
ArrowIntervalInit(&zero_interval, type);
ArrowIntervalInit(&pos_interval, type);
neg_interval.months = -5;
neg_interval.days = -5;
neg_interval.ns = -42000;
pos_interval.months = 5;
pos_interval.days = 5;
pos_interval.ns = 42000;
const std::vector<std::optional<ArrowInterval*>> values = {
std::nullopt, &neg_interval, &zero_interval, &pos_interval};
ASSERT_THAT(MakeSchema(&schema.value, {{"col", type}}), IsOkErrno());
ASSERT_THAT(MakeBatch<ArrowInterval*>(&schema.value, &array.value, &na_error, values),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
int64_t rows_affected = 0;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected,
::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement, "SELECT * FROM \"bulk_ingest\" ORDER BY \"col\" ASC NULLS FIRST",
&error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(values.size()), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(type);
ASSERT_NO_FATAL_FAILURE(
CompareSchema(&reader.schema.value, {{"col", round_trip_type, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(values.size(), reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
if (round_trip_type == type) {
ASSERT_NO_FATAL_FAILURE(
CompareArray<ArrowInterval*>(reader.array_view->children[0], values));
}
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
// Bulk-ingest round trip for a dictionary-encoded STRING column (no nulls).
void StatementTest::TestSqlIngestStringDictionary() {
  const std::vector<std::optional<std::string>> values = {"", "", "1234", "例"};
  ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::string>(NANOARROW_TYPE_STRING, values,
                                                         /*dictionary_encode*/ true));
}
void StatementTest::TestSqlIngestListOfInt32() {
SchemaField field =
SchemaField::Nested("col", NANOARROW_TYPE_LIST, {{"item", NANOARROW_TYPE_INT32}});
ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<int32_t>>(
field, {std::nullopt, std::vector<int32_t>{1, 2, 3}, std::vector<int32_t>{4, 5}},
/*dictionary_encode*/ false));
}
void StatementTest::TestSqlIngestListOfString() {
SchemaField field =
SchemaField::Nested("col", NANOARROW_TYPE_LIST, {{"item", NANOARROW_TYPE_STRING}});
ASSERT_NO_FATAL_FAILURE(TestSqlIngestType<std::vector<std::string>>(
field,
{std::nullopt, std::vector<std::string>{"abc", "defg"},
std::vector<std::string>{"hijk"}},
/*dictionary_encode*/ false));
}
void StatementTest::TestSqlIngestStreamZeroArrays() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
ASSERT_THAT(MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT32}}), IsOkErrno());
Handle<struct ArrowArrayStream> bind;
nanoarrow::EmptyArrayStream(&schema.value).ToArrayStream(&bind.value);
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBindStream(&statement, &bind.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(0), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ArrowType round_trip_type = quirks()->IngestSelectRoundTripType(NANOARROW_TYPE_INT32);
ASSERT_NO_FATAL_FAILURE(
CompareSchema(&reader.schema.value, {{"col", round_trip_type, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
}
void StatementTest::TestSqlIngestTableEscaping() {
std::string name = "create_table_escaping";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"index", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestColumnEscaping() {
std::string name = "create";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"index", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestAppend() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND)) {
GTEST_SKIP();
}
// Ingest
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42}),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
int64_t rows_affected = 0;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
// Now append
// Re-initialize since Bind() should take ownership of data
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(
MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
// Read data back
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(3, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(
CompareArray<int64_t>(reader.array_view->children[0], {42, -42, std::nullopt}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestReplace() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_REPLACE)) {
GTEST_SKIP();
}
// Ingest
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42}),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_REPLACE, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
int64_t rows_affected = 0;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
// Read data back
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(1, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0], {42}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
// Replace
// Re-initialize since Bind() should take ownership of data
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, -42}),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_REPLACE, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
// Read data back
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(2, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(
CompareArray<int64_t>(reader.array_view->children[0], {-42, -42}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
}
void StatementTest::TestSqlIngestCreateAppend() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE_APPEND)) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
// Ingest
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42}),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_CREATE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
int64_t rows_affected = 0;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
// Append
// Re-initialize since Bind() should take ownership of data
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT(MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {42, 42}),
IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(2), ::testing::Eq(-1)));
// Read data back
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(3, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(
CompareArray<int64_t>(reader.array_view->children[0], {42, 42, 42}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
// Checks that bulk ingestion reports errors for: executing with no bound
// data, appending to a missing table, creating a table that already exists
// and, when the driver quirks say it is detected, appending a batch whose
// schema does not match the existing table.
//
// Every expected failure is followed by a release of `error` so the struct
// can be handed to the next call without carrying a stale message.
void StatementTest::TestSqlIngestErrors() {
  if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
    GTEST_SKIP();
  }

  // Ingest without bind
  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                     "bulk_ingest", &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
              IsStatus(ADBC_STATUS_INVALID_STATE, &error));
  if (error.release) error.release(&error);

  // Start the remaining checks from a state where the table does not exist.
  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  // Append to nonexistent table
  Handle<struct ArrowSchema> schema;
  Handle<struct ArrowArray> array;
  struct ArrowError na_error;

  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
                                     "bulk_ingest", &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                                     ADBC_INGEST_OPTION_MODE_APPEND, &error),
              IsOkStatus(&error));
  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
  ASSERT_THAT(
      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
      IsOkErrno(&na_error));
  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
              ::testing::Not(IsOkStatus(&error)));
  if (error.release) error.release(&error);

  // Ingest...
  // (the schema and batch are rebuilt before every Bind() in this test)
  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                                     ADBC_INGEST_OPTION_MODE_CREATE, &error),
              IsOkStatus(&error));
  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
  ASSERT_THAT(
      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
      IsOkErrno(&na_error));
  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
              IsOkStatus(&error));

  // ...then try to overwrite it
  // (the mode is still "create", so the now-existing table must be rejected)
  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
  ASSERT_THAT(
      MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {-42, std::nullopt}),
      IsOkErrno(&na_error));
  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
              ::testing::Not(IsOkStatus(&error)));
  if (error.release) error.release(&error);

  // The last check only applies to drivers that reject mismatched schemas.
  if (!quirks()->supports_error_on_incompatible_schema()) {
    return;
  }

  // ...then try to append an incompatible schema
  ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
                                         {"coltwo", NANOARROW_TYPE_INT64}}),
              IsOkErrno());
  ASSERT_THAT(
      (MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error, {-42}, {-42})),
      IsOkErrno(&na_error));
  ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_MODE,
                                     ADBC_INGEST_OPTION_MODE_APPEND, &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
              ::testing::Not(IsOkStatus(&error)));
  // Release the expected error, matching the other negative checks above.
  if (error.release) error.release(&error);
}
void StatementTest::TestSqlIngestMultipleConnections() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
int64_t rows_affected = 0;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
{
struct AdbcConnection connection2 = {};
ASSERT_THAT(AdbcConnectionNew(&connection2, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionInit(&connection2, &database, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection2, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement,
"SELECT * FROM \"bulk_ingest\" ORDER BY \"int64s\" DESC NULLS LAST", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(
&reader.schema.value, {{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(3, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(
CompareArray<int64_t>(reader.array_view->children[0], {42, -42, std::nullopt}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionRelease(&connection2, &error), IsOkStatus(&error));
}
}
void StatementTest::TestSqlIngestSample() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->EnsureSampleTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement, "SELECT * FROM \"bulk_ingest\" ORDER BY int64s ASC NULLS FIRST",
&error),
IsOkStatus(&error));
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"int64s", NANOARROW_TYPE_INT64, NULLABLE},
{"strings", NANOARROW_TYPE_STRING, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(3, reader.array->length);
ASSERT_EQ(2, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(
CompareArray<int64_t>(reader.array_view->children[0], {std::nullopt, -42, 42}));
ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(reader.array_view->children[1],
{"", std::nullopt, "foo"}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
void StatementTest::TestSqlIngestTargetCatalog() {
if (!quirks()->supports_bulk_ingest_catalog() ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
std::string catalog = quirks()->catalog();
std::string name = "bulk_ingest";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_CATALOG,
catalog.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestTargetSchema() {
if (!quirks()->supports_bulk_ingest_db_schema() ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
std::string db_schema = quirks()->db_schema();
std::string name = "bulk_ingest";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
db_schema.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestTargetCatalogSchema() {
if (!quirks()->supports_bulk_ingest_catalog() ||
!quirks()->supports_bulk_ingest_db_schema() ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
std::string catalog = quirks()->catalog();
std::string db_schema = quirks()->db_schema();
std::string name = "bulk_ingest";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_CATALOG,
catalog.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
db_schema.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestTemporary() {
if (!quirks()->supports_bulk_ingest_temporary() ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
GTEST_SKIP();
}
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
std::string name = "bulk_ingest";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error), IsOkStatus(&error));
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestTemporaryAppend() {
// Append to temp table shouldn't affect actual table and vice versa
if (!quirks()->supports_bulk_ingest_temporary() ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND)) {
GTEST_SKIP();
}
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
std::string name = "bulk_ingest";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error), IsOkStatus(&error));
// Create both tables with different schemas
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"strs", NANOARROW_TYPE_STRING}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<std::string>(&schema.value, &array.value, &na_error,
{"foo", "bar", std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
// Append to the temporary table
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {0, 1, 2})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
// Append to the normal table
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"strs", NANOARROW_TYPE_STRING}}),
IsOkErrno());
ASSERT_THAT(
(MakeBatch<std::string>(&schema.value, &array.value, &na_error, {"", "a", "b"})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlIngestTemporaryReplace() {
// Replace temp table shouldn't affect actual table and vice versa
if (!quirks()->supports_bulk_ingest_temporary() ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_APPEND) ||
!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_REPLACE)) {
GTEST_SKIP();
}
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
std::string name = "bulk_ingest";
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error), IsOkStatus(&error));
// Create both tables with different schemas
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"strs", NANOARROW_TYPE_STRING}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<std::string>(&schema.value, &array.value, &na_error,
{"foo", "bar", std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
// Replace both tables with different schemas
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints2", NANOARROW_TYPE_INT64},
{"strs2", NANOARROW_TYPE_STRING}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
{0, 1, std::nullopt},
{"foo", "bar", std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_REPLACE, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints3", NANOARROW_TYPE_INT64}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {1, 2, 3})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_REPLACE, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
// Now append to the replaced tables to check that the schemas are as expected
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints2", NANOARROW_TYPE_INT64},
{"strs2", NANOARROW_TYPE_STRING}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
{0, 1, std::nullopt},
{"foo", "bar", std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_ENABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"ints3", NANOARROW_TYPE_INT64}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {4, 5, 6})),
IsOkErrno());
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
}
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
// Can't set target schema/catalog with temp table.
//
// For each of catalog and database schema that the driver supports for bulk
// ingest, combines that option with the temporary option and expects
// execution to fail with ADBC_STATUS_INVALID_STATE.
void StatementTest::TestSqlIngestTemporaryExclusive() {
  if (!quirks()->supports_bulk_ingest_temporary() ||
      !quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE)) {
    GTEST_SKIP();
  }

  std::string name = "bulk_ingest";
  ASSERT_THAT(quirks()->DropTempTable(&connection, name, &error), IsOkStatus(&error));

  // Temporary + target catalog
  if (quirks()->supports_bulk_ingest_catalog()) {
    Handle<struct ArrowSchema> schema;
    Handle<struct ArrowArray> array;
    struct ArrowError na_error;
    ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
    ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
                                    {42, -42, std::nullopt})),
                IsOkErrno());

    std::string catalog = quirks()->catalog();

    Handle<struct AdbcStatement> statement;
    ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
                IsOkStatus(&error));

    ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
                                       ADBC_OPTION_VALUE_ENABLED, &error),
                IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
                                       name.c_str(), &error),
                IsOkStatus(&error));
    ASSERT_THAT(
        AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_CATALOG,
                               catalog.c_str(), &error),
        IsOkStatus(&error));

    ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
                IsOkStatus(&error));

    // The conflicting options are only rejected at execution time here.
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
                IsStatus(ADBC_STATUS_INVALID_STATE, &error));
    // Release the expected error before `error` is passed to further calls,
    // as TestSqlIngestErrors does after each of its negative checks.
    if (error.release) error.release(&error);

    ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
  }

  // Temporary + target database schema
  if (quirks()->supports_bulk_ingest_db_schema()) {
    Handle<struct ArrowSchema> schema;
    Handle<struct ArrowArray> array;
    struct ArrowError na_error;
    ASSERT_THAT(MakeSchema(&schema.value, {{"ints", NANOARROW_TYPE_INT64}}), IsOkErrno());
    ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
                                    {42, -42, std::nullopt})),
                IsOkErrno());

    std::string db_schema = quirks()->db_schema();

    Handle<struct AdbcStatement> statement;
    ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
                IsOkStatus(&error));

    ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TEMPORARY,
                                       ADBC_OPTION_VALUE_ENABLED, &error),
                IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
                                       name.c_str(), &error),
                IsOkStatus(&error));
    ASSERT_THAT(
        AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_DB_SCHEMA,
                               db_schema.c_str(), &error),
        IsOkStatus(&error));

    ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
                IsOkStatus(&error));

    ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
                IsStatus(ADBC_STATUS_INVALID_STATE, &error));
    // Same as above: drop the expected error before reusing `error`.
    if (error.release) error.release(&error);

    ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
  }
}
void StatementTest::TestSqlIngestPrimaryKey() {
std::string name = "pkeytest";
auto ddl = quirks()->PrimaryKeyIngestTableDdl(name);
if (!ddl) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, name, &error), IsOkStatus(&error));
// Create table
{
Handle<struct AdbcStatement> statement;
StreamReader reader;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement.value, ddl->c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
// Ingest without the primary key
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"value", NANOARROW_TYPE_INT64}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
// Ingest with the primary key
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value,
{
{"id", NANOARROW_TYPE_INT64},
{"value", NANOARROW_TYPE_INT64},
}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t, int64_t>(&schema.value, &array.value, &na_error,
{4, 5, 6}, {1, 0, -1})),
IsOkErrno());
Handle<struct AdbcStatement> statement;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_TARGET_TABLE,
name.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetOption(&statement.value, ADBC_INGEST_OPTION_MODE,
ADBC_INGEST_OPTION_MODE_APPEND, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement.value, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementRelease(&statement.value, &error), IsOkStatus(&error));
}
// Get the data
{
Handle<struct AdbcStatement> statement;
StreamReader reader;
ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(
&statement.value, "SELECT * FROM pkeytest ORDER BY id ASC", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value, nullptr,
&error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(2, reader.schema->n_children);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(6, reader.array->length);
ASSERT_EQ(2, reader.array->n_children);
// Different databases start numbering at 0 or 1 for the primary key
// column, so can't compare it
// TODO(https://github.com/apache/arrow-adbc/issues/938): if the test
// helpers converted data to plain C++ values we could do a more
// sophisticated assertion
ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[1],
{42, -42, std::nullopt, 1, 0, -1}));
}
}
// Runs "SELECT 42" through AdbcStatementExecutePartitions and reads the single
// partition back over a second connection to the same database.
//
// When the quirks report that partitioned data is unsupported, the call must
// fail with ADBC_STATUS_NOT_IMPLEMENTED and the rest of the test is skipped.
void StatementTest::TestSqlPartitionedInts() {
  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
              IsOkStatus(&error));

  Handle<struct ArrowSchema> schema;
  Handle<struct AdbcPartitions> partitions;
  int64_t rows_affected = 0;

  if (!quirks()->supports_partitioned_data()) {
    ASSERT_THAT(AdbcStatementExecutePartitions(&statement, &schema.value,
                                               &partitions.value, &rows_affected, &error),
                IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &error));
    // The failure was expected. Release any details attached to the shared
    // error struct so it is not handed, still populated, to later ADBC calls.
    if (error.release) error.release(&error);
    GTEST_SKIP();
  }

  ASSERT_THAT(AdbcStatementExecutePartitions(&statement, &schema.value, &partitions.value,
                                             &rows_affected, &error),
              IsOkStatus(&error));
  // Assume only 1 partition
  ASSERT_EQ(1, partitions->num_partitions);
  // Either the exact row count or -1 is accepted.
  ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
  // ExecutePartitions is allowed to leave the schema unset if one is not
  // available, so only inspect it when it was actually populated.
  if (schema->release != nullptr) {
    ASSERT_EQ(1, schema->n_children);
  }

  // Read the partition through a separate connection to the same database.
  Handle<struct AdbcConnection> connection2;
  StreamReader reader;
  ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionReadPartition(&connection2.value, partitions->partitions[0],
                                          partitions->partition_lengths[0],
                                          &reader.stream.value, &error),
              IsOkStatus(&error));

  ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
  ASSERT_EQ(1, reader.schema->n_children);

  ASSERT_NO_FATAL_FAILURE(reader.Next());
  ASSERT_NE(nullptr, reader.array->release);
  ASSERT_EQ(1, reader.array->length);
  ASSERT_EQ(1, reader.array->n_children);

  // The integer literal may come back as either a 32-bit or a 64-bit column.
  switch (reader.fields[0].type) {
    case NANOARROW_TYPE_INT32:
      ASSERT_NO_FATAL_FAILURE(
          CompareArray<int32_t>(reader.array_view->children[0], {42}));
      break;
    case NANOARROW_TYPE_INT64:
      ASSERT_NO_FATAL_FAILURE(
          CompareArray<int64_t>(reader.array_view->children[0], {42}));
      break;
    default:
      FAIL() << "Unexpected data type: " << reader.fields[0].type;
  }

  // The next read must yield a released (empty) array, i.e. no further batches.
  ASSERT_NO_FATAL_FAILURE(reader.Next());
  ASSERT_EQ(nullptr, reader.array->release);
}
void StatementTest::TestSqlPrepareGetParameterSchema() {
if (!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
std::string query = "SELECT ";
query += quirks()->BindParameter(0);
query += ", ";
query += quirks()->BindParameter(1);
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
// if schema cannot be determined we should get NOT IMPLEMENTED returned
ASSERT_THAT(AdbcStatementGetParameterSchema(&statement, &schema.value, &error),
::testing::AnyOf(IsOkStatus(&error),
IsStatus(ADBC_STATUS_NOT_IMPLEMENTED, &error)));
if (schema->release != nullptr) {
ASSERT_EQ(2, schema->n_children);
}
// Can't assume anything about names or types here
}
void StatementTest::TestSqlPrepareSelectNoParams() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 1", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
if (quirks()->supports_rows_affected()) {
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
} else {
ASSERT_THAT(reader.rows_affected,
::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))));
}
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(1, reader.schema->n_children);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(1, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
switch (reader.fields[0].type) {
case NANOARROW_TYPE_INT32:
ASSERT_NO_FATAL_FAILURE(CompareArray<int32_t>(reader.array_view->children[0], {1}));
break;
case NANOARROW_TYPE_INT64:
ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(reader.array_view->children[0], {1}));
break;
default:
FAIL() << "Unexpected data type: " << reader.fields[0].type;
}
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
void StatementTest::TestSqlPrepareSelectParams() {
if (!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
std::string query = "SELECT ";
query += quirks()->BindParameter(0);
query += ", ";
query += quirks()->BindParameter(1);
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64},
{"strings", NANOARROW_TYPE_STRING}}),
IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t, std::string>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt},
{"", std::nullopt, "bar"})),
IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(2, reader.schema->n_children);
const std::vector<std::optional<int32_t>> expected_int32{42, -42, std::nullopt};
const std::vector<std::optional<int64_t>> expected_int64{42, -42, std::nullopt};
const std::vector<std::optional<std::string>> expected_string{"", std::nullopt, "bar"};
int64_t nrows = 0;
while (nrows < 3) {
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(2, reader.array->n_children);
auto start = nrows;
auto end = nrows + reader.array->length;
ASSERT_LT(start, expected_int32.size());
ASSERT_LE(end, expected_int32.size());
switch (reader.fields[0].type) {
case NANOARROW_TYPE_INT32:
ASSERT_NO_FATAL_FAILURE(CompareArray<int32_t>(
reader.array_view->children[0],
{expected_int32.begin() + start, expected_int32.begin() + end}));
break;
case NANOARROW_TYPE_INT64:
ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(
reader.array_view->children[0],
{expected_int64.begin() + start, expected_int64.begin() + end}));
break;
default:
FAIL() << "Unexpected data type: " << reader.fields[0].type;
}
ASSERT_NO_FATAL_FAILURE(CompareArray<std::string>(
reader.array_view->children[1],
{expected_string.begin() + start, expected_string.begin() + end}));
nrows += reader.array->length;
}
ASSERT_EQ(3, nrows);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
void StatementTest::TestSqlPrepareUpdate() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
// Create table
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
// Prepare
std::string query =
"INSERT INTO \"bulk_ingest\" VALUES (" + quirks()->BindParameter(0) + ")";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
// Bind and execute
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
// Read data back
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(6), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(CompareSchema(&reader.schema.value,
{{"int64s", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(6, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(
reader.array_view->children[0], {42, -42, std::nullopt, 42, -42, std::nullopt}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
}
// Placeholder: this test currently has no body and therefore asserts nothing.
// The TODOs below describe the coverage that is still missing.
void StatementTest::TestSqlPrepareUpdateNoParams() {
  // TODO: prepare a parameterless statement (something like INSERT 1), execute
  // it, and confirm that it is executed exactly once.
  // TODO: then bind a table with 0 columns and X rows and confirm that the
  // statement is executed multiple times.
}
void StatementTest::TestSqlPrepareUpdateStream() {
if (!quirks()->supports_bulk_ingest(ADBC_INGEST_OPTION_MODE_CREATE) ||
!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
IsOkStatus(&error));
struct ArrowError na_error;
const std::vector<SchemaField> fields = {{"ints", NANOARROW_TYPE_INT64}};
// Create table
{
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
ASSERT_THAT(AdbcStatementSetOption(&statement, ADBC_INGEST_OPTION_TARGET_TABLE,
"bulk_ingest", &error),
IsOkStatus(&error));
ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error, {})),
IsOkErrno(&na_error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
}
// Generate stream
Handle<struct ArrowArrayStream> stream;
Handle<struct ArrowSchema> schema;
std::vector<struct ArrowArray> batches(2);
ASSERT_THAT(MakeSchema(&schema.value, fields), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &batches[0], &na_error,
{1, 2, std::nullopt, 3})),
IsOkErrno(&na_error));
ASSERT_THAT(
MakeBatch<int64_t>(&schema.value, &batches[1], &na_error, {std::nullopt, 3}),
IsOkErrno(&na_error));
MakeStream(&stream.value, &schema.value, std::move(batches));
// Prepare
std::string query =
"INSERT INTO \"bulk_ingest\" VALUES (" + quirks()->BindParameter(0) + ")";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
// Bind and execute
ASSERT_THAT(AdbcStatementBindStream(&statement, &stream.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
// Read data back
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"bulk_ingest\"", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(6), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(
CompareSchema(&reader.schema.value, {{"ints", NANOARROW_TYPE_INT64, NULLABLE}}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(6, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_NO_FATAL_FAILURE(CompareArray<int64_t>(
reader.array_view->children[0], {1, 2, std::nullopt, 3, std::nullopt, 3}));
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
// TODO: import released stream
// TODO: stream that errors on get_schema
// TODO: stream that errors on get_next (first call)
// TODO: stream that errors on get_next (second call)
}
void StatementTest::TestSqlPrepareErrorNoQuery() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error),
IsStatus(ADBC_STATUS_INVALID_STATE, &error));
if (error.release) error.release(&error);
}
// TODO: need test of overlapping reads - make sure behavior is as described
void StatementTest::TestSqlPrepareErrorParamCountMismatch() {
if (!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
StreamReader reader;
std::string query = "SELECT ";
query += quirks()->BindParameter(0);
query += ", ";
query += quirks()->BindParameter(1);
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
ASSERT_THAT(MakeSchema(&schema.value, {{"int64s", NANOARROW_TYPE_INT64}}), IsOkErrno());
ASSERT_THAT((MakeBatch<int64_t>(&schema.value, &array.value, &na_error,
{42, -42, std::nullopt})),
IsOkErrno());
ASSERT_THAT(
([&]() -> AdbcStatusCode {
CHECK_OK(AdbcStatementBind(&statement, &array.value, &schema.value, &error));
CHECK_OK(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error));
return ADBC_STATUS_OK;
})(),
::testing::Not(IsOkStatus(&error)));
}
void StatementTest::TestSqlBind() {
if (!quirks()->supports_dynamic_parameter_binding()) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTable(&connection, "bindtest", &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(
&statement, "CREATE TABLE bindtest (col1 INTEGER, col2 TEXT)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
Handle<struct ArrowSchema> schema;
Handle<struct ArrowArray> array;
struct ArrowError na_error;
ASSERT_THAT(MakeSchema(&schema.value,
{{"", NANOARROW_TYPE_INT32}, {"", NANOARROW_TYPE_STRING}}),
IsOkErrno());
std::vector<std::optional<int32_t>> int_values{std::nullopt, -123, 123};
std::vector<std::optional<std::string>> string_values{"abc", std::nullopt, "defg"};
int batch_result = MakeBatch<int32_t, std::string>(
&schema.value, &array.value, &na_error, int_values, string_values);
ASSERT_THAT(batch_result, IsOkErrno());
auto insert_query = std::string("INSERT INTO bindtest VALUES (") +
quirks()->BindParameter(0) + ", " + quirks()->BindParameter(1) +
")";
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, insert_query.c_str(), &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementPrepare(&statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementBind(&statement, &array.value, &schema.value, &error),
IsOkStatus(&error));
int64_t rows_affected = -10;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(-1), ::testing::Eq(3)));
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement, "SELECT * FROM bindtest ORDER BY col1 ASC NULLS FIRST", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(reader.array->length, 3);
CompareArray(reader.array_view->children[0], int_values);
CompareArray(reader.array_view->children[1], string_values);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(reader.array->release, nullptr);
}
}
void StatementTest::TestSqlQueryEmpty() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(quirks()->DropTable(&connection, "queryempty", &error), IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "CREATE TABLE queryempty (FOO INT)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM queryempty WHERE 1=0", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(0), ::testing::Eq(-1)));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(1, reader.schema->n_children);
while (true) {
ASSERT_NO_FATAL_FAILURE(reader.Next());
if (!reader.array->release) {
break;
}
ASSERT_EQ(0, reader.array->length);
}
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlQueryInts() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
if (quirks()->supports_rows_affected()) {
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
} else {
ASSERT_THAT(reader.rows_affected,
::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))));
}
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(1, reader.schema->n_children);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(1, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
switch (reader.fields[0].type) {
case NANOARROW_TYPE_INT32:
ASSERT_NO_FATAL_FAILURE(
CompareArray<int32_t>(reader.array_view->children[0], {42}));
break;
case NANOARROW_TYPE_INT64:
ASSERT_NO_FATAL_FAILURE(
CompareArray<int64_t>(reader.array_view->children[0], {42}));
break;
default:
FAIL() << "Unexpected data type: " << reader.fields[0].type;
}
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlQueryFloats() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT CAST(1.5 AS FLOAT)", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
if (quirks()->supports_rows_affected()) {
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
} else {
ASSERT_THAT(reader.rows_affected,
::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))));
}
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(1, reader.schema->n_children);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(1, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_FALSE(ArrowArrayViewIsNull(&reader.array_view.value, 0));
ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], 0));
switch (reader.fields[0].type) {
case NANOARROW_TYPE_FLOAT:
ASSERT_NO_FATAL_FAILURE(
CompareArray<float>(reader.array_view->children[0], {1.5f}));
break;
case NANOARROW_TYPE_DOUBLE:
ASSERT_NO_FATAL_FAILURE(
CompareArray<double>(reader.array_view->children[0], {1.5}));
break;
default:
FAIL() << "Unexpected data type: " << reader.fields[0].type;
}
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlQueryStrings() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'SaShiSuSeSo'", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
if (quirks()->supports_rows_affected()) {
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1)));
} else {
ASSERT_THAT(reader.rows_affected,
::testing::Not(::testing::AnyOf(::testing::Eq(1), ::testing::Eq(-1))));
}
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_EQ(1, reader.schema->n_children);
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_NE(nullptr, reader.array->release);
ASSERT_EQ(1, reader.array->length);
ASSERT_EQ(1, reader.array->n_children);
ASSERT_FALSE(ArrowArrayViewIsNull(&reader.array_view.value, 0));
ASSERT_FALSE(ArrowArrayViewIsNull(reader.array_view->children[0], 0));
switch (reader.fields[0].type) {
case NANOARROW_TYPE_LARGE_STRING:
case NANOARROW_TYPE_STRING: {
ASSERT_NO_FATAL_FAILURE(
CompareArray<std::string>(reader.array_view->children[0], {"SaShiSuSeSo"}));
break;
}
default:
FAIL() << "Unexpected data type: " << reader.fields[0].type;
}
ASSERT_NO_FATAL_FAILURE(reader.Next());
ASSERT_EQ(nullptr, reader.array->release);
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlQueryInsertRollback() {
if (!quirks()->supports_transactions()) {
GTEST_SKIP();
}
ASSERT_THAT(quirks()->DropTable(&connection, "rollbacktest", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
ADBC_OPTION_VALUE_DISABLED, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
"CREATE TABLE \"rollbacktest\" (a INT)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(
&statement, "INSERT INTO \"rollbacktest\" (a) VALUES (1)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error));
adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcStatementSetSqlQuery(&statement, "SELECT * FROM \"rollbacktest\"", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
int64_t total_rows = 0;
while (true) {
ASSERT_NO_FATAL_FAILURE(reader.Next());
if (!reader.array->release) break;
total_rows += reader.array->length;
}
ASSERT_EQ(0, total_rows);
}
void StatementTest::TestSqlQueryCancel() {
if (!quirks()->supports_cancel()) {
GTEST_SKIP();
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'SaShiSuSeSo'", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
ASSERT_THAT(AdbcStatementCancel(&statement, &error), IsOkStatus(&error));
while (true) {
int err = reader.MaybeNext();
if (err != 0) {
ASSERT_THAT(err, ::testing::AnyOf(0, IsErrno(ECANCELED, &reader.stream.value,
/*ArrowError*/ nullptr)));
}
if (!reader.array->release) break;
}
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlQueryErrors() {
// Invalid query
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
AdbcStatusCode code =
AdbcStatementSetSqlQuery(&statement, "this is not a query", &error);
if (code == ADBC_STATUS_OK) {
code = AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error);
}
ASSERT_NE(ADBC_STATUS_OK, code);
}
void StatementTest::TestSqlQueryTrailingSemicolons() {
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT current_date;;;", &error),
IsOkStatus(&error));
{
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
}
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlQueryRowsAffectedDelete() {
ASSERT_THAT(quirks()->DropTable(&connection, "delete_test", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
"CREATE TABLE \"delete_test\" (foo INT)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement, "INSERT INTO \"delete_test\" (foo) VALUES (1), (2), (3), (4), (5)",
&error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(
&statement, "DELETE FROM \"delete_test\" WHERE foo >= 3", &error),
IsOkStatus(&error));
int64_t rows_affected = 0;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, &rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(rows_affected, ::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
}
void StatementTest::TestSqlQueryRowsAffectedDeleteStream() {
ASSERT_THAT(quirks()->DropTable(&connection, "delete_test", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement,
"CREATE TABLE \"delete_test\" (foo INT)", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(
AdbcStatementSetSqlQuery(
&statement, "INSERT INTO \"delete_test\" (foo) VALUES (1), (2), (3), (4), (5)",
&error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(
&statement, "DELETE FROM \"delete_test\" WHERE foo >= 3", &error),
IsOkStatus(&error));
adbc_validation::StreamReader reader;
ASSERT_THAT(
AdbcStatementExecuteQuery(&statement, nullptr, &reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_THAT(reader.rows_affected,
::testing::AnyOf(::testing::Eq(3), ::testing::Eq(-1)));
}
// Exercises transaction visibility across two connections to the same
// database:
//   1. With autocommit disabled on the first connection, an uncommitted sample
//      table is queryable from that connection but not from a second one.
//   2. After a rollback the table is no longer queryable from the first
//      connection either.
//   3. After re-creating the table and committing, the second connection can
//      query it.
//
// Skipped when transactions are unsupported or when the quirks report that DDL
// implicitly commits the transaction.
void StatementTest::TestTransactions() {
  if (!quirks()->supports_transactions() || quirks()->ddl_implicit_commit_txn()) {
    GTEST_SKIP();
  }

  ASSERT_THAT(quirks()->DropTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  // Autocommit is expected to start out enabled.
  if (quirks()->supports_get_option()) {
    auto autocommit =
        ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, &error);
    ASSERT_THAT(autocommit,
                ::testing::Optional(::testing::StrEq(ADBC_OPTION_VALUE_ENABLED)));
  }

  Handle<struct AdbcConnection> connection2;
  ASSERT_THAT(AdbcConnectionNew(&connection2.value, &error), IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionInit(&connection2.value, &database, &error),
              IsOkStatus(&error));

  ASSERT_THAT(AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
                                      ADBC_OPTION_VALUE_DISABLED, &error),
              IsOkStatus(&error));

  // The option must read back as disabled after being changed.
  if (quirks()->supports_get_option()) {
    auto autocommit =
        ConnectionGetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, &error);
    ASSERT_THAT(autocommit,
                ::testing::Optional(::testing::StrEq(ADBC_OPTION_VALUE_DISABLED)));
  }

  // Uncommitted change
  ASSERT_THAT(quirks()->CreateSampleTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));

  // Query on first connection should succeed
  {
    Handle<struct AdbcStatement> statement;
    StreamReader reader;

    ASSERT_THAT(AdbcStatementNew(&connection, &statement.value, &error),
                IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementSetSqlQuery(&statement.value,
                                         "SELECT * FROM \"bulk_ingest\"", &error),
                IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                          &reader.rows_affected, &error),
                IsOkStatus(&error));
    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
  }

  if (error.release) error.release(&error);

  // Query on second connection should fail
  ASSERT_THAT(([&]() -> AdbcStatusCode {
                Handle<struct AdbcStatement> statement;
                StreamReader reader;

                CHECK_OK(AdbcStatementNew(&connection2.value, &statement.value, &error));
                CHECK_OK(AdbcStatementSetSqlQuery(
                    &statement.value, "SELECT * FROM \"bulk_ingest\"", &error));
                CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                                   &reader.rows_affected, &error));
                return ADBC_STATUS_OK;
              })(),
              ::testing::Not(IsOkStatus(&error)));

  // The failure was expected; release its details before reusing the struct.
  if (error.release) error.release(&error);

  // Rollback
  ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error));

  // Query on first connection should fail
  ASSERT_THAT(([&]() -> AdbcStatusCode {
                Handle<struct AdbcStatement> statement;
                StreamReader reader;

                CHECK_OK(AdbcStatementNew(&connection, &statement.value, &error));
                CHECK_OK(AdbcStatementSetSqlQuery(
                    &statement.value, "SELECT * FROM \"bulk_ingest\"", &error));
                CHECK_OK(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                                   &reader.rows_affected, &error));
                return ADBC_STATUS_OK;
              })(),
              ::testing::Not(IsOkStatus(&error)));

  // As after the previous expected failure, release the error details before
  // the struct is handed to the next ADBC call.
  if (error.release) error.release(&error);

  // Roll back again after the expected failure (presumably so the transaction
  // is in a clean state before the table is re-created).
  ASSERT_THAT(AdbcConnectionRollback(&connection, &error), IsOkStatus(&error));

  // Commit
  ASSERT_THAT(quirks()->CreateSampleTable(&connection, "bulk_ingest", &error),
              IsOkStatus(&error));
  ASSERT_THAT(AdbcConnectionCommit(&connection, &error), IsOkStatus(&error));

  // Query on second connection should succeed
  {
    Handle<struct AdbcStatement> statement;
    StreamReader reader;

    ASSERT_THAT(AdbcStatementNew(&connection2.value, &statement.value, &error),
                IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementSetSqlQuery(&statement.value,
                                         "SELECT * FROM \"bulk_ingest\"", &error),
                IsOkStatus(&error));
    ASSERT_THAT(AdbcStatementExecuteQuery(&statement.value, &reader.stream.value,
                                          &reader.rows_affected, &error),
                IsOkStatus(&error));
    ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
  }
}
void StatementTest::TestSqlSchemaInts() {
if (!quirks()->supports_execute_schema()) {
GTEST_SKIP() << "Not supported";
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
IsOkStatus(&error));
nanoarrow::UniqueSchema schema;
ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
IsOkStatus(&error));
ASSERT_EQ(1, schema->n_children);
ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({
::testing::StrEq("i"), // int32
::testing::StrEq("l"), // int64
}));
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlSchemaFloats() {
if (!quirks()->supports_execute_schema()) {
GTEST_SKIP() << "Not supported";
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT CAST(1.5 AS FLOAT)", &error),
IsOkStatus(&error));
nanoarrow::UniqueSchema schema;
ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
IsOkStatus(&error));
ASSERT_EQ(1, schema->n_children);
ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({
::testing::StrEq("f"), // float32
::testing::StrEq("g"), // float64
}));
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlSchemaStrings() {
if (!quirks()->supports_execute_schema()) {
GTEST_SKIP() << "Not supported";
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 'hi'", &error),
IsOkStatus(&error));
nanoarrow::UniqueSchema schema;
ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
IsOkStatus(&error));
ASSERT_EQ(1, schema->n_children);
ASSERT_THAT(schema->children[0]->format, ::testing::AnyOfArray({
::testing::StrEq("u"), // string
::testing::StrEq("U"), // large_string
}));
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestSqlSchemaErrors() {
if (!quirks()->supports_execute_schema()) {
GTEST_SKIP() << "Not supported";
}
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
nanoarrow::UniqueSchema schema;
ASSERT_THAT(AdbcStatementExecuteSchema(&statement, schema.get(), &error),
IsStatus(ADBC_STATUS_INVALID_STATE, &error));
ASSERT_THAT(AdbcStatementRelease(&statement, &error), IsOkStatus(&error));
}
void StatementTest::TestConcurrentStatements() {
Handle<struct AdbcStatement> statement1;
Handle<struct AdbcStatement> statement2;
ASSERT_THAT(AdbcStatementNew(&connection, &statement1.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementNew(&connection, &statement2.value, &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement1.value, "SELECT 'SaShiSuSeSo'", &error),
IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement2.value, "SELECT 'SaShiSuSeSo'", &error),
IsOkStatus(&error));
StreamReader reader1;
StreamReader reader2;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement1.value, &reader1.stream.value,
&reader1.rows_affected, &error),
IsOkStatus(&error));
if (quirks()->supports_concurrent_statements()) {
ASSERT_THAT(AdbcStatementExecuteQuery(&statement2.value, &reader2.stream.value,
&reader2.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader2.GetSchema());
} else {
ASSERT_THAT(AdbcStatementExecuteQuery(&statement2.value, &reader2.stream.value,
&reader2.rows_affected, &error),
::testing::Not(IsOkStatus(&error)));
ASSERT_EQ(nullptr, reader2.stream.value.release);
}
// Original stream should still be valid
ASSERT_NO_FATAL_FAILURE(reader1.GetSchema());
}
// Stand-in for the ADBC 1.0.0 error struct layout.  TestErrorCompatibility
// static_asserts that sizeof(AdbcError100) equals ADBC_ERROR_1_0_0_SIZE, so
// the field order and types below must not be changed.
struct ADBC_EXPORT AdbcError100 {
  // NOTE(review): presumably mirrors the leading fields of struct AdbcError
  // (message / vendor code / SQLSTATE / release callback) -- confirm against
  // the ADBC header.
  char* message;
  int32_t vendor_code;
  char sqlstate[5];
  // Callback that receives a pointer to this same struct.
  void (*release)(struct AdbcError100* error);
};
// Test that an ADBC 1.0.0-sized error still works
//
// The error is zero-initialized and then private_data / private_driver are
// pointed at a canary object.  After a query that is expected to fail, both
// fields must still point at the canary, i.e. the driver must not have
// written to them.
void StatementTest::TestErrorCompatibility() {
  static_assert(sizeof(AdbcError100) == ADBC_ERROR_1_0_0_SIZE, "Wrong size");

  // Intentionally shadows the fixture's `error` member: this test needs an
  // error object whose trailing fields it controls itself.
  struct AdbcError error;
  std::memset(&error, 0, ADBC_ERROR_1_1_0_SIZE);

  // Only the address of the canary is used; its contents are never read.
  struct AdbcDriver canary;
  error.private_data = &canary;
  error.private_driver = &canary;

  ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
  ASSERT_THAT(
      AdbcStatementSetSqlQuery(&statement, "SELECT * FROM thistabledoesnotexist", &error),
      IsOkStatus(&error));

  adbc_validation::StreamReader reader;
  ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
                                        &reader.rows_affected, &error),
              ::testing::Not(IsOkStatus(&error)));

  ASSERT_EQ(&canary, error.private_data);
  ASSERT_EQ(&canary, error.private_driver);

  // `release` is still null from the memset above if the driver returned a
  // failure status without filling in the error.  Calling through a null
  // function pointer would crash the whole test binary, so only release when
  // there is something to release.
  if (error.release != nullptr) {
    error.release(&error);
  }
}
void StatementTest::TestResultIndependence() {
// If we have a result reader, and we close the statement (and other
// resources), either the statement should error, or the reader should be
// closeable and should error on other operations
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
IsOkStatus(&error));
StreamReader reader;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader.stream.value,
&reader.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader.GetSchema());
auto status = AdbcStatementRelease(&statement, &error);
if (status != ADBC_STATUS_OK) {
// That's ok, this driver prevents closing the statement while readers are open
return;
}
ASSERT_THAT(AdbcConnectionRelease(&connection, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcDatabaseRelease(&database, &error), IsOkStatus(&error));
// Must not crash (but it's up to the driver whether it errors or succeeds)
std::ignore = reader.MaybeNext();
// Implicitly StreamReader calls release() on destruction, that should not
// crash either
}
void StatementTest::TestResultInvalidation() {
// Start reading from a statement, then overwrite it
ASSERT_THAT(AdbcStatementNew(&connection, &statement, &error), IsOkStatus(&error));
ASSERT_THAT(AdbcStatementSetSqlQuery(&statement, "SELECT 42", &error),
IsOkStatus(&error));
StreamReader reader1;
StreamReader reader2;
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader1.stream.value,
&reader1.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader1.GetSchema());
ASSERT_THAT(AdbcStatementExecuteQuery(&statement, &reader2.stream.value,
&reader2.rows_affected, &error),
IsOkStatus(&error));
ASSERT_NO_FATAL_FAILURE(reader2.GetSchema());
// First reader may fail, or may succeed but give no data
reader1.MaybeNext();
}
} // namespace adbc_validation