c/driver/sqlite/statement_reader.c (1,033 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#if !defined(_WIN32)
#define _POSIX_C_SOURCE 200112L
#endif
#include "statement_reader.h"
#include <assert.h>
#include <inttypes.h>
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include <time.h>
#include <arrow-adbc/adbc.h>
#include <nanoarrow/nanoarrow.h>
#include <sqlite3.h>
#include "driver/common/utils.h"
AdbcStatusCode AdbcSqliteBinderSet(struct AdbcSqliteBinder* binder,
struct AdbcError* error) {
int status = binder->params.get_schema(&binder->params, &binder->schema);
if (status != 0) {
const char* message = binder->params.get_last_error(&binder->params);
if (!message) message = "(unknown error)";
SetError(error, "Failed to get parameter schema: (%d) %s: %s", status,
strerror(status), message);
return ADBC_STATUS_INVALID_ARGUMENT;
}
struct ArrowError arrow_error = {0};
status = ArrowArrayViewInitFromSchema(&binder->batch, &binder->schema, &arrow_error);
if (status != 0) {
SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
strerror(status), arrow_error.message);
return ADBC_STATUS_INVALID_ARGUMENT;
}
if (binder->batch.storage_type != NANOARROW_TYPE_STRUCT) {
SetError(error, "Bind parameters do not have root type STRUCT");
return ADBC_STATUS_INVALID_ARGUMENT;
}
binder->types =
(enum ArrowType*)malloc(binder->schema.n_children * sizeof(enum ArrowType));
struct ArrowSchemaView view = {0};
for (int i = 0; i < binder->schema.n_children; i++) {
status = ArrowSchemaViewInit(&view, binder->schema.children[i], &arrow_error);
if (status != NANOARROW_OK) {
SetError(error, "Failed to parse schema for column %d: %s (%d): %s", i,
strerror(status), status, arrow_error.message);
return ADBC_STATUS_INVALID_ARGUMENT;
}
if (view.type == NANOARROW_TYPE_UNINITIALIZED) {
SetError(error, "Column %d has UNINITIALIZED type", i);
return ADBC_STATUS_INTERNAL;
}
if (view.type == NANOARROW_TYPE_DICTIONARY) {
struct ArrowSchemaView value_view = {0};
status = ArrowSchemaViewInit(&value_view, binder->schema.children[i]->dictionary,
&arrow_error);
if (status != NANOARROW_OK) {
SetError(error, "Failed to parse schema for column %d->dictionary: %s (%d): %s",
i, strerror(status), status, arrow_error.message);
return ADBC_STATUS_INVALID_ARGUMENT;
}
// We only support string/binary dictionary-encoded values
switch (value_view.type) {
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_LARGE_STRING:
case NANOARROW_TYPE_STRING_VIEW:
case NANOARROW_TYPE_BINARY:
case NANOARROW_TYPE_LARGE_BINARY:
case NANOARROW_TYPE_FIXED_SIZE_BINARY:
case NANOARROW_TYPE_BINARY_VIEW:
break;
default:
SetError(error, "Column %d dictionary has unsupported type %s", i,
ArrowTypeString(value_view.type));
return ADBC_STATUS_NOT_IMPLEMENTED;
}
}
binder->types[i] = view.type;
}
return ADBC_STATUS_OK;
}
AdbcStatusCode AdbcSqliteBinderSetArrayStream(struct AdbcSqliteBinder* binder,
struct ArrowArrayStream* values,
struct AdbcError* error) {
AdbcSqliteBinderRelease(binder);
binder->params = *values;
memset(values, 0, sizeof(*values));
return AdbcSqliteBinderSet(binder, error);
}
#define SECONDS_PER_DAY 86400
/*
Allocates to buf on success. Caller is responsible for freeing.
On failure sets error and contents of buf are undefined.
*/
static AdbcStatusCode ArrowDate32ToIsoString(int32_t value, char** buf,
struct AdbcError* error) {
int strlen = 10;
#if SIZEOF_TIME_T < 8
if ((value > INT32_MAX / SECONDS_PER_DAY) || (value < INT32_MIN / SECONDS_PER_DAY)) {
SetError(error, "Date %" PRId32 " exceeds platform time_t bounds", value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
time_t time = (time_t)(value * SECONDS_PER_DAY);
#else
time_t time = value * SECONDS_PER_DAY;
#endif
struct tm broken_down_time;
#if defined(_WIN32)
if (gmtime_s(&broken_down_time, &time) != 0) {
SetError(error, "Could not convert date %" PRId32 " to broken down time", value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
#else
if (gmtime_r(&time, &broken_down_time) != &broken_down_time) {
SetError(error, "Could not convert date %" PRId32 " to broken down time", value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
#endif
char* tsstr = malloc(strlen + 1);
if (tsstr == NULL) {
return ADBC_STATUS_IO;
}
if (strftime(tsstr, strlen + 1, "%Y-%m-%d", &broken_down_time) == 0) {
SetError(error, "Call to strftime for date %" PRId32 " with failed", value);
free(tsstr);
return ADBC_STATUS_INVALID_ARGUMENT;
}
*buf = tsstr;
return ADBC_STATUS_OK;
}
/*
Allocates to buf on success. Caller is responsible for freeing.
On failure sets error and contents of buf are undefined.
*/
static AdbcStatusCode ArrowTimestampToIsoString(int64_t value, enum ArrowTimeUnit unit,
char** buf, struct AdbcError* error) {
int scale = 1;
int strlen = 20;
int rem = 0;
switch (unit) {
case NANOARROW_TIME_UNIT_SECOND:
break;
case NANOARROW_TIME_UNIT_MILLI:
scale = 1000;
strlen = 24;
break;
case NANOARROW_TIME_UNIT_MICRO:
scale = 1000000;
strlen = 27;
break;
case NANOARROW_TIME_UNIT_NANO:
scale = 1000000000;
strlen = 30;
break;
}
rem = value % scale;
if (rem < 0) {
value -= scale;
rem = scale + rem;
}
const int64_t seconds = value / scale;
#if SIZEOF_TIME_T < 8
if ((seconds > INT32_MAX) || (seconds < INT32_MIN)) {
SetError(error, "Timestamp %" PRId64 " with unit %d exceeds platform time_t bounds",
value, unit);
return ADBC_STATUS_INVALID_ARGUMENT;
}
const time_t time = (time_t)seconds;
#else
const time_t time = seconds;
#endif
struct tm broken_down_time;
#if defined(_WIN32)
if (gmtime_s(&broken_down_time, &time) != 0) {
SetError(error,
"Could not convert timestamp %" PRId64 " with unit %d to broken down time",
value, unit);
return ADBC_STATUS_INVALID_ARGUMENT;
}
#else
if (gmtime_r(&time, &broken_down_time) != &broken_down_time) {
SetError(error,
"Could not convert timestamp %" PRId64 " with unit %d to broken down time",
value, unit);
return ADBC_STATUS_INVALID_ARGUMENT;
}
#endif
char* tsstr = malloc(strlen + 1);
if (tsstr == NULL) {
return ADBC_STATUS_IO;
}
if (strftime(tsstr, strlen, "%Y-%m-%dT%H:%M:%S", &broken_down_time) == 0) {
SetError(error, "Call to strftime for timestamp %" PRId64 " with unit %d failed",
value, unit);
free(tsstr);
return ADBC_STATUS_INVALID_ARGUMENT;
}
assert(rem >= 0);
switch (unit) {
case NANOARROW_TIME_UNIT_SECOND:
break;
case NANOARROW_TIME_UNIT_MILLI:
tsstr[19] = '.';
snprintf(tsstr + 20, strlen - 20, "%03d", rem % 1000u);
break;
case NANOARROW_TIME_UNIT_MICRO:
tsstr[19] = '.';
snprintf(tsstr + 20, strlen - 20, "%06d", rem % 1000000u);
break;
case NANOARROW_TIME_UNIT_NANO:
tsstr[19] = '.';
snprintf(tsstr + 20, strlen - 20, "%09d", rem % 1000000000u);
break;
}
*buf = tsstr;
return ADBC_STATUS_OK;
}
AdbcStatusCode AdbcSqliteBinderBindNext(struct AdbcSqliteBinder* binder, sqlite3* conn,
sqlite3_stmt* stmt, char* finished,
struct AdbcError* error) {
struct ArrowError arrow_error = {0};
int status = 0;
while (!binder->array.release || binder->next_row >= binder->array.length) {
if (binder->array.release) {
ArrowArrayViewReset(&binder->batch);
binder->array.release(&binder->array);
status =
ArrowArrayViewInitFromSchema(&binder->batch, &binder->schema, &arrow_error);
if (status != 0) {
SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
strerror(status), arrow_error.message);
return ADBC_STATUS_INTERNAL;
}
}
status = binder->params.get_next(&binder->params, &binder->array);
if (status != 0) {
const char* message = binder->params.get_last_error(&binder->params);
if (!message) message = "(unknown error)";
SetError(error, "Failed to get next parameter batch: (%d) %s: %s", status,
strerror(status), message);
return ADBC_STATUS_IO;
}
if (!binder->array.release) {
*finished = 1;
AdbcSqliteBinderRelease(binder);
return ADBC_STATUS_OK;
}
status = ArrowArrayViewSetArray(&binder->batch, &binder->array, &arrow_error);
if (status != 0) {
SetError(error, "Failed to initialize array view: (%d) %s: %s", status,
strerror(status), arrow_error.message);
return ADBC_STATUS_INTERNAL;
}
binder->next_row = 0;
}
if (sqlite3_reset(stmt) != SQLITE_OK) {
SetError(error, "Failed to reset statement: %s", sqlite3_errmsg(conn));
return ADBC_STATUS_INTERNAL;
}
if (sqlite3_clear_bindings(stmt) != SQLITE_OK) {
SetError(error, "Failed to clear statement bindings: %s", sqlite3_errmsg(conn));
return ADBC_STATUS_INTERNAL;
}
for (int col = 0; col < binder->schema.n_children; col++) {
if (ArrowArrayViewIsNull(binder->batch.children[col], binder->next_row)) {
status = sqlite3_bind_null(stmt, col + 1);
} else {
switch (binder->types[col]) {
case NANOARROW_TYPE_BINARY:
case NANOARROW_TYPE_LARGE_BINARY:
case NANOARROW_TYPE_FIXED_SIZE_BINARY:
case NANOARROW_TYPE_BINARY_VIEW: {
struct ArrowBufferView value =
ArrowArrayViewGetBytesUnsafe(binder->batch.children[col], binder->next_row);
status = sqlite3_bind_blob(stmt, col + 1, value.data.as_char,
(int)value.size_bytes, SQLITE_STATIC);
break;
}
case NANOARROW_TYPE_BOOL:
case NANOARROW_TYPE_UINT8:
case NANOARROW_TYPE_UINT16:
case NANOARROW_TYPE_UINT32:
case NANOARROW_TYPE_UINT64: {
uint64_t value =
ArrowArrayViewGetUIntUnsafe(binder->batch.children[col], binder->next_row);
if (value > INT64_MAX) {
SetError(error,
"Column %d has unsigned integer value %" PRIu64
"out of range of int64_t",
col, value);
return ADBC_STATUS_INVALID_ARGUMENT;
}
status = sqlite3_bind_int64(stmt, col + 1, (int64_t)value);
break;
}
case NANOARROW_TYPE_INT8:
case NANOARROW_TYPE_INT16:
case NANOARROW_TYPE_INT32:
case NANOARROW_TYPE_INT64: {
int64_t value =
ArrowArrayViewGetIntUnsafe(binder->batch.children[col], binder->next_row);
status = sqlite3_bind_int64(stmt, col + 1, value);
break;
}
case NANOARROW_TYPE_HALF_FLOAT:
case NANOARROW_TYPE_FLOAT:
case NANOARROW_TYPE_DOUBLE: {
double value = ArrowArrayViewGetDoubleUnsafe(binder->batch.children[col],
binder->next_row);
status = sqlite3_bind_double(stmt, col + 1, value);
break;
}
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_LARGE_STRING:
case NANOARROW_TYPE_STRING_VIEW: {
struct ArrowBufferView value =
ArrowArrayViewGetBytesUnsafe(binder->batch.children[col], binder->next_row);
status = sqlite3_bind_text(stmt, col + 1, value.data.as_char,
(int)value.size_bytes, SQLITE_STATIC);
break;
}
case NANOARROW_TYPE_DICTIONARY: {
int64_t value_index =
ArrowArrayViewGetIntUnsafe(binder->batch.children[col], binder->next_row);
if (ArrowArrayViewIsNull(binder->batch.children[col]->dictionary,
value_index)) {
status = sqlite3_bind_null(stmt, col + 1);
} else {
struct ArrowBufferView value = ArrowArrayViewGetBytesUnsafe(
binder->batch.children[col]->dictionary, value_index);
status = sqlite3_bind_text(stmt, col + 1, value.data.as_char,
(int)value.size_bytes, SQLITE_STATIC);
}
break;
}
case NANOARROW_TYPE_DATE32: {
int64_t value =
ArrowArrayViewGetIntUnsafe(binder->batch.children[col], binder->next_row);
char* tsstr;
if ((value > INT32_MAX) || (value < INT32_MIN)) {
SetError(error,
"Column %d has value %" PRId64
" which exceeds the expected range "
"for an Arrow DATE32 type",
col, value);
return ADBC_STATUS_INVALID_DATA;
}
RAISE_ADBC(ArrowDate32ToIsoString((int32_t)value, &tsstr, error));
// SQLITE_TRANSIENT ensures the value is copied during bind
status = sqlite3_bind_text(stmt, col + 1, tsstr, (int)strlen(tsstr),
SQLITE_TRANSIENT);
free(tsstr);
break;
}
case NANOARROW_TYPE_TIMESTAMP: {
struct ArrowSchemaView bind_schema_view;
RAISE_NA(ArrowSchemaViewInit(&bind_schema_view, binder->schema.children[col],
&arrow_error));
enum ArrowTimeUnit unit = bind_schema_view.time_unit;
int64_t value =
ArrowArrayViewGetIntUnsafe(binder->batch.children[col], binder->next_row);
char* tsstr;
RAISE_ADBC(ArrowTimestampToIsoString(value, unit, &tsstr, error));
// SQLITE_TRANSIENT ensures the value is copied during bind
status = sqlite3_bind_text(stmt, col + 1, tsstr, (int)strlen(tsstr),
SQLITE_TRANSIENT);
free((char*)tsstr);
break;
}
default:
SetError(error, "Column %d has unsupported type %s", col,
ArrowTypeString(binder->types[col]));
return ADBC_STATUS_NOT_IMPLEMENTED;
}
}
if (status != SQLITE_OK) {
SetError(error, "Failed to clear statement bindings: %s", sqlite3_errmsg(conn));
return ADBC_STATUS_INTERNAL;
}
}
binder->next_row++;
*finished = 0;
return ADBC_STATUS_OK;
}
void AdbcSqliteBinderRelease(struct AdbcSqliteBinder* binder) {
if (binder->schema.release) {
binder->schema.release(&binder->schema);
}
if (binder->params.release) {
binder->params.release(&binder->params);
}
if (binder->types) {
free(binder->types);
}
if (binder->array.release) {
binder->array.release(&binder->array);
}
ArrowArrayViewReset(&binder->batch);
memset(binder, 0, sizeof(*binder));
}
struct StatementReader {
sqlite3* db;
sqlite3_stmt* stmt;
enum ArrowType* types;
struct ArrowSchema schema;
struct ArrowArray initial_batch;
struct AdbcSqliteBinder* binder;
struct ArrowError error;
char done;
int batch_size;
};
const char* StatementReaderGetLastError(struct ArrowArrayStream* self) {
if (!self->release || !self->private_data) {
return NULL;
}
struct StatementReader* reader = (struct StatementReader*)self->private_data;
return reader->error.message;
}
void StatementReaderSetError(struct StatementReader* reader) {
const char* msg = sqlite3_errmsg(reader->db);
// Reset here so that we don't get an error again in StatementRelease
(void)sqlite3_reset(reader->stmt);
strncpy(reader->error.message, msg, sizeof(reader->error.message) - 1);
reader->error.message[sizeof(reader->error.message) - 1] = '\0';
}
int StatementReaderGetOneValue(struct StatementReader* reader, int col,
struct ArrowArray* out) {
int sqlite_type = sqlite3_column_type(reader->stmt, col);
if (sqlite_type == SQLITE_NULL) {
return ArrowArrayAppendNull(out, 1);
}
switch (reader->types[col]) {
case NANOARROW_TYPE_INT64: {
switch (sqlite_type) {
case SQLITE_INTEGER: {
int64_t value = sqlite3_column_int64(reader->stmt, col);
return ArrowArrayAppendInt(out, value);
}
case SQLITE_FLOAT: {
// TODO: behavior needs to be configurable
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected INT64 but got DOUBLE",
col);
return EIO;
}
case SQLITE_TEXT:
case SQLITE_BLOB: {
snprintf(
reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected INT64 but got STRING/BINARY",
col);
return EIO;
}
default: {
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected INT64 but got unknown "
"type %d",
col, sqlite_type);
return ENOTSUP;
}
}
break;
}
case NANOARROW_TYPE_DOUBLE: {
switch (sqlite_type) {
case SQLITE_INTEGER:
case SQLITE_FLOAT: {
// Let SQLite convert
double value = sqlite3_column_double(reader->stmt, col);
return ArrowArrayAppendDouble(out, value);
}
case SQLITE_TEXT:
case SQLITE_BLOB: {
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected DOUBLE but got "
"STRING/BINARY",
col);
return EIO;
}
default: {
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected DOUBLE but got unknown "
"type %d",
col, sqlite_type);
return ENOTSUP;
}
}
break;
}
case NANOARROW_TYPE_STRING: {
switch (sqlite_type) {
case SQLITE_INTEGER:
case SQLITE_FLOAT:
case SQLITE_TEXT:
case SQLITE_BLOB:
break;
default: {
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected STRING but got unknown "
"type %d",
col, sqlite_type);
return ENOTSUP;
}
}
// Let SQLite convert
struct ArrowStringView value = {
.data = (const char*)sqlite3_column_text(reader->stmt, col),
.size_bytes = sqlite3_column_bytes(reader->stmt, col),
};
return ArrowArrayAppendString(out, value);
}
case NANOARROW_TYPE_BINARY: {
switch (sqlite_type) {
case SQLITE_TEXT:
case SQLITE_BLOB:
break;
default: {
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Type mismatch in column %d: expected BLOB but got unknown "
"type %d",
col, sqlite_type);
return ENOTSUP;
}
}
// Let SQLite convert
struct ArrowBufferView value;
value.data.data = sqlite3_column_blob(reader->stmt, col);
value.size_bytes = sqlite3_column_bytes(reader->stmt, col);
return ArrowArrayAppendBytes(out, value);
}
default:
break;
}
snprintf(reader->error.message, sizeof(reader->error.message),
"[SQLite] Internal error: unknown inferred column type %d",
reader->types[col]);
return ENOTSUP;
}
int StatementReaderGetNext(struct ArrowArrayStream* self, struct ArrowArray* out) {
if (!self->release || !self->private_data) {
return EINVAL;
}
struct StatementReader* reader = (struct StatementReader*)self->private_data;
if (reader->initial_batch.release != NULL) {
// Canonically return zero-row results as a stream with zero batches
if (reader->initial_batch.length == 0) {
reader->initial_batch.release(&reader->initial_batch);
reader->done = true;
out->release = NULL;
return 0;
} else {
memcpy(out, &reader->initial_batch, sizeof(*out));
memset(&reader->initial_batch, 0, sizeof(reader->initial_batch));
return 0;
}
} else if (reader->done) {
out->release = NULL;
return 0;
}
RAISE_NA(ArrowArrayInitFromSchema(out, &reader->schema, &reader->error));
RAISE_NA(ArrowArrayStartAppending(out));
int64_t batch_size = 0;
int status = 0;
sqlite3_mutex_enter(sqlite3_db_mutex(reader->db));
while (batch_size < reader->batch_size) {
int rc = sqlite3_step(reader->stmt);
if (rc == SQLITE_DONE) {
if (!reader->binder) {
reader->done = 1;
break;
} else {
char finished = 0;
struct AdbcError error = {0};
status = AdbcSqliteBinderBindNext(reader->binder, reader->db, reader->stmt,
&finished, &error);
if (status != ADBC_STATUS_OK) {
reader->done = 1;
status = EIO;
if (error.release) {
strncpy(reader->error.message, error.message,
sizeof(reader->error.message) - 1);
reader->error.message[sizeof(reader->error.message) - 1] = '\0';
error.release(&error);
}
break;
} else if (finished) {
reader->done = 1;
break;
}
continue;
}
} else if (rc == SQLITE_ERROR) {
reader->done = 1;
status = EIO;
StatementReaderSetError(reader);
break;
} else if (rc != SQLITE_ROW) {
reader->done = 1;
status = ADBC_STATUS_INTERNAL;
StatementReaderSetError(reader);
break;
}
for (int col = 0; col < reader->schema.n_children; col++) {
status = StatementReaderGetOneValue(reader, col, out->children[col]);
if (status != 0) break;
}
if (status != 0) break;
batch_size++;
}
if (status == 0) {
out->length = batch_size;
for (int i = 0; i < reader->schema.n_children; i++) {
status = ArrowArrayFinishBuildingDefault(out->children[i], &reader->error);
if (status != 0) break;
}
// If we didn't read any rows, the reader is exhausted - don't generate a spurious
// batch
if (batch_size == 0) out->release(out);
}
sqlite3_mutex_leave(sqlite3_db_mutex(reader->db));
return status;
}
int StatementReaderGetSchema(struct ArrowArrayStream* self, struct ArrowSchema* out) {
if (!self->release || !self->private_data) {
return EINVAL;
}
struct StatementReader* reader = (struct StatementReader*)self->private_data;
return ArrowSchemaDeepCopy(&reader->schema, out);
}
void StatementReaderRelease(struct ArrowArrayStream* self) {
if (self->private_data) {
struct StatementReader* reader = (struct StatementReader*)self->private_data;
if (reader->schema.release) {
reader->schema.release(&reader->schema);
}
if (reader->initial_batch.release) {
reader->initial_batch.release(&reader->initial_batch);
}
if (reader->types) {
free(reader->types);
}
if (reader->binder) {
AdbcSqliteBinderRelease(reader->binder);
}
free(self->private_data);
}
self->private_data = NULL;
self->release = NULL;
self->get_last_error = NULL;
self->get_next = NULL;
self->get_schema = NULL;
}
// -- Type inferring reader ------------------------------------------
//
// Every column starts as INT64. A certain number of rows is read, and
// the column is upcast if a different type is read. Afterwards,
// 'compatible' values are upcast (int64 <: double <: string) and
// 'incompatible' values will raise an error.
//
// Notes:
// - The upcasting may be different when done during the inference
// stage vs the reading stage, because in the former, we have to do
// the upcasting, and in the latter, we let SQLite handle it.
// - We don't use unions since those are a pain to work with.
//
// Improvements/to-dos:
// - SQLITE_BLOB type values are not handled
// - Columns where no non-NULL value is seen should be typed as STRING
// for maximum flexibility for later values
// - Make this more flexible (e.g. choose whether to attempt to cast
// incompatible values, or insert a null, instead of erroring)
// - Add the option to try to use SQLite's designated type for a
// column, instead of inferring the type (where possible).
/// Initialize buffers for the first (type-inferred) batch of data.
/// Use raw buffers since the types may change.
AdbcStatusCode StatementReaderInitializeInfer(int num_columns, size_t infer_rows,
struct ArrowBitmap* validity,
struct ArrowBuffer* data,
struct ArrowBuffer* binary,
enum ArrowType* current_type,
struct AdbcError* error) {
for (int i = 0; i < num_columns; i++) {
ArrowBitmapInit(&validity[i]);
CHECK_NA(INTERNAL, ArrowBitmapReserve(&validity[i], infer_rows), error);
ArrowBufferInit(&data[i]);
CHECK_NA(INTERNAL, ArrowBufferReserve(&data[i], infer_rows * sizeof(int64_t)), error);
ArrowBufferInit(&binary[i]);
current_type[i] = NANOARROW_TYPE_INT64;
}
return ADBC_STATUS_OK;
} // NOLINT(whitespace/indent)
/// Finalize the first (type-inferred) batch of data.
AdbcStatusCode StatementReaderInferFinalize(
sqlite3_stmt* stmt, int num_columns, int64_t num_rows, struct StatementReader* reader,
struct ArrowBitmap* validity, struct ArrowBuffer* data, struct ArrowBuffer* binary,
enum ArrowType* current_type, struct AdbcError* error) {
ArrowSchemaInit(&reader->schema);
CHECK_NA(INTERNAL, ArrowSchemaSetTypeStruct(&reader->schema, num_columns), error);
for (int col = 0; col < num_columns; col++) {
struct ArrowSchema* field = reader->schema.children[col];
const char* name = sqlite3_column_name(stmt, col);
CHECK_NA(INTERNAL, ArrowSchemaSetType(field, current_type[col]), error);
CHECK_NA(INTERNAL, ArrowSchemaSetName(field, name), error);
}
CHECK_NA(INTERNAL,
ArrowArrayInitFromSchema(&reader->initial_batch, &reader->schema, NULL),
error);
// Do validation up front, so that we either move 0 buffers or all buffers
for (int col = 0; col < num_columns; col++) {
if (current_type[col] == NANOARROW_TYPE_STRING ||
current_type[col] == NANOARROW_TYPE_BINARY) {
if (binary[col].data == NULL) {
SetError(error, "INTERNAL: column has binary-like type but no backing buffer");
return ADBC_STATUS_INTERNAL;
}
}
reader->initial_batch.children[col]->length = num_rows;
}
reader->initial_batch.length = num_rows;
for (int col = 0; col < num_columns; col++) {
struct ArrowArray* arr = reader->initial_batch.children[col];
ArrowArraySetValidityBitmap(arr, &validity[col]);
// XXX: ignore return code since we've hardcoded the buffer index
(void)ArrowArraySetBuffer(arr, 1, &data[col]);
if (current_type[col] == NANOARROW_TYPE_STRING ||
current_type[col] == NANOARROW_TYPE_BINARY) {
(void)ArrowArraySetBuffer(arr, 2, &binary[col]);
}
arr->length = num_rows;
}
return ADBC_STATUS_OK;
}
// Convert an int64 typed column to double.
AdbcStatusCode StatementReaderUpcastInt64ToDouble(struct ArrowBuffer* data,
struct AdbcError* error) {
struct ArrowBuffer doubles;
ArrowBufferInit(&doubles);
CHECK_NA(INTERNAL, ArrowBufferReserve(&doubles, data->capacity_bytes), error);
size_t num_elements = data->size_bytes / sizeof(int64_t);
const int64_t* elements = (const int64_t*)data->data;
for (size_t i = 0; i < num_elements; i++) {
double value = (double)elements[i];
ArrowBufferAppendUnsafe(&doubles, &value, sizeof(double));
}
ArrowBufferReset(data);
ArrowBufferMove(&doubles, data);
return ADBC_STATUS_OK;
}
AdbcStatusCode StatementReaderAppendInt64ToBinary(struct ArrowBuffer* offsets,
struct ArrowBuffer* binary,
int64_t value, int32_t* offset,
struct AdbcError* error) {
// Make sure we have at least 21 bytes available (19 digits + sign + null)
// Presumably this is enough, but manpage for snprintf makes no guarantees
// about whether locale may affect this, so check for truncation regardless
static const size_t kReserve = 21;
size_t buffer_size = kReserve;
CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
char* output = (char*)(binary->data + binary->size_bytes);
int written = 0;
while (1) {
written = snprintf(output, buffer_size, "%" PRId64, value);
if (written < 0) {
SetError(error, "Encoding error when upcasting double to string");
return ADBC_STATUS_INTERNAL;
} else if (((size_t)written) >= buffer_size) {
// Truncated, resize and try again
// Check for overflow - presumably this can never happen...?
if (UINT_MAX - buffer_size < buffer_size) {
SetError(error, "Overflow when upcasting double to string");
return ADBC_STATUS_INTERNAL;
}
CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
buffer_size += buffer_size;
continue;
}
break;
}
*offset += written;
binary->size_bytes += written;
ArrowBufferAppendUnsafe(offsets, offset, sizeof(int32_t));
return ADBC_STATUS_OK;
}
AdbcStatusCode StatementReaderAppendDoubleToBinary(struct ArrowBuffer* offsets,
struct ArrowBuffer* binary,
double value, int32_t* offset,
struct AdbcError* error) {
static const size_t kReserve = 64;
size_t buffer_size = kReserve;
CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
char* output = (char*)(binary->data + binary->size_bytes);
int written = 0;
while (1) {
written = snprintf(output, buffer_size, "%e", value);
if (written < 0) {
SetError(error, "Encoding error when upcasting double to string");
return ADBC_STATUS_INTERNAL;
} else if (((size_t)written) >= buffer_size) {
// Truncated, resize and try again
// Check for overflow - presumably this can never happen...?
if (UINT_MAX - buffer_size < buffer_size) {
SetError(error, "Overflow when upcasting double to string");
return ADBC_STATUS_INTERNAL;
}
CHECK_NA(INTERNAL, ArrowBufferReserve(binary, buffer_size), error);
buffer_size += buffer_size;
continue;
}
break;
}
*offset += written;
binary->size_bytes += written;
ArrowBufferAppendUnsafe(offsets, offset, sizeof(int32_t));
return ADBC_STATUS_OK;
}
AdbcStatusCode StatementReaderUpcastInt64ToBinary(struct ArrowBuffer* data,
struct ArrowBuffer* binary,
struct AdbcError* error) {
struct ArrowBuffer offsets;
ArrowBufferInit(&offsets);
ArrowBufferInit(binary);
CHECK_NA(INTERNAL, ArrowBufferReserve(&offsets, data->capacity_bytes), error);
CHECK_NA(INTERNAL, ArrowBufferReserve(binary, data->capacity_bytes), error);
size_t num_elements = data->size_bytes / sizeof(int64_t);
const int64_t* elements = (const int64_t*)data->data;
int32_t offset = 0;
ArrowBufferAppendUnsafe(&offsets, &offset, sizeof(int32_t));
for (size_t i = 0; i < num_elements; i++) {
AdbcStatusCode status =
StatementReaderAppendInt64ToBinary(&offsets, binary, elements[i], &offset, error);
if (status != ADBC_STATUS_OK) return status;
}
ArrowBufferReset(data);
ArrowBufferMove(&offsets, data);
return ADBC_STATUS_OK;
}
AdbcStatusCode StatementReaderUpcastDoubleToBinary(struct ArrowBuffer* data,
struct ArrowBuffer* binary,
struct AdbcError* error) {
struct ArrowBuffer offsets;
ArrowBufferInit(&offsets);
ArrowBufferInit(binary);
CHECK_NA(INTERNAL, ArrowBufferReserve(&offsets, data->capacity_bytes), error);
CHECK_NA(INTERNAL, ArrowBufferReserve(binary, data->capacity_bytes), error);
size_t num_elements = data->size_bytes / sizeof(double);
const double* elements = (const double*)data->data;
int32_t offset = 0;
ArrowBufferAppendUnsafe(&offsets, &offset, sizeof(int32_t));
for (size_t i = 0; i < num_elements; i++) {
AdbcStatusCode status = StatementReaderAppendDoubleToBinary(
&offsets, binary, elements[i], &offset, error);
if (status != ADBC_STATUS_OK) return status;
}
ArrowBufferReset(data);
ArrowBufferMove(&offsets, data);
return ADBC_STATUS_OK;
}
/// Append a single value to a single column.
AdbcStatusCode StatementReaderInferOneValue(
sqlite3_stmt* stmt, int col, struct ArrowBitmap* validity, struct ArrowBuffer* data,
struct ArrowBuffer* binary, enum ArrowType* current_type, struct AdbcError* error) {
// TODO: static_assert sizeof(int64) == sizeof(double)
int sqlite_type = sqlite3_column_type(stmt, col);
switch (sqlite_type) {
case SQLITE_NULL: {
ArrowBitmapAppendUnsafe(validity, /*set=*/0, /*length=*/1);
switch (*current_type) {
case NANOARROW_TYPE_INT64: {
int64_t value = 0;
ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
break;
}
case NANOARROW_TYPE_DOUBLE: {
double value = 0.0;
ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
break;
}
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_BINARY: {
const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
return ADBC_STATUS_OK;
}
default:
return ADBC_STATUS_INTERNAL;
}
break;
}
case SQLITE_INTEGER: {
ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
switch (*current_type) {
case NANOARROW_TYPE_INT64: {
int64_t value = sqlite3_column_int64(stmt, col);
ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
break;
}
case NANOARROW_TYPE_DOUBLE: {
double value = sqlite3_column_double(stmt, col);
ArrowBufferAppendUnsafe(data, &value, sizeof(int64_t));
break;
}
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_BINARY: {
int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
return StatementReaderAppendInt64ToBinary(
data, binary, sqlite3_column_int64(stmt, col), &offset, error);
}
default:
return ADBC_STATUS_INTERNAL;
}
break;
}
case SQLITE_FLOAT: {
ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
switch (*current_type) {
case NANOARROW_TYPE_INT64: {
AdbcStatusCode status = StatementReaderUpcastInt64ToDouble(data, error);
if (status != ADBC_STATUS_OK) return status;
*current_type = NANOARROW_TYPE_DOUBLE;
double value = sqlite3_column_double(stmt, col);
ArrowBufferAppendUnsafe(data, &value, sizeof(double));
break;
}
case NANOARROW_TYPE_DOUBLE: {
double value = sqlite3_column_double(stmt, col);
ArrowBufferAppendUnsafe(data, &value, sizeof(double));
break;
}
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_BINARY: {
int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1];
return StatementReaderAppendDoubleToBinary(
data, binary, sqlite3_column_double(stmt, col), &offset, error);
}
default:
return ADBC_STATUS_INTERNAL;
}
break;
}
case SQLITE_TEXT: {
ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
switch (*current_type) {
case NANOARROW_TYPE_INT64: {
AdbcStatusCode status = StatementReaderUpcastInt64ToBinary(data, binary, error);
if (status != ADBC_STATUS_OK) return status;
*current_type = NANOARROW_TYPE_STRING;
break;
}
case NANOARROW_TYPE_DOUBLE: {
AdbcStatusCode status =
StatementReaderUpcastDoubleToBinary(data, binary, error);
if (status != ADBC_STATUS_OK) return status;
*current_type = NANOARROW_TYPE_STRING;
break;
}
case NANOARROW_TYPE_STRING:
case NANOARROW_TYPE_BINARY:
break;
default:
return ADBC_STATUS_INTERNAL;
}
const unsigned char* value = sqlite3_column_text(stmt, col);
const int size = sqlite3_column_bytes(stmt, col);
const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1] + size;
CHECK_NA(INTERNAL, ArrowBufferAppend(binary, value, size), error);
CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
break;
}
case SQLITE_BLOB: {
ArrowBitmapAppendUnsafe(validity, /*set=*/1, /*length=*/1);
switch (*current_type) {
case NANOARROW_TYPE_INT64: {
AdbcStatusCode status = StatementReaderUpcastInt64ToBinary(data, binary, error);
if (status != ADBC_STATUS_OK) return status;
*current_type = NANOARROW_TYPE_BINARY;
break;
}
case NANOARROW_TYPE_DOUBLE: {
AdbcStatusCode status =
StatementReaderUpcastDoubleToBinary(data, binary, error);
if (status != ADBC_STATUS_OK) return status;
*current_type = NANOARROW_TYPE_BINARY;
break;
}
case NANOARROW_TYPE_STRING:
*current_type = NANOARROW_TYPE_BINARY;
break;
case NANOARROW_TYPE_BINARY:
break;
default:
return ADBC_STATUS_INTERNAL;
}
const void* value = sqlite3_column_blob(stmt, col);
const int size = sqlite3_column_bytes(stmt, col);
const int32_t offset = ((int32_t*)data->data)[data->size_bytes / 4 - 1] + size;
CHECK_NA(INTERNAL, ArrowBufferAppend(binary, value, size), error);
CHECK_NA(INTERNAL, ArrowBufferAppend(data, &offset, sizeof(offset)), error);
break;
}
default: {
return ADBC_STATUS_NOT_IMPLEMENTED;
}
}
return ADBC_STATUS_OK;
} // NOLINT(whitespace/indent)
AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
struct AdbcSqliteBinder* binder, size_t batch_size,
struct ArrowArrayStream* stream,
struct AdbcError* error) {
struct StatementReader* reader = malloc(sizeof(struct StatementReader));
memset(reader, 0, sizeof(struct StatementReader));
reader->db = db;
reader->stmt = stmt;
reader->batch_size = (int)batch_size;
stream->private_data = reader;
stream->release = StatementReaderRelease;
stream->get_last_error = StatementReaderGetLastError;
stream->get_next = StatementReaderGetNext;
stream->get_schema = StatementReaderGetSchema;
sqlite3_mutex_enter(sqlite3_db_mutex(db));
const int num_columns = sqlite3_column_count(stmt);
struct ArrowBitmap* validity = malloc(num_columns * sizeof(struct ArrowBitmap));
struct ArrowBuffer* data = malloc(num_columns * sizeof(struct ArrowBuffer));
struct ArrowBuffer* binary = malloc(num_columns * sizeof(struct ArrowBuffer));
enum ArrowType* current_type = malloc(num_columns * sizeof(enum ArrowType));
AdbcStatusCode status = StatementReaderInitializeInfer(
num_columns, batch_size, validity, data, binary, current_type, error);
if (binder) {
char finished = 0;
status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
if (finished) {
reader->done = 1;
}
}
if (status == ADBC_STATUS_OK && !reader->done) {
int64_t num_rows = 0;
while (((size_t)num_rows) < batch_size) {
int rc = sqlite3_step(stmt);
if (rc == SQLITE_DONE) {
if (!binder) {
reader->done = 1;
break;
} else {
char finished = 0;
status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
if (status != ADBC_STATUS_OK) break;
if (finished) {
reader->done = 1;
break;
}
}
continue;
} else if (rc == SQLITE_ERROR) {
SetError(error, "Failed to step query: %s", sqlite3_errmsg(db));
status = ADBC_STATUS_IO;
// Reset here so that we don't get an error again in StatementRelease
(void)sqlite3_reset(stmt);
break;
} else if (rc != SQLITE_ROW) {
status = ADBC_STATUS_INTERNAL;
break;
}
for (int col = 0; col < num_columns; col++) {
status = StatementReaderInferOneValue(stmt, col, &validity[col], &data[col],
&binary[col], ¤t_type[col], error);
if (status != ADBC_STATUS_OK) break;
}
if (status != ADBC_STATUS_OK) break;
num_rows++;
}
if (status == ADBC_STATUS_OK) {
status = StatementReaderInferFinalize(stmt, num_columns, num_rows, reader, validity,
data, binary, current_type, error);
}
}
if (status != ADBC_STATUS_OK) {
// Free the individual buffers
// This is OK, since InferFinalize either moves all buffers or no buffers
for (int i = 0; i < num_columns; i++) {
ArrowBitmapReset(&validity[i]);
ArrowBufferReset(&data[i]);
ArrowBufferReset(&binary[i]);
}
free(current_type);
} else {
reader->types = current_type;
reader->binder = binder;
}
free(data);
free(validity);
free(binary);
sqlite3_mutex_leave(sqlite3_db_mutex(db));
return status;
} // NOLINT(whitespace/indent)