c/driver/sqlite/sqlite.c (1,335 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "adbc.h"
#include <errno.h>
#include <inttypes.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <nanoarrow/nanoarrow.h>
#include <sqlite3.h>
#include "common/utils.h"
#include "statement_reader.h"
#include "types.h"
static const char kDefaultUri[] = "file:adbc_driver_sqlite?mode=memory&cache=shared";
// The batch size for query results (and for initial type inference)
static const char kStatementOptionBatchRows[] = "adbc.sqlite.query.batch_rows";
static const uint32_t kSupportedInfoCodes[] = {
ADBC_INFO_VENDOR_NAME, ADBC_INFO_VENDOR_VERSION, ADBC_INFO_DRIVER_NAME,
ADBC_INFO_DRIVER_VERSION, ADBC_INFO_DRIVER_ARROW_VERSION,
};
// Private names (to avoid conflicts when using the driver manager)
#define CHECK_DB_INIT(NAME, ERROR) \
if (!NAME->private_data) { \
SetError(ERROR, "[SQLite] %s: database not initialized", __func__); \
return ADBC_STATUS_INVALID_STATE; \
}
#define CHECK_CONN_INIT(NAME, ERROR) \
if (!NAME->private_data) { \
SetError(ERROR, "[SQLite] %s: connection not initialized", __func__); \
return ADBC_STATUS_INVALID_STATE; \
}
#define CHECK_STMT_INIT(NAME, ERROR) \
if (!NAME->private_data) { \
SetError(ERROR, "[SQLite] %s: statement not initialized", __func__); \
return ADBC_STATUS_INVALID_STATE; \
}
AdbcStatusCode SqliteDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
if (database->private_data) {
SetError(error, "[SQLite] AdbcDatabaseNew: database already allocated");
return ADBC_STATUS_INVALID_STATE;
}
database->private_data = malloc(sizeof(struct SqliteDatabase));
memset(database->private_data, 0, sizeof(struct SqliteDatabase));
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteDatabaseSetOption(struct AdbcDatabase* database, const char* key,
const char* value, struct AdbcError* error) {
CHECK_DB_INIT(database, error);
struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
if (strcmp(key, "uri") == 0) {
if (db->uri) free(db->uri);
size_t len = strlen(value) + 1;
db->uri = malloc(len);
strncpy(db->uri, value, len);
return ADBC_STATUS_OK;
}
SetError(error, "[SQLite] Unknown database option %s=%s", key,
value ? value : "(NULL)");
return ADBC_STATUS_NOT_IMPLEMENTED;
}
int OpenDatabase(const char* maybe_uri, sqlite3** db, struct AdbcError* error) {
const char* uri = maybe_uri ? maybe_uri : kDefaultUri;
int rc = sqlite3_open_v2(uri, db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_URI,
/*zVfs=*/NULL);
if (rc != SQLITE_OK) {
if (*db) {
SetError(error, "[SQLite] Failed to open %s: %s", uri, sqlite3_errmsg(*db));
} else {
SetError(error, "[SQLite] Failed to open %s: failed to allocate memory", uri);
}
(void)sqlite3_close(*db);
*db = NULL;
return ADBC_STATUS_IO;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode ExecuteQuery(struct SqliteConnection* conn, const char* query,
struct AdbcError* error) {
sqlite3_stmt* stmt = NULL;
int rc = sqlite3_prepare_v2(conn->conn, query, strlen(query), &stmt, /*pzTail=*/NULL);
while (rc != SQLITE_DONE && rc != SQLITE_ERROR) {
rc = sqlite3_step(stmt);
}
rc = sqlite3_finalize(stmt);
if (rc != SQLITE_OK && rc != SQLITE_DONE) {
SetError(error, "[SQLite] Failed to execute query \"%s\": %s", query,
sqlite3_errmsg(conn->conn));
return ADBC_STATUS_INTERNAL;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteDatabaseInit(struct AdbcDatabase* database,
struct AdbcError* error) {
CHECK_DB_INIT(database, error);
struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
if (db->db) {
SetError(error, "[SQLite] AdbcDatabaseInit: database already initialized");
return ADBC_STATUS_INVALID_STATE;
}
return OpenDatabase(db->uri, &db->db, error);
}
AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
struct AdbcError* error) {
CHECK_DB_INIT(database, error);
struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
size_t connection_count = db->connection_count;
if (db->uri) free(db->uri);
if (db->db) {
if (sqlite3_close(db->db) == SQLITE_BUSY) {
SetError(error, "[SQLite] AdbcDatabaseRelease: connection is busy");
return ADBC_STATUS_IO;
}
}
free(database->private_data);
database->private_data = NULL;
if (connection_count > 0) {
// -Wpedantic gives a warning if we use size_t in a printf() context
SetError(error, "[SQLite] AdbcDatabaseRelease: %ld open connections when released",
(long)connection_count); // NOLINT(runtime/int)
return ADBC_STATUS_INVALID_STATE;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionNew(struct AdbcConnection* connection,
struct AdbcError* error) {
if (connection->private_data) {
SetError(error, "[SQLite] AdbcConnectionNew: connection already allocated");
return ADBC_STATUS_INVALID_STATE;
}
connection->private_data = malloc(sizeof(struct SqliteConnection));
memset(connection->private_data, 0, sizeof(struct SqliteConnection));
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
const char* key, const char* value,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
if (strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
if (strcmp(value, ADBC_OPTION_VALUE_ENABLED) == 0) {
if (conn->active_transaction) {
AdbcStatusCode status = ExecuteQuery(conn, "COMMIT", error);
if (status != ADBC_STATUS_OK) return status;
conn->active_transaction = 0;
} else {
// no-op
}
} else if (strcmp(value, ADBC_OPTION_VALUE_DISABLED) == 0) {
if (conn->active_transaction) {
// no-op
} else {
// begin
AdbcStatusCode status = ExecuteQuery(conn, "BEGIN", error);
if (status != ADBC_STATUS_OK) return status;
conn->active_transaction = 1;
}
} else {
SetError(error, "[SQLite] Invalid connection option value %s=%s", key, value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
return ADBC_STATUS_OK;
}
SetError(error, "[SQLite] Unknown connection option %s=%s", key,
value ? value : "(NULL)");
return ADBC_STATUS_NOT_IMPLEMENTED;
}
AdbcStatusCode SqliteConnectionInit(struct AdbcConnection* connection,
struct AdbcDatabase* database,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
CHECK_DB_INIT(database, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
struct SqliteDatabase* db = (struct SqliteDatabase*)database->private_data;
if (conn->conn) {
SetError(error, "[SQLite] AdbcConnectionInit: connection already initialized");
return ADBC_STATUS_INVALID_STATE;
}
return OpenDatabase(db->uri, &conn->conn, error);
}
AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
if (conn->conn) {
int rc = sqlite3_close(conn->conn);
if (rc == SQLITE_BUSY) {
SetError(error, "[SQLite] AdbcConnectionRelease: connection is busy");
return ADBC_STATUS_IO;
}
}
free(connection->private_data);
connection->private_data = NULL;
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionGetInfoImpl(const uint32_t* info_codes,
size_t info_codes_length,
struct ArrowSchema* schema,
struct ArrowArray* array,
struct AdbcError* error) {
RAISE_ADBC(AdbcInitConnectionGetInfoSchema(info_codes, info_codes_length, schema, array,
error));
for (size_t i = 0; i < info_codes_length; i++) {
switch (info_codes[i]) {
case ADBC_INFO_VENDOR_NAME:
RAISE_ADBC(
AdbcConnectionGetInfoAppendString(array, info_codes[i], "SQLite", error));
break;
case ADBC_INFO_VENDOR_VERSION:
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
sqlite3_libversion(), error));
break;
case ADBC_INFO_DRIVER_NAME:
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
"ADBC SQLite Driver", error));
break;
case ADBC_INFO_DRIVER_VERSION:
// TODO(lidavidm): fill in driver version
RAISE_ADBC(
AdbcConnectionGetInfoAppendString(array, info_codes[i], "(unknown)", error));
break;
case ADBC_INFO_DRIVER_ARROW_VERSION:
RAISE_ADBC(AdbcConnectionGetInfoAppendString(array, info_codes[i],
NANOARROW_VERSION, error));
break;
default:
// Ignore
continue;
}
CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
}
struct ArrowError na_error = {0};
CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuildingDefault(array, &na_error), &na_error,
error);
return ADBC_STATUS_OK;
} // NOLINT(whitespace/indent)
AdbcStatusCode SqliteConnectionGetInfo(struct AdbcConnection* connection,
uint32_t* info_codes, size_t info_codes_length,
struct ArrowArrayStream* out,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
// XXX: mistake in adbc.h (should have been const pointer)
const uint32_t* codes = info_codes;
if (!info_codes) {
codes = kSupportedInfoCodes;
info_codes_length = sizeof(kSupportedInfoCodes) / sizeof(kSupportedInfoCodes[0]);
}
struct ArrowSchema schema = {0};
struct ArrowArray array = {0};
AdbcStatusCode status =
SqliteConnectionGetInfoImpl(codes, info_codes_length, &schema, &array, error);
if (status != ADBC_STATUS_OK) {
if (schema.release) schema.release(&schema);
if (array.release) array.release(&array);
return status;
}
return BatchToArrayStream(&array, &schema, out, error);
}
static const char kTableQuery[] =
"SELECT name, type "
"FROM sqlite_master "
"WHERE name LIKE ? AND type <> 'index'"
"ORDER BY name ASC";
static const char kColumnQuery[] =
"SELECT cid, name, type, \"notnull\", dflt_value "
"FROM pragma_table_info(?) "
"WHERE name LIKE ? "
"ORDER BY cid ASC";
static const char kPrimaryKeyQuery[] =
"SELECT name "
"FROM pragma_table_info(?) "
"WHERE pk > 0 "
"ORDER BY pk ASC";
static const char kForeignKeyQuery[] =
"SELECT id, seq, \"table\", \"from\", \"to\" "
"FROM pragma_foreign_key_list(?) "
"ORDER BY id, seq ASC";
AdbcStatusCode SqliteConnectionGetColumnsImpl(
struct SqliteConnection* conn, const char* table_name, const char* column_name,
struct ArrowArray* table_columns_col, sqlite3_stmt* stmt, struct AdbcError* error) {
struct ArrowArray* table_columns_items = table_columns_col->children[0];
struct ArrowArray* column_name_col = table_columns_items->children[0];
struct ArrowArray* ordinal_position_col = table_columns_items->children[1];
struct ArrowArray* remarks_col = table_columns_items->children[2];
struct ArrowArray* xdbc_data_type_col = table_columns_items->children[3];
struct ArrowArray* xdbc_type_name_col = table_columns_items->children[4];
struct ArrowArray* xdbc_column_size_col = table_columns_items->children[5];
struct ArrowArray* xdbc_decimal_digits_col = table_columns_items->children[6];
struct ArrowArray* xdbc_num_prec_radix_col = table_columns_items->children[7];
struct ArrowArray* xdbc_nullable_col = table_columns_items->children[8];
struct ArrowArray* xdbc_column_def_col = table_columns_items->children[9];
struct ArrowArray* xdbc_sql_data_type_col = table_columns_items->children[10];
struct ArrowArray* xdbc_datetime_sub_col = table_columns_items->children[11];
struct ArrowArray* xdbc_char_octet_length_col = table_columns_items->children[12];
struct ArrowArray* xdbc_is_nullable_col = table_columns_items->children[13];
struct ArrowArray* xdbc_scope_catalog_col = table_columns_items->children[14];
struct ArrowArray* xdbc_scope_schema_col = table_columns_items->children[15];
struct ArrowArray* xdbc_scope_table_col = table_columns_items->children[16];
struct ArrowArray* xdbc_is_autoincrement_col = table_columns_items->children[17];
struct ArrowArray* xdbc_is_generatedcolumn_col = table_columns_items->children[18];
int rc = sqlite3_reset(stmt);
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
rc = sqlite3_bind_text64(stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
SQLITE_UTF8);
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
if (column_name) {
rc = sqlite3_bind_text64(stmt, 2, column_name, strlen(column_name), SQLITE_STATIC,
SQLITE_UTF8);
} else {
rc = sqlite3_bind_text64(stmt, 2, "%", 1, SQLITE_STATIC, SQLITE_UTF8);
}
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
const char* col_name = (const char*)sqlite3_column_text(stmt, 1);
struct ArrowStringView str = {.data = col_name,
.size_bytes = sqlite3_column_bytes(stmt, 1)};
CHECK_NA(INTERNAL, ArrowArrayAppendString(column_name_col, str), error);
const int32_t col_cid = sqlite3_column_int(stmt, 0);
CHECK_NA(INTERNAL, ArrowArrayAppendInt(ordinal_position_col, col_cid + 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(remarks_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_data_type_col, 1), error);
const char* col_type = (const char*)sqlite3_column_text(stmt, 2);
if (col_type) {
str.data = col_type;
str.size_bytes = sqlite3_column_bytes(stmt, 2);
CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_type_name_col, str), error);
} else {
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_type_name_col, 1), error);
}
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_column_size_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_decimal_digits_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_num_prec_radix_col, 1), error);
const int32_t col_notnull = sqlite3_column_int(stmt, 3);
if (col_notnull == 0) {
// JDBC columnNullable == 1
CHECK_NA(INTERNAL, ArrowArrayAppendInt(xdbc_nullable_col, 1), error);
} else {
// JDBC columnNoNulls == 0
CHECK_NA(INTERNAL, ArrowArrayAppendInt(xdbc_nullable_col, 0), error);
}
const char* col_def = (const char*)sqlite3_column_text(stmt, 4);
if (col_def) {
str.data = col_def;
str.size_bytes = sqlite3_column_bytes(stmt, 4);
CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_column_def_col, str), error);
} else {
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_column_def_col, 1), error);
}
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_sql_data_type_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_datetime_sub_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_char_octet_length_col, 1), error);
if (col_notnull == 0) {
str.data = "YES";
str.size_bytes = 3;
} else {
str.data = "NO";
str.size_bytes = 2;
}
CHECK_NA(INTERNAL, ArrowArrayAppendString(xdbc_is_nullable_col, str), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_catalog_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_schema_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_scope_table_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_is_autoincrement_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(xdbc_is_generatedcolumn_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_columns_items), error);
}
RAISE(INTERNAL, rc == SQLITE_DONE, sqlite3_errmsg(conn->conn), error);
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionGetConstraintsImpl(
struct SqliteConnection* conn, const char* table_name, const char* column_name,
struct ArrowArray* table_constraints_col, sqlite3_stmt* pk_stmt,
sqlite3_stmt* fk_stmt, struct AdbcError* error) {
struct ArrowArray* table_constraints_items = table_constraints_col->children[0];
struct ArrowArray* constraint_name_col = table_constraints_items->children[0];
struct ArrowArray* constraint_type_col = table_constraints_items->children[1];
struct ArrowArray* constraint_column_names_col = table_constraints_items->children[2];
struct ArrowArray* constraint_column_names_items =
constraint_column_names_col->children[0];
struct ArrowArray* constraint_column_usage_col = table_constraints_items->children[3];
struct ArrowArray* constraint_column_usage_items =
constraint_column_usage_col->children[0];
struct ArrowArray* fk_catalog_col = constraint_column_usage_items->children[0];
struct ArrowArray* fk_db_schema_col = constraint_column_usage_items->children[1];
struct ArrowArray* fk_table_col = constraint_column_usage_items->children[2];
struct ArrowArray* fk_column_name_col = constraint_column_usage_items->children[3];
// We can get primary keys and foreign keys, but not unique
// constraints (unless we parse the SQL table definition)
int rc = sqlite3_reset(pk_stmt);
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
rc = sqlite3_bind_text64(pk_stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
SQLITE_UTF8);
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
char has_primary_key = 0;
while ((rc = sqlite3_step(pk_stmt)) == SQLITE_ROW) {
if (!has_primary_key) {
has_primary_key = 1;
CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_name_col, 1), error);
CHECK_NA(INTERNAL,
ArrowArrayAppendString(constraint_type_col, ArrowCharView("PRIMARY KEY")),
error);
}
CHECK_NA(
INTERNAL,
ArrowArrayAppendString(
constraint_column_names_items,
(struct ArrowStringView){.data = (const char*)sqlite3_column_text(pk_stmt, 0),
.size_bytes = sqlite3_column_bytes(pk_stmt, 0)}),
error);
}
RAISE(INTERNAL, rc == SQLITE_DONE, sqlite3_errmsg(conn->conn), error);
if (has_primary_key) {
CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_column_usage_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
}
rc = sqlite3_reset(fk_stmt);
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
rc = sqlite3_bind_text64(fk_stmt, 1, table_name, strlen(table_name), SQLITE_STATIC,
SQLITE_UTF8);
RAISE(INTERNAL, rc == SQLITE_OK, sqlite3_errmsg(conn->conn), error);
int prev_fk_id = -1;
while ((rc = sqlite3_step(fk_stmt)) == SQLITE_ROW) {
const int fk_id = sqlite3_column_int(fk_stmt, 0);
// Foreign key seq is sqlite3_column_int(fk_stmt, 1);
const char* to_table = (const char*)sqlite3_column_text(fk_stmt, 2);
const char* from_col = (const char*)sqlite3_column_text(fk_stmt, 3);
const char* to_col = (const char*)sqlite3_column_text(fk_stmt, 4);
if (fk_id != prev_fk_id) {
CHECK_NA(INTERNAL, ArrowArrayAppendNull(constraint_name_col, 1), error);
CHECK_NA(INTERNAL,
ArrowArrayAppendString(constraint_name_col, ArrowCharView("FOREIGN KEY")),
error);
if (prev_fk_id != -1) {
CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_col), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
}
prev_fk_id = fk_id;
CHECK_NA(INTERNAL,
ArrowArrayAppendString(
constraint_column_names_items,
(struct ArrowStringView){
.data = from_col, .size_bytes = sqlite3_column_bytes(pk_stmt, 3)}),
error);
CHECK_NA(INTERNAL, ArrowArrayAppendString(fk_catalog_col, ArrowCharView("main")),
error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(fk_db_schema_col, 1), error);
CHECK_NA(INTERNAL,
ArrowArrayAppendString(
fk_table_col,
(struct ArrowStringView){
.data = to_table, .size_bytes = sqlite3_column_bytes(pk_stmt, 2)}),
error);
CHECK_NA(INTERNAL,
ArrowArrayAppendString(
fk_column_name_col,
(struct ArrowStringView){
.data = to_col, .size_bytes = sqlite3_column_bytes(pk_stmt, 4)}),
error);
}
}
RAISE(INTERNAL, rc == SQLITE_DONE, sqlite3_errmsg(conn->conn), error);
if (prev_fk_id != -1) {
CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_names_col), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(constraint_column_usage_col), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_items), error);
}
return ADBC_STATUS_OK;
} // NOLINT(whitespace/indent)
AdbcStatusCode SqliteConnectionGetTablesInner(
struct SqliteConnection* conn, sqlite3_stmt* tables_stmt, sqlite3_stmt* columns_stmt,
sqlite3_stmt* pk_stmt, sqlite3_stmt* fk_stmt, const char** table_type,
const char* column_name, struct ArrowArray* db_schema_tables_col,
struct AdbcError* error) {
struct ArrowArray* db_schema_tables_items = db_schema_tables_col->children[0];
struct ArrowArray* table_name_col = db_schema_tables_items->children[0];
struct ArrowArray* table_type_col = db_schema_tables_items->children[1];
struct ArrowArray* table_columns_col = db_schema_tables_items->children[2];
struct ArrowArray* table_constraints_col = db_schema_tables_items->children[3];
int rc = SQLITE_OK;
while ((rc = sqlite3_step(tables_stmt)) == SQLITE_ROW) {
const char* cur_table_type = (const char*)sqlite3_column_text(tables_stmt, 1);
if (table_type) {
const char** current = table_type;
char found = 0;
while (*current) {
if (strcmp(*current, cur_table_type) == 0) {
found = 1;
break;
}
current++;
}
if (!found) continue;
}
struct ArrowStringView str = {.data = cur_table_type,
.size_bytes = sqlite3_column_bytes(tables_stmt, 1)};
CHECK_NA(INTERNAL, ArrowArrayAppendString(table_type_col, str), error);
const char* cur_table = (const char*)sqlite3_column_text(tables_stmt, 0);
str.data = cur_table;
str.size_bytes = sqlite3_column_bytes(tables_stmt, 0);
CHECK_NA(INTERNAL, ArrowArrayAppendString(table_name_col, str), error);
if (columns_stmt == NULL) {
CHECK_NA(INTERNAL, ArrowArrayAppendNull(table_columns_col, 1), error);
CHECK_NA(INTERNAL, ArrowArrayAppendNull(table_constraints_col, 1), error);
} else {
// XXX: n + 1 query pattern. You can join on a pragma so we
// could avoid this in principle but it complicates the
// unpacking code here quite a bit, so ignore for now.
RAISE_ADBC(SqliteConnectionGetColumnsImpl(conn, cur_table, column_name,
table_columns_col, columns_stmt, error));
// Not strictly necessary, but we passed SQLITE_STATIC when
// binding so don't let the reference leak
(void)sqlite3_clear_bindings(columns_stmt);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_columns_col), error);
RAISE_ADBC(SqliteConnectionGetConstraintsImpl(
conn, cur_table, column_name, table_constraints_col, pk_stmt, fk_stmt, error));
(void)sqlite3_clear_bindings(pk_stmt);
(void)sqlite3_clear_bindings(fk_stmt);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(table_constraints_col), error);
}
CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_tables_items), error);
}
if (rc != SQLITE_DONE) {
SetError(error, "[SQLite] Failed to query for tables: %s",
sqlite3_errmsg(conn->conn));
return ADBC_STATUS_INTERNAL;
}
CHECK_NA(INTERNAL, ArrowArrayFinishElement(db_schema_tables_col), error);
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionGetTablesImpl(struct SqliteConnection* conn, int depth,
const char* table_name,
const char** table_type,
const char* column_name,
struct ArrowArray* db_schema_tables_col,
struct AdbcError* error) {
sqlite3_stmt* tables_stmt = NULL;
sqlite3_stmt* columns_stmt = NULL;
sqlite3_stmt* pk_stmt = NULL;
sqlite3_stmt* fk_stmt = NULL;
int rc = SQLITE_OK;
if (rc == SQLITE_OK) {
rc = sqlite3_prepare_v2(conn->conn, kTableQuery, sizeof(kTableQuery), &tables_stmt,
/*pzTail=*/NULL);
}
if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
rc = sqlite3_prepare_v2(conn->conn, kColumnQuery, sizeof(kColumnQuery), &columns_stmt,
/*pzTail=*/NULL);
}
if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
rc = sqlite3_prepare_v2(conn->conn, kPrimaryKeyQuery, sizeof(kPrimaryKeyQuery),
&pk_stmt, /*pzTail=*/NULL);
}
if (rc == SQLITE_OK && depth == ADBC_OBJECT_DEPTH_COLUMNS) {
rc = sqlite3_prepare_v2(conn->conn, kForeignKeyQuery, sizeof(kForeignKeyQuery),
&fk_stmt, /*pzTail=*/NULL);
}
if (rc == SQLITE_OK) {
if (table_name) {
rc = sqlite3_bind_text64(tables_stmt, 1, table_name, strlen(table_name),
SQLITE_STATIC, SQLITE_UTF8);
} else {
rc = sqlite3_bind_text64(tables_stmt, 1, "%", 1, SQLITE_STATIC, SQLITE_UTF8);
}
}
AdbcStatusCode status = ADBC_STATUS_OK;
if (rc == SQLITE_OK) {
status = SqliteConnectionGetTablesInner(conn, tables_stmt, columns_stmt, pk_stmt,
fk_stmt, table_type, column_name,
db_schema_tables_col, error);
} else {
SetError(error, "[SQLite] Failed to query for tables: %s",
sqlite3_errmsg(conn->conn));
status = ADBC_STATUS_INTERNAL;
}
sqlite3_finalize(tables_stmt);
sqlite3_finalize(columns_stmt);
sqlite3_finalize(pk_stmt);
sqlite3_finalize(fk_stmt);
return status;
}
AdbcStatusCode SqliteConnectionGetObjectsImpl(
struct SqliteConnection* conn, int depth, const char* catalog, const char* db_schema,
const char* table_name, const char** table_type, const char* column_name,
struct ArrowSchema* schema, struct ArrowArray* array, struct AdbcError* error) {
RAISE_ADBC(AdbcInitConnectionObjectsSchema(schema, error));
struct ArrowError na_error = {0};
CHECK_NA_DETAIL(INTERNAL, ArrowArrayInitFromSchema(array, schema, &na_error), &na_error,
error);
CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
struct ArrowArray* catalog_name_col = array->children[0];
struct ArrowArray* catalog_db_schemas_col = array->children[1];
struct ArrowArray* catalog_db_schemas_items = catalog_db_schemas_col->children[0];
struct ArrowArray* db_schema_name_col = catalog_db_schemas_items->children[0];
struct ArrowArray* db_schema_tables_col = catalog_db_schemas_items->children[1];
// TODO: support proper filters
if (!catalog || strcmp(catalog, "main") == 0) {
// Default the primary catalog to "main"
// https://www.sqlite.org/cli.html
// > The ".databases" command shows a list of all databases open
// > in the current connection. There will always be at least
// > 2. The first one is "main", the original database opened.
CHECK_NA(INTERNAL, ArrowArrayAppendString(catalog_name_col, ArrowCharView("main")),
error);
if (depth == ADBC_OBJECT_DEPTH_CATALOGS) {
CHECK_NA(INTERNAL, ArrowArrayAppendNull(catalog_db_schemas_col, 1), error);
} else if (!db_schema || db_schema == NULL) {
// For our purposes, we'll consider SQLite to always have a
// single, unnamed schema within each catalog.
CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_name_col, 1), error);
if (depth == ADBC_OBJECT_DEPTH_DB_SCHEMAS) {
CHECK_NA(INTERNAL, ArrowArrayAppendNull(db_schema_tables_col, 1), error);
} else {
RAISE_ADBC(SqliteConnectionGetTablesImpl(conn, depth, table_name, table_type,
column_name, db_schema_tables_col,
error));
}
CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_items), error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col), error);
} else {
CHECK_NA(INTERNAL, ArrowArrayFinishElement(catalog_db_schemas_col), error);
}
CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
// TODO: implement "temp", other attached databases as catalogs
}
CHECK_NA_DETAIL(INTERNAL, ArrowArrayFinishBuildingDefault(array, &na_error), &na_error,
error);
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionGetObjects(struct AdbcConnection* connection, int depth,
const char* catalog, const char* db_schema,
const char* table_name, const char** table_type,
const char* column_name,
struct ArrowArrayStream* out,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
struct ArrowSchema schema = {0};
struct ArrowArray array = {0};
AdbcStatusCode status =
SqliteConnectionGetObjectsImpl(conn, depth, catalog, db_schema, table_name,
table_type, column_name, &schema, &array, error);
if (status != ADBC_STATUS_OK) {
if (schema.release) schema.release(&schema);
if (array.release) array.release(&array);
return status;
}
return BatchToArrayStream(&array, &schema, out, error);
}
AdbcStatusCode SqliteConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
struct ArrowSchema* schema,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
if (catalog != NULL && strlen(catalog) > 0) {
// TODO: map 'catalog' to SQLite attached database
memset(schema, 0, sizeof(*schema));
return ADBC_STATUS_OK;
} else if (db_schema != NULL && strlen(db_schema) > 0) {
// SQLite does not support schemas
memset(schema, 0, sizeof(*schema));
return ADBC_STATUS_OK;
} else if (table_name == NULL) {
SetError(error, "[SQLite] AdbcConnectionGetTableSchema: must provide table_name");
return ADBC_STATUS_INVALID_ARGUMENT;
}
struct StringBuilder query = {0};
if (StringBuilderInit(&query, /*initial_size=*/64) != 0) {
SetError(error, "[SQLite] Could not initiate StringBuilder");
return ADBC_STATUS_INTERNAL;
}
if (StringBuilderAppend(&query, "%s%s", "SELECT * FROM ", table_name) != 0) {
StringBuilderReset(&query);
SetError(error, "[SQLite] Call to StringBuilderAppend failed");
return ADBC_STATUS_INTERNAL;
}
sqlite3_stmt* stmt = NULL;
int rc =
sqlite3_prepare_v2(conn->conn, query.buffer, query.size, &stmt, /*pzTail=*/NULL);
StringBuilderReset(&query);
if (rc != SQLITE_OK) {
SetError(error, "[SQLite] Failed to prepare query: %s", sqlite3_errmsg(conn->conn));
return ADBC_STATUS_INTERNAL;
}
struct ArrowArrayStream stream = {0};
AdbcStatusCode status = AdbcSqliteExportReader(conn->conn, stmt, /*binder=*/NULL,
/*batch_size=*/64, &stream, error);
if (status == ADBC_STATUS_OK) {
int code = stream.get_schema(&stream, schema);
if (code != 0) {
SetError(error, "[SQLite] Failed to get schema: (%d) %s", code, strerror(code));
status = ADBC_STATUS_IO;
}
}
if (stream.release) {
stream.release(&stream);
}
(void)sqlite3_finalize(stmt);
return status;
}
AdbcStatusCode SqliteConnectionGetTableTypesImpl(struct ArrowSchema* schema,
struct ArrowArray* array,
struct AdbcError* error) {
ArrowSchemaInit(schema);
CHECK_NA(INTERNAL, ArrowSchemaSetType(schema, NANOARROW_TYPE_STRUCT), error);
CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, /*num_columns=*/1), error);
ArrowSchemaInit(schema->children[0]);
CHECK_NA(INTERNAL, ArrowSchemaSetType(schema->children[0], NANOARROW_TYPE_STRING),
error);
CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[0], "table_type"), error);
schema->children[0]->flags &= ~ARROW_FLAG_NULLABLE;
CHECK_NA(INTERNAL, ArrowArrayInitFromSchema(array, schema, NULL), error);
CHECK_NA(INTERNAL, ArrowArrayStartAppending(array), error);
CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[0], ArrowCharView("table")),
error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
CHECK_NA(INTERNAL, ArrowArrayAppendString(array->children[0], ArrowCharView("view")),
error);
CHECK_NA(INTERNAL, ArrowArrayFinishElement(array), error);
CHECK_NA(INTERNAL, ArrowArrayFinishBuildingDefault(array, NULL), error);
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteConnectionGetTableTypes(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct ArrowSchema schema = {0};
struct ArrowArray array = {0};
AdbcStatusCode status = SqliteConnectionGetTableTypesImpl(&schema, &array, error);
if (status != ADBC_STATUS_OK) {
if (schema.release) schema.release(&schema);
if (array.release) array.release(&array);
return status;
}
return BatchToArrayStream(&array, &schema, out, error);
}
AdbcStatusCode SqliteConnectionReadPartition(struct AdbcConnection* connection,
const uint8_t* serialized_partition,
size_t serialized_length,
struct ArrowArrayStream* out,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
if (!conn->active_transaction) {
SetError(error, "[SQLite] No active transaction, cannot commit");
return ADBC_STATUS_INVALID_STATE;
}
AdbcStatusCode status = ExecuteQuery(conn, "COMMIT", error);
if (status != ADBC_STATUS_OK) return status;
return ExecuteQuery(conn, "BEGIN", error);
}
AdbcStatusCode SqliteConnectionRollback(struct AdbcConnection* connection,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
if (!conn->active_transaction) {
SetError(error, "[SQLite] No active transaction, cannot rollback");
return ADBC_STATUS_INVALID_STATE;
}
AdbcStatusCode status = ExecuteQuery(conn, "ROLLBACK", error);
if (status != ADBC_STATUS_OK) return status;
return ExecuteQuery(conn, "BEGIN", error);
}
AdbcStatusCode SqliteStatementNew(struct AdbcConnection* connection,
struct AdbcStatement* statement,
struct AdbcError* error) {
CHECK_CONN_INIT(connection, error);
struct SqliteConnection* conn = (struct SqliteConnection*)connection->private_data;
if (statement->private_data) {
SetError(error, "[SQLite] AdbcStatementNew: statement already allocated");
return ADBC_STATUS_INVALID_STATE;
} else if (!conn->conn) {
SetError(error, "[SQLite] AdbcStatementNew: connection is not initialized");
return ADBC_STATUS_INVALID_STATE;
}
statement->private_data = malloc(sizeof(struct SqliteStatement));
memset(statement->private_data, 0, sizeof(struct SqliteStatement));
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
stmt->conn = conn->conn;
// Default options
stmt->batch_size = 1024;
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteStatementRelease(struct AdbcStatement* statement,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
int rc = SQLITE_OK;
if (stmt->stmt) {
rc = sqlite3_finalize(stmt->stmt);
}
if (stmt->query) free(stmt->query);
AdbcSqliteBinderRelease(&stmt->binder);
if (stmt->target_table) free(stmt->target_table);
if (rc != SQLITE_OK) {
SetError(error,
"[SQLite] AdbcStatementRelease: statement failed to finalize: (%d) %s", rc,
sqlite3_errmsg(stmt->conn));
}
free(statement->private_data);
statement->private_data = NULL;
return rc == SQLITE_OK ? ADBC_STATUS_OK : ADBC_STATUS_IO;
}
AdbcStatusCode SqliteStatementPrepare(struct AdbcStatement* statement,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
if (!stmt->query) {
SetError(error, "[SQLite] Must SetSqlQuery before ExecuteQuery or Prepare");
return ADBC_STATUS_INVALID_STATE;
}
if (stmt->prepared == 0) {
if (stmt->stmt) {
int rc = sqlite3_finalize(stmt->stmt);
stmt->stmt = NULL;
if (rc != SQLITE_OK) {
SetError(error, "[SQLite] Failed to finalize previous statement: (%d) %s", rc,
sqlite3_errmsg(stmt->conn));
return ADBC_STATUS_IO;
}
}
int rc =
sqlite3_prepare_v2(stmt->conn, stmt->query, (int)stmt->query_len, &stmt->stmt,
/*pzTail=*/NULL);
if (rc != SQLITE_OK) {
SetError(error, "[SQLite] Failed to prepare query: %s\nQuery:%s",
sqlite3_errmsg(stmt->conn), stmt->query);
(void)sqlite3_finalize(stmt->stmt);
stmt->stmt = NULL;
return ADBC_STATUS_INVALID_ARGUMENT;
}
stmt->prepared = 1;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
sqlite3_stmt** insert_statement,
struct AdbcError* error) {
AdbcStatusCode code = ADBC_STATUS_OK;
// Create statements for CREATE TABLE / INSERT
sqlite3_str* create_query = sqlite3_str_new(NULL);
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
return ADBC_STATUS_INTERNAL;
}
struct StringBuilder insert_query = {0};
if (StringBuilderInit(&insert_query, /*initial_size=*/256) != 0) {
SetError(error, "[SQLite] Could not initiate StringBuilder");
sqlite3_free(sqlite3_str_finish(create_query));
return ADBC_STATUS_INTERNAL;
}
sqlite3_str_appendf(create_query, "%s%Q%s", "CREATE TABLE ", stmt->target_table, " (");
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
if (StringBuilderAppend(&insert_query, "%s%s%s", "INSERT INTO ", stmt->target_table,
" VALUES (") != 0) {
SetError(error, "[SQLite] Call to StringBuilderAppend failed");
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
struct ArrowError arrow_error = {0};
struct ArrowSchemaView view = {0};
for (int i = 0; i < stmt->binder.schema.n_children; i++) {
if (i > 0) {
sqlite3_str_appendf(create_query, "%s", ", ");
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
}
sqlite3_str_appendf(create_query, "%Q", stmt->binder.schema.children[i]->name);
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
int status =
ArrowSchemaViewInit(&view, stmt->binder.schema.children[i], &arrow_error);
if (status != 0) {
SetError(error, "Failed to parse schema for column %d: %s (%d): %s", i,
strerror(status), status, arrow_error.message);
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
switch (view.type) {
case NANOARROW_TYPE_UINT8:
case NANOARROW_TYPE_UINT16:
case NANOARROW_TYPE_UINT32:
case NANOARROW_TYPE_UINT64:
case NANOARROW_TYPE_INT8:
case NANOARROW_TYPE_INT16:
case NANOARROW_TYPE_INT32:
case NANOARROW_TYPE_INT64:
sqlite3_str_appendf(create_query, " INTEGER");
break;
case NANOARROW_TYPE_FLOAT:
case NANOARROW_TYPE_DOUBLE:
sqlite3_str_appendf(create_query, " REAL");
break;
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_LARGE_STRING:
case NANOARROW_TYPE_DATE32:
sqlite3_str_appendf(create_query, " TEXT");
break;
case NANOARROW_TYPE_BINARY:
sqlite3_str_appendf(create_query, " BLOB");
break;
default:
break;
}
if (i > 0) {
if (StringBuilderAppend(&insert_query, "%s", ", ") != 0) {
SetError(error, "[SQLite] Call to StringBuilderAppend failed");
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
}
if (StringBuilderAppend(&insert_query, "%s", "?") != 0) {
SetError(error, "[SQLite] Call to StringBuilderAppend failed");
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
}
sqlite3_str_appendchar(create_query, 1, ')');
if (sqlite3_str_errcode(create_query)) {
SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
if (StringBuilderAppend(&insert_query, "%s", ")") != 0) {
SetError(error, "[SQLite] Call to StringBuilderAppend failed");
code = ADBC_STATUS_INTERNAL;
goto cleanup;
}
sqlite3_stmt* create = NULL;
if (!stmt->append) {
// Create table
int rc =
sqlite3_prepare_v2(stmt->conn, sqlite3_str_value(create_query),
sqlite3_str_length(create_query), &create, /*pzTail=*/NULL);
if (rc == SQLITE_OK) {
rc = sqlite3_step(create);
}
if (rc != SQLITE_OK && rc != SQLITE_DONE) {
SetError(error, "[SQLite] Failed to create table: %s (executed '%.*s')",
sqlite3_errmsg(stmt->conn), sqlite3_str_length(create_query),
sqlite3_str_value(create_query));
code = ADBC_STATUS_INTERNAL;
}
}
if (code == ADBC_STATUS_OK) {
int rc = sqlite3_prepare_v2(stmt->conn, insert_query.buffer, (int)insert_query.size,
insert_statement, /*pzTail=*/NULL);
if (rc != SQLITE_OK) {
SetError(error, "[SQLite] Failed to prepare statement: %s (executed '%s')",
sqlite3_errmsg(stmt->conn), insert_query.buffer);
code = ADBC_STATUS_INTERNAL;
}
}
sqlite3_finalize(create);
cleanup:
sqlite3_free(sqlite3_str_finish(create_query));
StringBuilderReset(&insert_query);
return code;
}
AdbcStatusCode SqliteStatementExecuteIngest(struct SqliteStatement* stmt,
int64_t* rows_affected,
struct AdbcError* error) {
if (!stmt->binder.schema.release) {
SetError(error, "[SQLite] Must Bind() before bulk ingestion");
return ADBC_STATUS_INVALID_STATE;
}
sqlite3_stmt* insert = NULL;
AdbcStatusCode status = SqliteStatementInitIngest(stmt, &insert, error);
int64_t row_count = 0;
int is_autocommit = sqlite3_get_autocommit(stmt->conn);
if (status == ADBC_STATUS_OK) {
if (is_autocommit) sqlite3_exec(stmt->conn, "BEGIN TRANSACTION", 0, 0, 0);
while (1) {
char finished = 0;
status =
AdbcSqliteBinderBindNext(&stmt->binder, stmt->conn, insert, &finished, error);
if (status != ADBC_STATUS_OK || finished) break;
int rc = 0;
do {
rc = sqlite3_step(insert);
} while (rc == SQLITE_ROW);
if (rc != SQLITE_DONE) {
SetError(error, "[SQLite] Failed to execute statement: %s",
sqlite3_errmsg(stmt->conn));
status = ADBC_STATUS_INTERNAL;
break;
}
row_count++;
}
if (is_autocommit) sqlite3_exec(stmt->conn, "COMMIT", 0, 0, 0);
}
if (rows_affected) *rows_affected = row_count;
if (insert) sqlite3_finalize(insert);
AdbcSqliteBinderRelease(&stmt->binder);
return status;
}
AdbcStatusCode SqliteStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
int64_t* rows_affected,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
if (stmt->target_table) {
return SqliteStatementExecuteIngest(stmt, rows_affected, error);
}
AdbcStatusCode status = SqliteStatementPrepare(statement, error);
if (status != ADBC_STATUS_OK) return status;
if (stmt->binder.schema.release) {
int64_t expected = sqlite3_bind_parameter_count(stmt->stmt);
int64_t actual = stmt->binder.schema.n_children;
if (actual != expected) {
SetError(error,
"[SQLite] Parameter count mismatch: expected %" PRId64
" but found %" PRId64,
expected, actual);
return ADBC_STATUS_INVALID_STATE;
}
}
if (!out) {
// Update
sqlite3_mutex_enter(sqlite3_db_mutex(stmt->conn));
AdbcStatusCode status = ADBC_STATUS_OK;
int64_t rows = 0;
while (1) {
if (stmt->binder.schema.release) {
char finished = 0;
status = AdbcSqliteBinderBindNext(&stmt->binder, stmt->conn, stmt->stmt,
&finished, error);
if (status != ADBC_STATUS_OK || finished) {
break;
}
}
while (sqlite3_step(stmt->stmt) == SQLITE_ROW) {
rows++;
}
if (!stmt->binder.schema.release) break;
}
if (sqlite3_reset(stmt->stmt) != SQLITE_OK) {
status = ADBC_STATUS_IO;
const char* msg = sqlite3_errmsg(stmt->conn);
SetError(error, "[SQLite] Failed to execute query: %s",
(msg == NULL) ? "(unknown error)" : msg);
}
sqlite3_mutex_leave(sqlite3_db_mutex(stmt->conn));
AdbcSqliteBinderRelease(&stmt->binder);
if (rows_affected) *rows_affected = rows;
return status;
}
// Query
if (rows_affected) *rows_affected = -1;
struct AdbcSqliteBinder* binder = stmt->binder.schema.release ? &stmt->binder : NULL;
return AdbcSqliteExportReader(stmt->conn, stmt->stmt, binder, stmt->batch_size, out,
error);
}
AdbcStatusCode SqliteStatementSetSqlQuery(struct AdbcStatement* statement,
const char* query, struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
if (stmt->query) {
free(stmt->query);
stmt->query = NULL;
}
if (stmt->target_table) {
free(stmt->target_table);
stmt->target_table = NULL;
}
size_t len = strlen(query) + 1;
stmt->query = malloc(len);
stmt->query_len = len;
stmt->prepared = 0;
strncpy(stmt->query, query, len);
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteStatementSetSubstraitPlan(struct AdbcStatement* statement,
const uint8_t* plan, size_t length,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
SetError(error, "[SQLite] Substrait is not supported");
return ADBC_STATUS_NOT_IMPLEMENTED;
}
AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
return AdbcSqliteBinderSetArray(&stmt->binder, values, schema, error);
}
AdbcStatusCode SqliteStatementBindStream(struct AdbcStatement* statement,
struct ArrowArrayStream* stream,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
return AdbcSqliteBinderSetArrayStream(&stmt->binder, stream, error);
}
AdbcStatusCode SqliteStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
AdbcStatusCode status = SqliteStatementPrepare(statement, error);
if (status != ADBC_STATUS_OK) return status;
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
int num_params = sqlite3_bind_parameter_count(stmt->stmt);
if (num_params < 0) {
// Should not happen
SetError(error, "[SQLite] SQLite returned negative parameter count");
return ADBC_STATUS_INTERNAL;
}
ArrowSchemaInit(schema);
CHECK_NA(INTERNAL, ArrowSchemaSetType(schema, NANOARROW_TYPE_STRUCT), error);
CHECK_NA(INTERNAL, ArrowSchemaAllocateChildren(schema, num_params), error);
char buffer[11];
for (int i = 0; i < num_params; i++) {
const char* name = sqlite3_bind_parameter_name(stmt->stmt, i + 1);
if (name == NULL) {
snprintf(buffer, sizeof(buffer), "%d", i);
name = buffer;
}
ArrowSchemaInit(schema->children[i]);
CHECK_NA(INTERNAL, ArrowSchemaSetType(schema->children[i], NANOARROW_TYPE_NA), error);
CHECK_NA(INTERNAL, ArrowSchemaSetName(schema->children[i], name), error);
}
return ADBC_STATUS_OK;
}
AdbcStatusCode SqliteStatementSetOption(struct AdbcStatement* statement, const char* key,
const char* value, struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
struct SqliteStatement* stmt = (struct SqliteStatement*)statement->private_data;
if (strcmp(key, ADBC_INGEST_OPTION_TARGET_TABLE) == 0) {
if (stmt->query) {
free(stmt->query);
stmt->query = NULL;
}
if (stmt->target_table) {
free(stmt->target_table);
stmt->target_table = NULL;
}
size_t len = strlen(value) + 1;
stmt->target_table = (char*)malloc(len);
strncpy(stmt->target_table, value, len);
return ADBC_STATUS_OK;
} else if (strcmp(key, ADBC_INGEST_OPTION_MODE) == 0) {
if (strcmp(value, ADBC_INGEST_OPTION_MODE_APPEND) == 0) {
stmt->append = 1;
} else if (strcmp(value, ADBC_INGEST_OPTION_MODE_CREATE) == 0) {
stmt->append = 0;
} else {
SetError(error, "[SQLite] Invalid statement option value %s=%s", key, value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
return ADBC_STATUS_OK;
} else if (strcmp(key, kStatementOptionBatchRows) == 0) {
char* end = NULL;
long batch_size = strtol(value, &end, /*base=*/10); // NOLINT(runtime/int)
if (errno != 0) {
SetError(error, "[SQLite] Invalid statement option value %s=%s (out of range)", key,
value);
return ADBC_STATUS_INVALID_ARGUMENT;
} else if (batch_size <= 0) {
SetError(error,
"[SQLite] Invalid statement option value %s=%s (value is non-positive or "
"invalid)",
key, value);
return ADBC_STATUS_INVALID_ARGUMENT;
} else if (batch_size > (long)INT_MAX) { // NOLINT(runtime/int)
SetError(
error,
"[SQLite] Invalid statement option value %s=%s (value is out of range of int)",
key, value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
stmt->batch_size = (int)batch_size;
return ADBC_STATUS_OK;
}
SetError(error, "[SQLite] Unknown statement option %s=%s", key,
value ? value : "(NULL)");
return ADBC_STATUS_NOT_IMPLEMENTED;
}
AdbcStatusCode SqliteStatementExecutePartitions(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcPartitions* partitions,
int64_t* rows_affected,
struct AdbcError* error) {
CHECK_STMT_INIT(statement, error);
SetError(error, "[SQLite] Partitioned result sets are not supported");
return ADBC_STATUS_NOT_IMPLEMENTED;
} // NOLINT(whitespace/indent)
AdbcStatusCode SqliteDriverInit(int version, void* raw_driver, struct AdbcError* error) {
if (version != ADBC_VERSION_1_0_0) {
SetError(error, "[SQLite] Only version %d supported, got %d", ADBC_VERSION_1_0_0,
version);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
struct AdbcDriver* driver = (struct AdbcDriver*)raw_driver;
memset(driver, 0, sizeof(*driver));
driver->DatabaseInit = SqliteDatabaseInit;
driver->DatabaseNew = SqliteDatabaseNew;
driver->DatabaseRelease = SqliteDatabaseRelease;
driver->DatabaseSetOption = SqliteDatabaseSetOption;
driver->ConnectionCommit = SqliteConnectionCommit;
driver->ConnectionGetInfo = SqliteConnectionGetInfo;
driver->ConnectionGetObjects = SqliteConnectionGetObjects;
driver->ConnectionGetTableSchema = SqliteConnectionGetTableSchema;
driver->ConnectionGetTableTypes = SqliteConnectionGetTableTypes;
driver->ConnectionInit = SqliteConnectionInit;
driver->ConnectionNew = SqliteConnectionNew;
driver->ConnectionReadPartition = SqliteConnectionReadPartition;
driver->ConnectionRelease = SqliteConnectionRelease;
driver->ConnectionRollback = SqliteConnectionRollback;
driver->ConnectionSetOption = SqliteConnectionSetOption;
driver->StatementBind = SqliteStatementBind;
driver->StatementBindStream = SqliteStatementBindStream;
driver->StatementExecuteQuery = SqliteStatementExecuteQuery;
driver->StatementGetParameterSchema = SqliteStatementGetParameterSchema;
driver->StatementNew = SqliteStatementNew;
driver->StatementPrepare = SqliteStatementPrepare;
driver->StatementRelease = SqliteStatementRelease;
driver->StatementSetOption = SqliteStatementSetOption;
driver->StatementSetSqlQuery = SqliteStatementSetSqlQuery;
return ADBC_STATUS_OK;
}
// Public names
AdbcStatusCode AdbcDatabaseNew(struct AdbcDatabase* database, struct AdbcError* error) {
return SqliteDatabaseNew(database, error);
}
AdbcStatusCode AdbcDatabaseSetOption(struct AdbcDatabase* database, const char* key,
const char* value, struct AdbcError* error) {
return SqliteDatabaseSetOption(database, key, value, error);
}
AdbcStatusCode AdbcDatabaseInit(struct AdbcDatabase* database, struct AdbcError* error) {
return SqliteDatabaseInit(database, error);
}
AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
struct AdbcError* error) {
return SqliteDatabaseRelease(database, error);
}
AdbcStatusCode AdbcConnectionNew(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionNew(connection, error);
}
AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
const char* value, struct AdbcError* error) {
return SqliteConnectionSetOption(connection, key, value, error);
}
AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
struct AdbcDatabase* database,
struct AdbcError* error) {
return SqliteConnectionInit(connection, database, error);
}
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionRelease(connection, error);
}
AdbcStatusCode AdbcConnectionGetInfo(struct AdbcConnection* connection,
uint32_t* info_codes, size_t info_codes_length,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return SqliteConnectionGetInfo(connection, info_codes, info_codes_length, out, error);
}
AdbcStatusCode AdbcConnectionGetObjects(struct AdbcConnection* connection, int depth,
const char* catalog, const char* db_schema,
const char* table_name, const char** table_type,
const char* column_name,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return SqliteConnectionGetObjects(connection, depth, catalog, db_schema, table_name,
table_type, column_name, out, error);
}
AdbcStatusCode AdbcConnectionGetTableSchema(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name,
struct ArrowSchema* schema,
struct AdbcError* error) {
return SqliteConnectionGetTableSchema(connection, catalog, db_schema, table_name,
schema, error);
}
AdbcStatusCode AdbcConnectionGetTableTypes(struct AdbcConnection* connection,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return SqliteConnectionGetTableTypes(connection, out, error);
}
AdbcStatusCode AdbcConnectionReadPartition(struct AdbcConnection* connection,
const uint8_t* serialized_partition,
size_t serialized_length,
struct ArrowArrayStream* out,
struct AdbcError* error) {
return SqliteConnectionReadPartition(connection, serialized_partition,
serialized_length, out, error);
}
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionCommit(connection, error);
}
AdbcStatusCode AdbcConnectionRollback(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionRollback(connection, error);
}
AdbcStatusCode AdbcStatementNew(struct AdbcConnection* connection,
struct AdbcStatement* statement,
struct AdbcError* error) {
return SqliteStatementNew(connection, statement, error);
}
AdbcStatusCode AdbcStatementRelease(struct AdbcStatement* statement,
struct AdbcError* error) {
return SqliteStatementRelease(statement, error);
}
AdbcStatusCode AdbcStatementExecuteQuery(struct AdbcStatement* statement,
struct ArrowArrayStream* out,
int64_t* rows_affected,
struct AdbcError* error) {
return SqliteStatementExecuteQuery(statement, out, rows_affected, error);
}
AdbcStatusCode AdbcStatementPrepare(struct AdbcStatement* statement,
struct AdbcError* error) {
return SqliteStatementPrepare(statement, error);
}
AdbcStatusCode AdbcStatementSetSqlQuery(struct AdbcStatement* statement,
const char* query, struct AdbcError* error) {
return SqliteStatementSetSqlQuery(statement, query, error);
}
AdbcStatusCode AdbcStatementSetSubstraitPlan(struct AdbcStatement* statement,
const uint8_t* plan, size_t length,
struct AdbcError* error) {
return SqliteStatementSetSubstraitPlan(statement, plan, length, error);
}
AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
return SqliteStatementBind(statement, values, schema, error);
}
AdbcStatusCode AdbcStatementBindStream(struct AdbcStatement* statement,
struct ArrowArrayStream* stream,
struct AdbcError* error) {
return SqliteStatementBindStream(statement, stream, error);
}
AdbcStatusCode AdbcStatementGetParameterSchema(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcError* error) {
return SqliteStatementGetParameterSchema(statement, schema, error);
}
AdbcStatusCode AdbcStatementSetOption(struct AdbcStatement* statement, const char* key,
const char* value, struct AdbcError* error) {
return SqliteStatementSetOption(statement, key, value, error);
}
AdbcStatusCode AdbcStatementExecutePartitions(struct AdbcStatement* statement,
struct ArrowSchema* schema,
struct AdbcPartitions* partitions,
int64_t* rows_affected,
struct AdbcError* error) {
return SqliteStatementExecutePartitions(statement, schema, partitions, rows_affected,
error);
} // NOLINT(whitespace/indent)
// due to https://github.com/cpplint/cpplint/pull/189
ADBC_EXPORT
AdbcStatusCode AdbcDriverInit(int version, void* driver, struct AdbcError* error) {
return SqliteDriverInit(version, driver, error);
}