in c/driver/sqlite/statement_reader.c [561:642]
int StatementReaderGetNext(struct ArrowArrayStream* self, struct ArrowArray* out) {
if (!self->release || !self->private_data) {
return EINVAL;
}
struct StatementReader* reader = (struct StatementReader*)self->private_data;
if (reader->initial_batch.release != NULL) {
memcpy(out, &reader->initial_batch, sizeof(*out));
memset(&reader->initial_batch, 0, sizeof(reader->initial_batch));
return 0;
} else if (reader->done) {
out->release = NULL;
return 0;
}
RAISE_NA(ArrowArrayInitFromSchema(out, &reader->schema, &reader->error));
RAISE_NA(ArrowArrayStartAppending(out));
int64_t batch_size = 0;
int status = 0;
sqlite3_mutex_enter(sqlite3_db_mutex(reader->db));
while (batch_size < reader->batch_size) {
int rc = sqlite3_step(reader->stmt);
if (rc == SQLITE_DONE) {
if (!reader->binder) {
reader->done = 1;
break;
} else {
char finished = 0;
struct AdbcError error = {0};
status = AdbcSqliteBinderBindNext(reader->binder, reader->db, reader->stmt,
&finished, &error);
if (status != ADBC_STATUS_OK) {
reader->done = 1;
status = EIO;
if (error.release) {
strncpy(reader->error.message, error.message, sizeof(reader->error.message));
reader->error.message[sizeof(reader->error.message) - 1] = '\0';
error.release(&error);
}
break;
} else if (finished) {
reader->done = 1;
break;
}
continue;
}
} else if (rc == SQLITE_ERROR) {
reader->done = 1;
status = EIO;
StatementReaderSetError(reader);
break;
} else if (rc != SQLITE_ROW) {
reader->done = 1;
status = ADBC_STATUS_INTERNAL;
StatementReaderSetError(reader);
break;
}
for (int col = 0; col < reader->schema.n_children; col++) {
status = StatementReaderGetOneValue(reader, col, out->children[col]);
if (status != 0) break;
}
if (status != 0) break;
batch_size++;
}
if (status == 0) {
out->length = batch_size;
for (int i = 0; i < reader->schema.n_children; i++) {
status = ArrowArrayFinishBuildingDefault(out->children[i], &reader->error);
if (status != 0) break;
}
// If we didn't read any rows, the reader is exhausted - don't generate a spurious
// batch
if (batch_size == 0) out->release(out);
}
sqlite3_mutex_leave(sqlite3_db_mutex(reader->db));
return status;
}