AdbcStatusCode AdbcSqliteExportReader()

in c/driver/sqlite/statement_reader.c [1023:1122]


AdbcStatusCode AdbcSqliteExportReader(sqlite3* db, sqlite3_stmt* stmt,
                                      struct AdbcSqliteBinder* binder, size_t batch_size,
                                      struct ArrowArrayStream* stream,
                                      struct AdbcError* error) {
  struct StatementReader* reader = malloc(sizeof(struct StatementReader));
  memset(reader, 0, sizeof(struct StatementReader));
  reader->db = db;
  reader->stmt = stmt;
  reader->batch_size = batch_size;

  stream->private_data = reader;
  stream->release = StatementReaderRelease;
  stream->get_last_error = StatementReaderGetLastError;
  stream->get_next = StatementReaderGetNext;
  stream->get_schema = StatementReaderGetSchema;

  sqlite3_mutex_enter(sqlite3_db_mutex(db));

  const int num_columns = sqlite3_column_count(stmt);
  struct ArrowBitmap* validity = malloc(num_columns * sizeof(struct ArrowBitmap));
  struct ArrowBuffer* data = malloc(num_columns * sizeof(struct ArrowBuffer));
  struct ArrowBuffer* binary = malloc(num_columns * sizeof(struct ArrowBuffer));
  enum ArrowType* current_type = malloc(num_columns * sizeof(enum ArrowType));

  AdbcStatusCode status = StatementReaderInitializeInfer(
      num_columns, batch_size, validity, data, binary, current_type, error);

  if (binder) {
    char finished = 0;
    status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
    if (finished) {
      reader->done = 1;
    }
  }

  if (status == ADBC_STATUS_OK && !reader->done) {
    int64_t num_rows = 0;
    while (((size_t)num_rows) < batch_size) {
      int rc = sqlite3_step(stmt);
      if (rc == SQLITE_DONE) {
        if (!binder) {
          reader->done = 1;
          break;
        } else {
          char finished = 0;
          status = AdbcSqliteBinderBindNext(binder, db, stmt, &finished, error);
          if (status != ADBC_STATUS_OK) break;
          if (finished) {
            reader->done = 1;
            break;
          }
        }
        continue;
      } else if (rc == SQLITE_ERROR) {
        SetError(error, "Failed to step query: %s", sqlite3_errmsg(db));
        status = ADBC_STATUS_IO;
        // Reset here so that we don't get an error again in StatementRelease
        (void)sqlite3_reset(stmt);
        break;
      } else if (rc != SQLITE_ROW) {
        status = ADBC_STATUS_INTERNAL;
        break;
      }

      for (int col = 0; col < num_columns; col++) {
        status = StatementReaderInferOneValue(stmt, col, &validity[col], &data[col],
                                              &binary[col], &current_type[col], error);
        if (status != ADBC_STATUS_OK) break;
      }
      if (status != ADBC_STATUS_OK) break;
      num_rows++;
    }

    if (status == ADBC_STATUS_OK) {
      status = StatementReaderInferFinalize(stmt, num_columns, num_rows, reader, validity,
                                            data, binary, current_type, error);
    }
  }

  if (status != ADBC_STATUS_OK) {
    // Free the individual buffers
    // This is OK, since InferFinalize either moves all buffers or no buffers
    for (int i = 0; i < num_columns; i++) {
      ArrowBitmapReset(&validity[i]);
      ArrowBufferReset(&data[i]);
      ArrowBufferReset(&binary[i]);
    }
    free(current_type);
  } else {
    reader->types = current_type;
    reader->binder = binder;
  }

  free(data);
  free(validity);
  free(binary);

  sqlite3_mutex_leave(sqlite3_db_mutex(db));
  return status;
}  // NOLINT(whitespace/indent)