in c/driver/sqlite/statement_reader.c [1023:1122]
// Set up `stream` as an ArrowArrayStream over the results of `stmt`, reading up
// to `batch_size` rows up front to infer the column types.
//
// db:         connection owning `stmt`; its mutex is held while stepping.
// stmt:       statement to step.
// binder:     optional (may be NULL).  When given, parameters are bound via
//             AdbcSqliteBinderBindNext before the first step and again each time
//             the statement reports SQLITE_DONE, until the binder is finished.
// batch_size: maximum number of rows read here for inference; also stored on
//             the reader.
// stream:     receives the reader as private_data plus the StatementReader*
//             callbacks.
// error:      receives a message on failure.
//
// Returns ADBC_STATUS_OK on success, in which case the reader keeps the
// inferred type array and the binder pointer.  On failure after the reader has
// been allocated, `stream` is still populated with the reader and its
// callbacks (NOTE(review): presumably the caller releases it - confirm).  If
// the reader itself cannot be allocated, `stream->release` is set to NULL so
// the stream reads as already released.
AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
                                      struct AdbcSqliteBinder* binder, size_t batch_size,
                                      struct ArrowArrayStream* stream,
                                      struct AdbcError* error) {
  // calloc gives the same zero-filled reader as malloc + memset, and lets us
  // detect allocation failure before touching the memory.
  struct StatementReader* reader = calloc(1, sizeof(struct StatementReader));
  if (reader == NULL) {
    SetError(error, "%s", "Failed to allocate statement reader");
    stream->release = NULL;
    return ADBC_STATUS_INTERNAL;
  }
  reader->db = db;
  reader->stmt = stmt;
  reader->batch_size = batch_size;

  stream->private_data = reader;
  stream->release = StatementReaderRelease;
  stream->get_last_error = StatementReaderGetLastError;
  stream->get_next = StatementReaderGetNext;
  stream->get_schema = StatementReaderGetSchema;

  sqlite3_mutex_enter(sqlite3_db_mutex(db));

  // Per-column scratch space used only while inferring types.
  const int num_columns = sqlite3_column_count(stmt);
  struct ArrowBitmap* validity = malloc(num_columns * sizeof(struct ArrowBitmap));
  struct ArrowBuffer* data = malloc(num_columns * sizeof(struct ArrowBuffer));
  struct ArrowBuffer* binary = malloc(num_columns * sizeof(struct ArrowBuffer));
  enum ArrowType* current_type = malloc(num_columns * sizeof(enum ArrowType));

  AdbcStatusCode status = ADBC_STATUS_OK;
  int scratch_allocated = 1;  // all four arrays are usable
  int buffers_finalized = 0;  // InferFinalize succeeded and took the buffers

  // malloc(0) may legitimately return NULL, so only treat NULL as a failure
  // when there is at least one column.
  if (num_columns > 0 && (validity == NULL || data == NULL || binary == NULL ||
                          current_type == NULL)) {
    SetError(error, "%s", "Failed to allocate buffers for type inference");
    status = ADBC_STATUS_INTERNAL;
    scratch_allocated = 0;
  }

  if (status == ADBC_STATUS_OK) {
    status = StatementReaderInitializeInfer(num_columns, batch_size, validity, data,
                                            binary, current_type, error);
  }

  // Bind the first set of parameters, but only if initialization succeeded so
  // that an earlier failure is not overwritten by the bind result.
  if (status == ADBC_STATUS_OK && binder) {
    char finished = 0;
    status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
    if (finished) {
      reader->done = 1;
    }
  }

  if (status == ADBC_STATUS_OK && !reader->done) {
    int64_t num_rows = 0;
    while (((size_t)num_rows) < batch_size) {
      int rc = sqlite3_step(stmt);
      if (rc == SQLITE_DONE) {
        if (!binder) {
          reader->done = 1;
          break;
        }
        // Current parameter set is exhausted; move on to the next one.
        char finished = 0;
        status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
        if (status != ADBC_STATUS_OK) break;
        if (finished) {
          reader->done = 1;
          break;
        }
        continue;
      } else if (rc == SQLITE_ERROR) {
        SetError(error, "Failed to step query: %s", sqlite3_errmsg(db));
        status = ADBC_STATUS_IO;
        // Reset here so that we don't get an error again in StatementRelease
        (void)sqlite3_reset(stmt);
        break;
      } else if (rc != SQLITE_ROW) {
        // sqlite3_step can return codes other than SQLITE_ERROR; report them
        // instead of failing without a message.
        SetError(error, "Unexpected result from sqlite3_step (%d): %s", rc,
                 sqlite3_errmsg(db));
        status = ADBC_STATUS_INTERNAL;
        break;
      }

      for (int col = 0; col < num_columns; col++) {
        status = StatementReaderInferOneValue(stmt, col, &validity[col], &data[col],
                                              &binary[col], &current_type[col], error);
        if (status != ADBC_STATUS_OK) break;
      }
      if (status != ADBC_STATUS_OK) break;
      num_rows++;
    }

    if (status == ADBC_STATUS_OK) {
      status = StatementReaderInferFinalize(stmt, num_columns, num_rows, reader, validity,
                                            data, binary, current_type, error);
      buffers_finalized = (status == ADBC_STATUS_OK);
    }
  }

  if (scratch_allocated && !buffers_finalized) {
    // Free the individual buffers
    // This is OK, since InferFinalize either moves all buffers or no buffers.
    // This also covers the case where the binder was finished before the first
    // step: InferFinalize never ran, so anything the buffers hold is still ours.
    for (int i = 0; i < num_columns; i++) {
      ArrowBitmapReset(&validity[i]);
      ArrowBufferReset(&data[i]);
      ArrowBufferReset(&binary[i]);
    }
  }

  if (status != ADBC_STATUS_OK) {
    free(current_type);
  } else {
    // The reader keeps the inferred types and the binder.
    reader->types = current_type;
    reader->binder = binder;
  }

  // The arrays themselves were only scratch space.
  free(data);
  free(validity);
  free(binary);

  sqlite3_mutex_leave(sqlite3_db_mutex(db));
  return status;
}  // NOLINT(whitespace/indent)