AdbcStatusCode SqliteStatementInitIngest()

in c/driver/sqlite/sqlite.c [978:1128]


AdbcStatusCode SqliteStatementInitIngest(struct SqliteStatement* stmt,
                                         sqlite3_stmt** insert_statement,
                                         struct AdbcError* error) {
  AdbcStatusCode code = ADBC_STATUS_OK;

  // Create statements for CREATE TABLE / INSERT
  sqlite3_str* create_query = sqlite3_str_new(NULL);
  if (sqlite3_str_errcode(create_query)) {
    SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
    return ADBC_STATUS_INTERNAL;
  }
  struct StringBuilder insert_query = {0};

  if (StringBuilderInit(&insert_query, /*initial_size=*/256) != 0) {
    SetError(error, "[SQLite] Could not initiate StringBuilder");
    sqlite3_free(sqlite3_str_finish(create_query));
    return ADBC_STATUS_INTERNAL;
  }

  sqlite3_str_appendf(create_query, "%s%Q%s", "CREATE TABLE ", stmt->target_table, " (");
  if (sqlite3_str_errcode(create_query)) {
    SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
    code = ADBC_STATUS_INTERNAL;
    goto cleanup;
  }

  if (StringBuilderAppend(&insert_query, "%s%s%s", "INSERT INTO ", stmt->target_table,
                          " VALUES (") != 0) {
    SetError(error, "[SQLite] Call to StringBuilderAppend failed");
    code = ADBC_STATUS_INTERNAL;
    goto cleanup;
  }

  struct ArrowError arrow_error = {0};
  struct ArrowSchemaView view = {0};
  for (int i = 0; i < stmt->binder.schema.n_children; i++) {
    if (i > 0) {
      sqlite3_str_appendf(create_query, "%s", ", ");
      if (sqlite3_str_errcode(create_query)) {
        SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
        code = ADBC_STATUS_INTERNAL;
        goto cleanup;
      }
    }

    sqlite3_str_appendf(create_query, "%Q", stmt->binder.schema.children[i]->name);
    if (sqlite3_str_errcode(create_query)) {
      SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
      code = ADBC_STATUS_INTERNAL;
      goto cleanup;
    }

    int status =
        ArrowSchemaViewInit(&view, stmt->binder.schema.children[i], &arrow_error);
    if (status != 0) {
      SetError(error, "Failed to parse schema for column %d: %s (%d): %s", i,
               strerror(status), status, arrow_error.message);
      code = ADBC_STATUS_INTERNAL;
      goto cleanup;
    }

    switch (view.type) {
      case NANOARROW_TYPE_UINT8:
      case NANOARROW_TYPE_UINT16:
      case NANOARROW_TYPE_UINT32:
      case NANOARROW_TYPE_UINT64:
      case NANOARROW_TYPE_INT8:
      case NANOARROW_TYPE_INT16:
      case NANOARROW_TYPE_INT32:
      case NANOARROW_TYPE_INT64:
        sqlite3_str_appendf(create_query, " INTEGER");
        break;
      case NANOARROW_TYPE_FLOAT:
      case NANOARROW_TYPE_DOUBLE:
        sqlite3_str_appendf(create_query, " REAL");
        break;
      case NANOARROW_TYPE_STRING:
      case NANOARROW_TYPE_LARGE_STRING:
      case NANOARROW_TYPE_DATE32:
        sqlite3_str_appendf(create_query, " TEXT");
        break;
      case NANOARROW_TYPE_BINARY:
        sqlite3_str_appendf(create_query, " BLOB");
        break;
      default:
        break;
    }

    if (i > 0) {
      if (StringBuilderAppend(&insert_query, "%s", ", ") != 0) {
        SetError(error, "[SQLite] Call to StringBuilderAppend failed");
        code = ADBC_STATUS_INTERNAL;
        goto cleanup;
      }
    }

    if (StringBuilderAppend(&insert_query, "%s", "?") != 0) {
      SetError(error, "[SQLite] Call to StringBuilderAppend failed");
      code = ADBC_STATUS_INTERNAL;
      goto cleanup;
    }
  }

  sqlite3_str_appendchar(create_query, 1, ')');
  if (sqlite3_str_errcode(create_query)) {
    SetError(error, "[SQLite] %s", sqlite3_errmsg(stmt->conn));
    code = ADBC_STATUS_INTERNAL;
    goto cleanup;
  }

  if (StringBuilderAppend(&insert_query, "%s", ")") != 0) {
    SetError(error, "[SQLite] Call to StringBuilderAppend failed");
    code = ADBC_STATUS_INTERNAL;
    goto cleanup;
  }

  sqlite3_stmt* create = NULL;
  if (!stmt->append) {
    // Create table
    int rc =
        sqlite3_prepare_v2(stmt->conn, sqlite3_str_value(create_query),
                           sqlite3_str_length(create_query), &create, /*pzTail=*/NULL);
    if (rc == SQLITE_OK) {
      rc = sqlite3_step(create);
    }

    if (rc != SQLITE_OK && rc != SQLITE_DONE) {
      SetError(error, "[SQLite] Failed to create table: %s (executed '%.*s')",
               sqlite3_errmsg(stmt->conn), sqlite3_str_length(create_query),
               sqlite3_str_value(create_query));
      code = ADBC_STATUS_INTERNAL;
    }
  }

  if (code == ADBC_STATUS_OK) {
    int rc = sqlite3_prepare_v2(stmt->conn, insert_query.buffer, (int)insert_query.size,
                                insert_statement, /*pzTail=*/NULL);
    if (rc != SQLITE_OK) {
      SetError(error, "[SQLite] Failed to prepare statement: %s (executed '%s')",
               sqlite3_errmsg(stmt->conn), insert_query.buffer);
      code = ADBC_STATUS_INTERNAL;
    }
  }

  sqlite3_finalize(create);

cleanup:
  sqlite3_free(sqlite3_str_finish(create_query));
  StringBuilderReset(&insert_query);
  return code;
}