in c/driver/sqlite/sqlite.c [978:1128]
// Prepare the SQL needed to bulk-ingest the bound Arrow data into
// stmt->target_table.
//
// Two statements are generated from stmt->binder.schema:
//   - CREATE TABLE <table> (<col> <type>, ...), executed immediately unless
//     stmt->append is set;
//   - INSERT INTO <table> VALUES (?, ?, ...), with one placeholder per schema
//     child, prepared into *insert_statement.
//
// The table name and the column names are always emitted through SQLite's %Q
// conversion, so names containing spaces, quotes or SQL syntax cannot change
// the shape of the statement, and CREATE TABLE and INSERT always refer to the
// same table.
//
// \param[in] stmt The statement holding the connection, target table name,
//   append flag and bound schema.
// \param[out] insert_statement Receives the prepared INSERT statement.  This
//   function does not finalize it.
// \param[out] error Populated with a message on failure.
// \return ADBC_STATUS_OK on success, ADBC_STATUS_INTERNAL if the schema
//   cannot be parsed, a query cannot be built, or SQLite rejects a statement.
AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
                                         sqlite3_stmt** insert_statement,
                                         struct AdbcError* error) {
  // Everything is declared before the first `goto cleanup` so that the
  // cleanup code never observes an uninitialized variable.
  AdbcStatusCode code = ADBC_STATUS_OK;
  sqlite3_stmt* create = NULL;
  struct ArrowError arrow_error = {0};
  struct ArrowSchemaView view = {0};
  int rc = SQLITE_OK;

  // sqlite3_str_new() always returns a usable object.  Errors (out of memory,
  // string too long) are sticky: once one occurs, later appends are ignored
  // and sqlite3_str_errcode() keeps reporting it, so a single check after all
  // appends is sufficient.
  sqlite3_str* create_query = sqlite3_str_new(NULL);
  sqlite3_str* insert_query = sqlite3_str_new(NULL);

  sqlite3_str_appendf(create_query, "CREATE TABLE %Q (", stmt->target_table);
  // Quote the table name exactly as CREATE TABLE does; interpolating it raw
  // would break on (or be exploitable through) names with special characters.
  sqlite3_str_appendf(insert_query, "INSERT INTO %Q VALUES (", stmt->target_table);

  for (int i = 0; i < stmt->binder.schema.n_children; i++) {
    const char* separator = (i > 0) ? ", " : "";
    sqlite3_str_appendf(create_query, "%s%Q", separator,
                        stmt->binder.schema.children[i]->name);

    int status =
        ArrowSchemaViewInit(&view, stmt->binder.schema.children[i], &arrow_error);
    if (status != 0) {
      SetError(error, "Failed to parse schema for column %d: %s (%d): %s", i,
               strerror(status), status, arrow_error.message);
      code = ADBC_STATUS_INTERNAL;
      goto cleanup;
    }

    // Map the Arrow type to a SQLite column type.  Types without a mapping
    // leave the column untyped.
    const char* column_type = NULL;
    switch (view.type) {
      case NANOARROW_TYPE_UINT8:
      case NANOARROW_TYPE_UINT16:
      case NANOARROW_TYPE_UINT32:
      case NANOARROW_TYPE_UINT64:
      case NANOARROW_TYPE_INT8:
      case NANOARROW_TYPE_INT16:
      case NANOARROW_TYPE_INT32:
      case NANOARROW_TYPE_INT64:
        column_type = "INTEGER";
        break;
      case NANOARROW_TYPE_FLOAT:
      case NANOARROW_TYPE_DOUBLE:
        column_type = "REAL";
        break;
      case NANOARROW_TYPE_STRING:
      case NANOARROW_TYPE_LARGE_STRING:
      case NANOARROW_TYPE_DATE32:
        column_type = "TEXT";
        break;
      case NANOARROW_TYPE_BINARY:
        column_type = "BLOB";
        break;
      default:
        break;
    }
    if (column_type != NULL) {
      sqlite3_str_appendf(create_query, " %s", column_type);
    }

    sqlite3_str_appendf(insert_query, "%s?", separator);
  }

  sqlite3_str_appendchar(create_query, 1, ')');
  sqlite3_str_appendchar(insert_query, 1, ')');

  // The string builders are not tied to the connection, so report their own
  // error code rather than sqlite3_errmsg(stmt->conn).
  rc = sqlite3_str_errcode(create_query);
  if (rc == SQLITE_OK) {
    rc = sqlite3_str_errcode(insert_query);
  }
  if (rc != SQLITE_OK) {
    SetError(error, "[SQLite] Failed to build ingest queries (sqlite3_str error code %d)",
             rc);
    code = ADBC_STATUS_INTERNAL;
    goto cleanup;
  }

  if (!stmt->append) {
    // Create table
    rc = sqlite3_prepare_v2(stmt->conn, sqlite3_str_value(create_query),
                            sqlite3_str_length(create_query), &create,
                            /*pzTail=*/NULL);
    if (rc == SQLITE_OK) {
      rc = sqlite3_step(create);
    }
    if (rc != SQLITE_OK && rc != SQLITE_DONE) {
      SetError(error, "[SQLite] Failed to create table: %s (executed '%.*s')",
               sqlite3_errmsg(stmt->conn), sqlite3_str_length(create_query),
               sqlite3_str_value(create_query));
      code = ADBC_STATUS_INTERNAL;
      goto cleanup;
    }
  }

  rc = sqlite3_prepare_v2(stmt->conn, sqlite3_str_value(insert_query),
                          sqlite3_str_length(insert_query), insert_statement,
                          /*pzTail=*/NULL);
  if (rc != SQLITE_OK) {
    SetError(error, "[SQLite] Failed to prepare statement: %s (executed '%.*s')",
             sqlite3_errmsg(stmt->conn), sqlite3_str_length(insert_query),
             sqlite3_str_value(insert_query));
    code = ADBC_STATUS_INTERNAL;
  }

cleanup:
  // sqlite3_finalize(NULL) and sqlite3_free(NULL) are harmless no-ops.
  sqlite3_finalize(create);
  sqlite3_free(sqlite3_str_finish(create_query));
  sqlite3_free(sqlite3_str_finish(insert_query));
  return code;
}