r/src/array_stream.c (159 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #define R_NO_REMAP #include <R.h> #include <Rinternals.h> #include "array.h" #include "array_stream.h" #include "nanoarrow.h" #include "schema.h" #include "util.h" // Ideally user-supplied finalizers are written in such a way that they don't jump; // however if they do it is likely that memory will leak. Here, we use // R_tryCatchError to minimize the chances of that happening. static SEXP run_finalizer_wrapper(void* data) { SEXP finalizer_sym = PROTECT(Rf_install("array_stream_finalizer")); SEXP finalizer_call = PROTECT(Rf_lang1(finalizer_sym)); Rf_eval(finalizer_call, (SEXP)data); UNPROTECT(2); return R_NilValue; } static SEXP run_finalizer_error_handler(SEXP cond, void* hdata) { REprintf("Error evaluating user-supplied array stream finalizer"); return R_NilValue; } void run_user_array_stream_finalizer(SEXP array_stream_xptr) { SEXP protected = PROTECT(R_ExternalPtrProtected(array_stream_xptr)); R_SetExternalPtrProtected(array_stream_xptr, R_NilValue); if (Rf_inherits(protected, "nanoarrow_array_stream_finalizer")) { R_tryCatchError(&run_finalizer_wrapper, protected, &run_finalizer_error_handler, NULL); } UNPROTECT(1); } void finalize_array_stream_xptr(SEXP array_stream_xptr) { struct ArrowArrayStream* array_stream = (struct ArrowArrayStream*)R_ExternalPtrAddr(array_stream_xptr); if (array_stream != NULL && array_stream->release != NULL) { array_stream->release(array_stream); } if (array_stream != NULL) { ArrowFree(array_stream); } run_user_array_stream_finalizer(array_stream_xptr); } SEXP nanoarrow_c_array_stream_get_schema(SEXP array_stream_xptr) { struct ArrowArrayStream* array_stream = array_stream_from_xptr(array_stream_xptr); SEXP schema_xptr = PROTECT(schema_owning_xptr()); struct ArrowSchema* schema = (struct ArrowSchema*)R_ExternalPtrAddr(schema_xptr); int result = array_stream->get_schema(array_stream, schema); if (result != 0) { const char* last_error = array_stream->get_last_error(array_stream); if (last_error == NULL) { last_error = ""; } Rf_error("array_stream->get_schema(): [%d] %s", result, last_error); } UNPROTECT(1); return schema_xptr; } SEXP nanoarrow_c_array_stream_get_next(SEXP array_stream_xptr) { struct ArrowArrayStream* array_stream = array_stream_from_xptr(array_stream_xptr); SEXP array_xptr = PROTECT(array_owning_xptr()); struct ArrowArray* array = (struct ArrowArray*)R_ExternalPtrAddr(array_xptr); int result = array_stream->get_next(array_stream, array); if (result != 0) { const char* last_error = array_stream->get_last_error(array_stream); if (last_error == NULL) { last_error = ""; } Rf_error("array_stream->get_next(): [%d] %s", result, last_error); } UNPROTECT(1); return array_xptr; } SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP schema_xptr, SEXP validate_sexp) { int validate = LOGICAL(validate_sexp)[0]; // Schema needs a copy here because ArrowBasicArrayStreamInit() takes ownership SEXP schema_copy_xptr = PROTECT(schema_owning_xptr()); struct ArrowSchema* schema_copy = (struct ArrowSchema*)R_ExternalPtrAddr(schema_copy_xptr); schema_export(schema_xptr, schema_copy); SEXP array_stream_xptr = PROTECT(array_stream_owning_xptr()); struct ArrowArrayStream* array_stream = (struct ArrowArrayStream*)R_ExternalPtrAddr(array_stream_xptr); int64_t n_arrays = Rf_xlength(batches_sexp); if (ArrowBasicArrayStreamInit(array_stream, schema_copy, n_arrays) != NANOARROW_OK) { Rf_error("Failed to initialize array stream"); } struct ArrowArray array; for (int64_t i = 0; i < n_arrays; i++) { array_export(VECTOR_ELT(batches_sexp, i), &array); ArrowBasicArrayStreamSetArray(array_stream, i, &array); } if (validate) { struct ArrowError error; if (ArrowBasicArrayStreamValidate(array_stream, &error) != NANOARROW_OK) { Rf_error("ArrowBasicArrayStreamValidate(): %s", ArrowErrorMessage(&error)); } } UNPROTECT(2); return array_stream_xptr; } // Implementation of an ArrowArrayStream that keeps a dependent object valid struct WrapperArrayStreamData { SEXP parent_array_stream_xptr; struct ArrowArrayStream* parent_array_stream; }; static void finalize_wrapper_array_stream(struct ArrowArrayStream* array_stream) { if (array_stream->private_data != NULL) { struct WrapperArrayStreamData* data = (struct WrapperArrayStreamData*)array_stream->private_data; // If safe to do so, attempt to do an eager evaluation of a release // callback that may have been registered. If it is not safe to do so, // garbage collection will run any finalizers that have been set // on the chain of environments leading up to the finalizer. if (nanoarrow_is_main_thread()) { run_user_array_stream_finalizer(data->parent_array_stream_xptr); } nanoarrow_release_sexp(data->parent_array_stream_xptr); ArrowFree(array_stream->private_data); } array_stream->release = NULL; } static const char* wrapper_array_stream_get_last_error( struct ArrowArrayStream* array_stream) { struct WrapperArrayStreamData* data = (struct WrapperArrayStreamData*)array_stream->private_data; return data->parent_array_stream->get_last_error(data->parent_array_stream); } static int wrapper_array_stream_get_schema(struct ArrowArrayStream* array_stream, struct ArrowSchema* out) { struct WrapperArrayStreamData* data = (struct WrapperArrayStreamData*)array_stream->private_data; return data->parent_array_stream->get_schema(data->parent_array_stream, out); } static int wrapper_array_stream_get_next(struct ArrowArrayStream* array_stream, struct ArrowArray* out) { struct WrapperArrayStreamData* data = (struct WrapperArrayStreamData*)array_stream->private_data; return data->parent_array_stream->get_next(data->parent_array_stream, out); } void array_stream_export(SEXP parent_array_stream_xptr, struct ArrowArrayStream* array_stream_copy) { struct ArrowArrayStream* parent_array_stream = array_stream_from_xptr(parent_array_stream_xptr); // If there is no dependent object, don't bother with this wrapper SEXP dependent_sexp = R_ExternalPtrProtected(parent_array_stream_xptr); if (dependent_sexp == R_NilValue) { ArrowArrayStreamMove(parent_array_stream, array_stream_copy); return; } // Allocate a new external pointer for an array stream (for consistency: // we always move an array stream when exporting) SEXP parent_array_stream_xptr_new = PROTECT(array_stream_owning_xptr()); struct ArrowArrayStream* parent_array_stream_new = (struct ArrowArrayStream*)R_ExternalPtrAddr(parent_array_stream_xptr_new); ArrowArrayStreamMove(parent_array_stream, parent_array_stream_new); R_SetExternalPtrProtected(parent_array_stream_xptr_new, dependent_sexp); array_stream_copy->private_data = NULL; array_stream_copy->get_last_error = &wrapper_array_stream_get_last_error; array_stream_copy->get_schema = &wrapper_array_stream_get_schema; array_stream_copy->get_next = &wrapper_array_stream_get_next; array_stream_copy->release = &finalize_wrapper_array_stream; struct WrapperArrayStreamData* data = (struct WrapperArrayStreamData*)ArrowMalloc(sizeof(struct WrapperArrayStreamData)); check_trivial_alloc(data, "struct WrapperArrayStreamData"); data->parent_array_stream_xptr = parent_array_stream_xptr_new; data->parent_array_stream = parent_array_stream_new; array_stream_copy->private_data = data; // Transfer responsibility for the stream_xptr to the C object nanoarrow_preserve_sexp(parent_array_stream_xptr_new); UNPROTECT(1); }