Result ExecuteIngestImpl()

in c/driver/sqlite/sqlite.cc [778:973]


  Result<int64_t> ExecuteIngestImpl(IngestState& state) {
    UNWRAP_STATUS(BindImpl());
    if (!binder_.schema.release) {
      return status::InvalidState("must Bind() before bulk ingestion");
    }

    // Parameter validation

    if (state.target_catalog && state.temporary) {
      return status::fmt::InvalidState("{} Cannot set both {} and {}", kErrorPrefix,
                                       ADBC_INGEST_OPTION_TARGET_CATALOG,
                                       ADBC_INGEST_OPTION_TEMPORARY);
    } else if (state.target_schema) {
      return status::fmt::NotImplemented("{} {} not supported", kErrorPrefix,
                                         ADBC_INGEST_OPTION_TARGET_DB_SCHEMA);
    } else if (!state.target_table) {
      return status::fmt::InvalidState("{} Must set {}", kErrorPrefix,
                                       ADBC_INGEST_OPTION_TARGET_TABLE);
    }

    // Create statements for creating the table, inserting a row, and the table name

    SqliteStringBuilder create_query, drop_query, insert_query, table_builder;
    if (state.target_catalog) {
      table_builder.Append(R"("%w" . "%w")", state.target_catalog->c_str(),
                           state.target_table->c_str());
    } else if (state.temporary) {
      // OK to be redundant (CREATE TEMP TABLE temp.foo)
      table_builder.Append(R"(temp . "%w")", state.target_table->c_str());
    } else {
      // If not temporary, explicitly target the main database
      table_builder.Append(R"(main . "%w")", state.target_table->c_str());
    }

    UNWRAP_RESULT(std::string_view table, table_builder.GetString());

    switch (state.table_exists_) {
      case Base::TableExists::kAppend:
        if (state.temporary) {
          create_query.Append("CREATE TEMPORARY TABLE IF NOT EXISTS %s (", table.data());
        } else {
          create_query.Append("CREATE TABLE IF NOT EXISTS %s (", table.data());
        }
        break;
      case Base::TableExists::kFail:
      case Base::TableExists::kReplace:
        if (state.temporary) {
          create_query.Append("CREATE TEMPORARY TABLE %s (", table.data());
        } else {
          create_query.Append("CREATE TABLE %s (", table.data());
        }
        drop_query.Append("DROP TABLE IF EXISTS %s", table.data());
        break;
    }

    insert_query.Append("INSERT INTO %s (", table.data());

    struct ArrowError arrow_error = {0};
    struct ArrowSchemaView view;
    std::memset(&view, 0, sizeof(view));
    for (int i = 0; i < binder_.schema.n_children; i++) {
      if (i > 0) {
        create_query.Append(", ");
        insert_query.Append(", ");
      }

      create_query.Append(R"("%w")", binder_.schema.children[i]->name);
      insert_query.Append(R"("%w")", binder_.schema.children[i]->name);

      int status = ArrowSchemaViewInit(&view, binder_.schema.children[i], &arrow_error);
      if (status != 0) {
        return status::fmt::Internal("failed to parse schema for column {}: {} ({}): {}",
                                     i, std::strerror(status), status,
                                     arrow_error.message);
      }

      switch (view.type) {
        case NANOARROW_TYPE_BOOL:
        case NANOARROW_TYPE_UINT8:
        case NANOARROW_TYPE_UINT16:
        case NANOARROW_TYPE_UINT32:
        case NANOARROW_TYPE_UINT64:
        case NANOARROW_TYPE_INT8:
        case NANOARROW_TYPE_INT16:
        case NANOARROW_TYPE_INT32:
        case NANOARROW_TYPE_INT64:
          create_query.Append(" INTEGER");
          break;
        case NANOARROW_TYPE_FLOAT:
        case NANOARROW_TYPE_DOUBLE:
          create_query.Append(" REAL");
          break;
        case NANOARROW_TYPE_STRING:
        case NANOARROW_TYPE_LARGE_STRING:
        case NANOARROW_TYPE_DATE32:
          create_query.Append(" TEXT");
          break;
        case NANOARROW_TYPE_BINARY:
          create_query.Append(" BLOB");
          break;
        default:
          break;
      }
    }

    create_query.Append(")");
    insert_query.Append(") VALUES (");
    for (int i = 0; i < binder_.schema.n_children; i++) {
      insert_query.Append("%s?", (i > 0 ? ", " : ""));
    }
    insert_query.Append(")");

    UNWRAP_RESULT(std::string_view create, create_query.GetString());
    UNWRAP_RESULT(std::string_view drop, drop_query.GetString());
    UNWRAP_RESULT(std::string_view insert, insert_query.GetString());

    // Drop/create tables as needed

    switch (state.table_exists_) {
      case Base::TableExists::kAppend:
      case Base::TableExists::kFail:
        // Do nothing
        break;
      case Base::TableExists::kReplace: {
        UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, drop));
        break;
      }
    }
    switch (state.table_does_not_exist_) {
      case Base::TableDoesNotExist::kCreate: {
        UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, create));
        break;
      }
      case Base::TableDoesNotExist::kFail:
        // Do nothing
        break;
    }

    // Insert
    int64_t row_count = 0;
    const int is_autocommit = sqlite3_get_autocommit(conn_);
    if (is_autocommit) {
      UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, "BEGIN"));
    }

    assert(!insert.empty());
    sqlite3_stmt* stmt = nullptr;
    {
      int rc = sqlite3_prepare_v2(conn_, insert.data(), static_cast<int>(insert.size()),
                                  &stmt, /*pzTail=*/nullptr);
      if (rc != SQLITE_OK) {
        std::ignore = sqlite3_finalize(stmt);
        return status::fmt::Internal("failed to prepare: {}\nquery was: {}",
                                     sqlite3_errmsg(conn_), insert);
      }
    }
    assert(stmt != nullptr);

    AdbcStatusCode status_code = ADBC_STATUS_OK;
    Status status = status::Ok();
    struct AdbcError error = ADBC_ERROR_INIT;
    while (true) {
      char finished = 0;
      status_code = AdbcSqliteBinderBindNext(&binder_, conn_, stmt, &finished, &error);
      if (status_code != ADBC_STATUS_OK || finished) {
        status = Status::FromAdbc(status_code, error);
        break;
      }

      int rc = 0;
      do {
        rc = sqlite3_step(stmt);
      } while (rc == SQLITE_ROW);
      if (rc != SQLITE_DONE) {
        status = status::fmt::Internal("failed to execute: {}\nquery was: {}",
                                       sqlite3_errmsg(conn_), insert.data());
        status_code = ADBC_STATUS_INTERNAL;
        break;
      }
      row_count++;
    }
    std::ignore = sqlite3_finalize(stmt);

    if (is_autocommit) {
      if (status_code == ADBC_STATUS_OK) {
        UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, "COMMIT"));
      } else {
        UNWRAP_STATUS(::adbc::sqlite::SqliteQuery::Execute(conn_, "ROLLBACK"));
      }
    }

    if (status_code != ADBC_STATUS_OK) {
      return status;
    }
    return row_count;
  }