SEXP nanoarrow_c_convert_array_stream()

in r/src/convert_array_stream.c [29:97]


SEXP nanoarrow_c_convert_array_stream(SEXP array_stream_xptr, SEXP ptype_sexp,
                                      SEXP size_sexp, SEXP n_sexp) {
  struct ArrowArrayStream* array_stream = array_stream_from_xptr(array_stream_xptr);
  double size = REAL(size_sexp)[0];
  double n = REAL(n_sexp)[0];

  SEXP schema_xptr = PROTECT(schema_owning_xptr());
  struct ArrowSchema* schema = (struct ArrowSchema*)R_ExternalPtrAddr(schema_xptr);
  int result = array_stream->get_schema(array_stream, schema);
  if (result != NANOARROW_OK) {
    Rf_error("ArrowArrayStream::get_schema(): %s",
             array_stream->get_last_error(array_stream));
  }

  SEXP converter_xptr = PROTECT(nanoarrow_converter_from_ptype(ptype_sexp));
  if (nanoarrow_converter_set_schema(converter_xptr, schema_xptr) != NANOARROW_OK) {
    nanoarrow_converter_stop(converter_xptr);
  }

  if (nanoarrow_converter_reserve(converter_xptr, size) != NANOARROW_OK) {
    nanoarrow_converter_stop(converter_xptr);
  }

  SEXP array_xptr = PROTECT(array_owning_xptr());
  struct ArrowArray* array = (struct ArrowArray*)R_ExternalPtrAddr(array_xptr);

  int64_t n_batches = 0;
  int64_t n_materialized = 0;
  if (n > 0) {
    result = array_stream->get_next(array_stream, array);
    n_batches++;
    if (result != NANOARROW_OK) {
      Rf_error("ArrowArrayStream::get_next(): %s",
               array_stream->get_last_error(array_stream));
    }

    while (array->release != NULL) {
      if (nanoarrow_converter_set_array(converter_xptr, array_xptr) != NANOARROW_OK) {
        nanoarrow_converter_stop(converter_xptr);
      }

      n_materialized = nanoarrow_converter_materialize_n(converter_xptr, array->length);
      if (n_materialized != array->length) {
        Rf_error("Expected to materialize %ld values in batch %ld but materialized %ld",
                 (long)array->length, (long)n_batches, (long)n_materialized);
      }

      if (n_batches >= n) {
        break;
      }

      array->release(array);
      result = array_stream->get_next(array_stream, array);
      n_batches++;
      if (result != NANOARROW_OK) {
        Rf_error("ArrowArrayStream::get_next(): %s",
                 array_stream->get_last_error(array_stream));
      }
    }
  }

  if (nanoarrow_converter_finalize(converter_xptr) != NANOARROW_OK) {
    nanoarrow_converter_stop(converter_xptr);
  }

  SEXP result_sexp = PROTECT(nanoarrow_converter_release_result(converter_xptr));
  UNPROTECT(4);
  return result_sexp;
}