in c/driver/sqlite/sqlite.cc [778:973]
/// \brief Bulk-ingest the currently bound Arrow data into a SQLite table.
///
/// Steps, in order:
///   1. Validate the ingest options in `state`.
///   2. Build the qualified table name plus the CREATE / DROP / INSERT
///      statements from the bound schema.
///   3. Drop and/or create the table according to `state.table_exists_` and
///      `state.table_does_not_exist_`.
///   4. Insert the bound data with a prepared INSERT. If the connection is in
///      autocommit mode, the inserts are wrapped in a transaction that this
///      function begins and then commits (success) or rolls back (failure).
///
/// \param state Ingest options (target catalog/schema/table, temporary flag,
///   and the table-exists / table-does-not-exist policies).
/// \return The number of successful bind-and-step iterations (presumably one
///   per ingested row; depends on AdbcSqliteBinderBindNext), or an error
///   status.
Result<int64_t> ExecuteIngestImpl(IngestState& state) {
  UNWRAP_STATUS(BindImpl());
  if (!binder_.schema.release) {
    return status::InvalidState("must Bind() before bulk ingestion");
  }

  // Parameter validation
  if (state.target_catalog && state.temporary) {
    return status::fmt::InvalidState("{} Cannot set both {} and {}", kErrorPrefix,
                                     ADBC_INGEST_OPTION_TARGET_CATALOG,
                                     ADBC_INGEST_OPTION_TEMPORARY);
  } else if (state.target_schema) {
    return status::fmt::NotImplemented("{} {} not supported", kErrorPrefix,
                                       ADBC_INGEST_OPTION_TARGET_DB_SCHEMA);
  } else if (!state.target_table) {
    return status::fmt::InvalidState("{} Must set {}", kErrorPrefix,
                                     ADBC_INGEST_OPTION_TARGET_TABLE);
  }

  // Create statements for creating the table, inserting a row, and the table name
  SqliteStringBuilder create_query, drop_query, insert_query, table_builder;
  if (state.target_catalog) {
    table_builder.Append(R"("%w" . "%w")", state.target_catalog->c_str(),
                         state.target_table->c_str());
  } else if (state.temporary) {
    // OK to be redundant (CREATE TEMP TABLE temp.foo)
    table_builder.Append(R"(temp . "%w")", state.target_table->c_str());
  } else {
    // If not temporary, explicitly target the main database
    table_builder.Append(R"(main . "%w")", state.target_table->c_str());
  }
  UNWRAP_RESULT(std::string_view table, table_builder.GetString());

  switch (state.table_exists_) {
    case Base::TableExists::kAppend:
      // Appending must tolerate an already-existing table.
      if (state.temporary) {
        create_query.Append("CREATE TEMPORARY TABLE IF NOT EXISTS %s (", table.data());
      } else {
        create_query.Append("CREATE TABLE IF NOT EXISTS %s (", table.data());
      }
      break;
    case Base::TableExists::kFail:
    case Base::TableExists::kReplace:
      // A plain CREATE fails if the table is still present; for kReplace the
      // DROP built here is executed first (see below).
      if (state.temporary) {
        create_query.Append("CREATE TEMPORARY TABLE %s (", table.data());
      } else {
        create_query.Append("CREATE TABLE %s (", table.data());
      }
      drop_query.Append("DROP TABLE IF EXISTS %s", table.data());
      break;
  }
  insert_query.Append("INSERT INTO %s (", table.data());

  struct ArrowError arrow_error = {0};
  struct ArrowSchemaView view;
  std::memset(&view, 0, sizeof(view));
  // n_children is 64-bit in the Arrow C data interface; use a matching index.
  for (int64_t i = 0; i < binder_.schema.n_children; i++) {
    if (i > 0) {
      create_query.Append(", ");
      insert_query.Append(", ");
    }
    create_query.Append(R"("%w")", binder_.schema.children[i]->name);
    insert_query.Append(R"("%w")", binder_.schema.children[i]->name);

    int status = ArrowSchemaViewInit(&view, binder_.schema.children[i], &arrow_error);
    if (status != 0) {
      return status::fmt::Internal("failed to parse schema for column {}: {} ({}): {}",
                                   i, std::strerror(status), status,
                                   arrow_error.message);
    }

    // Choose the declared column type from the Arrow type.
    switch (view.type) {
      case NANOARROW_TYPE_BOOL:
      case NANOARROW_TYPE_UINT8:
      case NANOARROW_TYPE_UINT16:
      case NANOARROW_TYPE_UINT32:
      case NANOARROW_TYPE_UINT64:
      case NANOARROW_TYPE_INT8:
      case NANOARROW_TYPE_INT16:
      case NANOARROW_TYPE_INT32:
      case NANOARROW_TYPE_INT64:
        create_query.Append(" INTEGER");
        break;
      case NANOARROW_TYPE_FLOAT:
      case NANOARROW_TYPE_DOUBLE:
        create_query.Append(" REAL");
        break;
      case NANOARROW_TYPE_STRING:
      case NANOARROW_TYPE_LARGE_STRING:
      case NANOARROW_TYPE_DATE32:
        create_query.Append(" TEXT");
        break;
      case NANOARROW_TYPE_BINARY:
        create_query.Append(" BLOB");
        break;
      default:
        // Any other Arrow type: the column is declared without a type name.
        break;
    }
  }
  create_query.Append(")");

  // One positional placeholder per column.
  insert_query.Append(") VALUES (");
  for (int64_t i = 0; i < binder_.schema.n_children; i++) {
    insert_query.Append("%s?", (i > 0 ? ", " : ""));
  }
  insert_query.Append(")");

  UNWRAP_RESULT(std::string_view create, create_query.GetString());
  UNWRAP_RESULT(std::string_view drop, drop_query.GetString());
  UNWRAP_RESULT(std::string_view insert, insert_query.GetString());

  // Drop/create tables as needed
  switch (state.table_exists_) {
    case Base::TableExists::kAppend:
    case Base::TableExists::kFail:
      // Do nothing
      break;
    case Base::TableExists::kReplace: {
      UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, drop));
      break;
    }
  }
  switch (state.table_does_not_exist_) {
    case Base::TableDoesNotExist::kCreate: {
      UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, create));
      break;
    }
    case Base::TableDoesNotExist::kFail:
      // Do nothing
      break;
  }

  // Insert
  int64_t row_count = 0;
  // Only open (and later close) a transaction if the caller has not already
  // started one on this connection.
  const int is_autocommit = sqlite3_get_autocommit(conn_);
  if (is_autocommit) {
    UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, "BEGIN"));
  }

  assert(!insert.empty());
  AdbcStatusCode status_code = ADBC_STATUS_OK;
  Status status = status::Ok();
  sqlite3_stmt* stmt = nullptr;
  {
    int rc = sqlite3_prepare_v2(conn_, insert.data(), static_cast<int>(insert.size()),
                                &stmt, /*pzTail=*/nullptr);
    if (rc != SQLITE_OK) {
      // Capture the error message before running any further statement on
      // the connection, since that would replace the connection's error text.
      status = status::fmt::Internal("failed to prepare: {}\nquery was: {}",
                                     sqlite3_errmsg(conn_), insert);
      std::ignore = sqlite3_finalize(stmt);
      if (is_autocommit) {
        // Do not leave the transaction we opened above dangling. Best effort:
        // the prepare failure is the error worth reporting.
        std::ignore = ::adbc::sqlite::SqliteQuery::Execute(conn_, "ROLLBACK");
      }
      return status;
    }
  }
  assert(stmt != nullptr);

  struct AdbcError error = ADBC_ERROR_INIT;
  while (true) {
    char finished = 0;
    status_code = AdbcSqliteBinderBindNext(&binder_, conn_, stmt, &finished, &error);
    if (status_code != ADBC_STATUS_OK || finished) {
      // Either the binder reported an error or the input is exhausted; in
      // both cases convert the binder's result and stop.
      status = Status::FromAdbc(status_code, error);
      break;
    }

    int rc = 0;
    do {
      rc = sqlite3_step(stmt);
    } while (rc == SQLITE_ROW);
    if (rc != SQLITE_DONE) {
      status = status::fmt::Internal("failed to execute: {}\nquery was: {}",
                                     sqlite3_errmsg(conn_), insert);
      status_code = ADBC_STATUS_INTERNAL;
      break;
    }
    row_count++;
  }
  std::ignore = sqlite3_finalize(stmt);

  if (status_code != ADBC_STATUS_OK) {
    if (is_autocommit) {
      // SQLite may already have rolled the transaction back automatically for
      // some errors, in which case an explicit ROLLBACK fails harmlessly.
      // Never let that failure replace the error that caused it.
      std::ignore = ::adbc::sqlite::SqliteQuery::Execute(conn_, "ROLLBACK");
    }
    return status;
  }

  if (is_autocommit) {
    UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, "COMMIT"));
  }
  return row_count;
}