python/src/nanoarrow/_ipc_lib.pyx (236 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3
# cython: linetrace=True

from libc.stdint cimport uint8_t, int64_t, uintptr_t
from libc.errno cimport EIO, EAGAIN
from libc.stdio cimport snprintf
from cpython.ref cimport PyObject, Py_INCREF, Py_DECREF
from cpython cimport Py_buffer, PyBuffer_FillInfo

from nanoarrow_c cimport (
    ArrowArrayStream,
    ArrowArrayView,
    ArrowSchema,
    ArrowErrorCode,
    ArrowError,
    NANOARROW_OK,
)

from nanoarrow._schema cimport CSchema
from nanoarrow._array cimport CArrayView
from nanoarrow._utils cimport Error


cdef extern from "nanoarrow/nanoarrow_ipc.h" nogil:
    struct ArrowIpcInputStream:
        ArrowErrorCode (*read)(ArrowIpcInputStream* stream, uint8_t* buf,
                               int64_t buf_size_bytes, int64_t* size_read_out,
                               ArrowError* error)
        void (*release)(ArrowIpcInputStream* stream)
        void* private_data

    struct ArrowIpcArrayStreamReaderOptions:
        int64_t field_index
        int use_shared_buffers

    ArrowErrorCode ArrowIpcArrayStreamReaderInit(
        ArrowArrayStream* out, ArrowIpcInputStream* input_stream,
        ArrowIpcArrayStreamReaderOptions* options)

    struct ArrowIpcOutputStream:
        ArrowErrorCode (*write)(ArrowIpcOutputStream* stream, const void* buf,
                                int64_t buf_size_bytes, int64_t* size_written_out,
                                ArrowError* error)
        void (*release)(ArrowIpcOutputStream* stream)
        void* private_data

    struct ArrowIpcWriter:
        void* private_data

    ArrowErrorCode ArrowIpcWriterInit(ArrowIpcWriter* writer,
                                      ArrowIpcOutputStream* output_stream)
    void ArrowIpcWriterReset(ArrowIpcWriter* writer)
    ArrowErrorCode ArrowIpcWriterWriteSchema(ArrowIpcWriter* writer,
                                             const ArrowSchema* in_,
                                             ArrowError* error)
    ArrowErrorCode ArrowIpcWriterWriteArrayView(ArrowIpcWriter* writer,
                                                const ArrowArrayView* in_,
                                                ArrowError* error)
    ArrowErrorCode ArrowIpcWriterWriteArrayStream(ArrowIpcWriter* writer,
                                                  ArrowArrayStream* in_,
                                                  ArrowError* error)


cdef class PyStreamPrivate:
    """Private data shared by the Python-backed input and output stream callbacks.

    Holds the wrapped Python file-like object plus a (pointer, size) pair that
    the callbacks update before each read/write so that this object can be
    handed to ``readinto()``/``write()`` through the buffer protocol.
    """
    # The wrapped Python file-like object
    cdef object _obj
    # Whether the release callback should call obj.close()
    cdef bint _close_obj
    # Address and size of the memory currently exposed via __getbuffer__
    cdef void* _addr
    cdef Py_ssize_t _size_bytes
    # Whether the exported buffer is read-only (True for the write path)
    cdef bint _buffer_readonly

    def __cinit__(self, obj, bint buffer_readonly, bint close_obj=False):
        self._obj = obj
        self._close_obj = close_obj
        self._addr = NULL
        self._size_bytes = 0
        self._buffer_readonly = buffer_readonly

    @property
    def obj(self):
        """The wrapped Python file-like object."""
        return self._obj

    @property
    def close_obj(self):
        """Whether the wrapped object is closed when the stream is released."""
        return self._close_obj

    def set_buffer(self, uintptr_t addr, Py_ssize_t size_bytes):
        """Point the exported buffer at ``size_bytes`` bytes starting at ``addr``."""
        self._addr = <void*>addr
        self._size_bytes = size_bytes

    # Needed for at least some implementations of readinto()
    def __len__(self):
        return self._size_bytes

    # Implement the buffer protocol so that this object can be used as
    # the argument to xxx.readinto() or xxx.write(). This ensures that
    # no extra copies (beyond any buffering done by the upstream file-like object)
    # are held since the upstream object has access to the preallocated output buffer.
    # In the read case, the preallocation is done by the ArrowArrayStream
    # implementation before issuing each read call (two per message, with
    # an extra call for a RecordBatch message to get the actual buffer data).
    # In the write case, this will be a view of whatever information was provided to
    # the write callback.
    def __getbuffer__(self, Py_buffer* buffer, int flags):
        PyBuffer_FillInfo(buffer, self, self._addr, self._size_bytes,
                          self._buffer_readonly, flags)

    def __releasebuffer__(self, Py_buffer* buffer):
        pass


# Copy "<ExceptionClassName>: <message>" into error.message (truncated by
# snprintf() to the size of the message buffer). Does nothing when error is NULL.
# Encoding uses errors="replace" so that formatting the message cannot itself
# raise (e.g. on lone surrogates): the callers are noexcept, and an exception
# escaping their except block would make them return 0 (NANOARROW_OK).
cdef void _set_error_from_exception(ArrowError* error, object exc) noexcept:
    cdef bytes cls_name
    cdef bytes message

    if error == NULL:
        return

    cls_name = type(exc).__name__.encode("utf-8", "replace")
    message = str(exc).encode("utf-8", "replace")
    snprintf(
        error.message,
        sizeof(error.message),
        "%s: %s",
        <const char*>cls_name,
        <const char*>message
    )


# ArrowIpcInputStream.read callback backed by a Python object's readinto().
# Returns NANOARROW_OK and sets size_read_out on success, EAGAIN (with
# size_read_out set to 0) when readinto() returns None, and EIO with
# error.message populated when a Python exception is raised.
cdef ArrowErrorCode py_input_stream_read(ArrowIpcInputStream* stream, uint8_t* buf,
                                         int64_t buf_size_bytes, int64_t* size_read_out,
                                         ArrowError* error) noexcept nogil:
    with gil:
        stream_private = <object>stream.private_data

        try:
            # Inside the try block so that a failure here is also reported as EIO
            stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

            # Non-blocking streams may return None here, or buffered
            # wrappers of them may raise BlockingIOError
            read_result = stream_private.obj.readinto(stream_private)

            if read_result is None:
                size_read_out[0] = 0
                return EAGAIN
            else:
                size_read_out[0] = read_result
                return NANOARROW_OK
        except Exception as e:
            _set_error_from_exception(error, e)
            return EIO


# ArrowIpcInputStream.release callback: optionally closes the wrapped object,
# drops the reference held by the stream, and marks the stream as released.
cdef void py_input_stream_release(ArrowIpcInputStream* stream) noexcept nogil:
    with gil:
        stream_private = <object>stream.private_data
        try:
            if stream_private.close_obj:
                stream_private.obj.close()
        finally:
            # Always drop our reference and mark the stream released, even if
            # close() raised; otherwise the private data would leak and the
            # stream could be released (and closed) a second time. Because this
            # function is noexcept, an exception from close() is reported by
            # Cython as unraisable rather than propagated.
            Py_DECREF(stream_private)
            stream.private_data = NULL
            stream.release = NULL


# ArrowIpcOutputStream.write callback backed by a Python object's write().
# Returns NANOARROW_OK and sets size_written_out on success, EAGAIN (with
# size_written_out set to 0) when write() returns None, and EIO with
# error.message populated when a Python exception is raised.
cdef ArrowErrorCode py_output_stream_write(ArrowIpcOutputStream* stream, const void* buf,
                                           int64_t buf_size_bytes, int64_t* size_written_out,
                                           ArrowError* error) noexcept nogil:
    with gil:
        stream_private = <object>stream.private_data

        try:
            # Inside the try block so that a failure here is also reported as EIO
            stream_private.set_buffer(<uintptr_t>buf, buf_size_bytes)

            # Non-blocking streams may return None here, or buffered
            # wrappers of them may raise BlockingIOError
            write_result = stream_private.obj.write(stream_private)

            if write_result is None:
                size_written_out[0] = 0
                return EAGAIN
            else:
                size_written_out[0] = write_result
                return NANOARROW_OK
        except Exception as e:
            _set_error_from_exception(error, e)
            return EIO


# ArrowIpcOutputStream.release callback: optionally closes the wrapped object,
# drops the reference held by the stream, and marks the stream as released.
cdef void py_output_stream_release(ArrowIpcOutputStream* stream) noexcept nogil:
    with gil:
        stream_private = <object>stream.private_data
        try:
            if stream_private.close_obj:
                stream_private.obj.close()
        finally:
            # See py_input_stream_release(): cleanup must happen even if
            # close() raises so the stream is never released twice.
            Py_DECREF(stream_private)
            stream.private_data = NULL
            stream.release = NULL


cdef class CIpcInputStream:
    """Owner of an ArrowIpcInputStream, optionally backed by a Python object."""
    cdef ArrowIpcInputStream _stream

    def __cinit__(self):
        self._stream.release = NULL

    def is_valid(self):
        """Return True if the underlying stream has a release callback set."""
        return self._stream.release != NULL

    def __dealloc__(self):
        # Duplicating release() to avoid Python API calls in the deallocator
        if self._stream.release != NULL:
            self._stream.release(&self._stream)

    def release(self):
        """Release the stream; return True if it was released by this call."""
        if self._stream.release != NULL:
            self._stream.release(&self._stream)
            return True
        else:
            return False

    @staticmethod
    def from_readable(obj, close_obj=False):
        """Wrap an object implementing ``readinto()`` as an input stream.

        If ``close_obj`` is true, ``obj.close()`` is called when the stream
        is released.
        """
        cdef CIpcInputStream stream = CIpcInputStream()
        cdef PyStreamPrivate private_data = PyStreamPrivate(
            obj,
            buffer_readonly=False,
            close_obj=close_obj
        )

        # The stream holds its own reference to private_data; it is dropped
        # by py_input_stream_release()
        stream._stream.private_data = <PyObject*>private_data
        Py_INCREF(private_data)
        stream._stream.read = &py_input_stream_read
        stream._stream.release = &py_input_stream_release
        return stream


def init_array_stream(CIpcInputStream input_stream, uintptr_t out):
    """Initialize the ArrowArrayStream at address ``out`` from ``input_stream``.

    Raises ValueError if ``input_stream`` has already been released and
    RuntimeError if ArrowIpcArrayStreamReaderInit() fails.
    """
    cdef ArrowArrayStream* out_ptr = <ArrowArrayStream*>out
    cdef int code

    # Mirror the check in CIpcWriter.__cinit__: a released stream has no
    # callbacks and must not be handed to the reader.
    if not input_stream.is_valid():
        raise ValueError("Can't create array stream from released input stream")

    # There are some options here that could be exposed at some point
    code = ArrowIpcArrayStreamReaderInit(out_ptr, &input_stream._stream, NULL)
    if code != NANOARROW_OK:
        raise RuntimeError(f"ArrowIpcArrayStreamReaderInit() failed with code [{code}]")


cdef class CIpcOutputStream:
    """Owner of an ArrowIpcOutputStream, optionally backed by a Python object."""
    cdef ArrowIpcOutputStream _stream

    def __cinit__(self):
        self._stream.release = NULL

    def is_valid(self):
        """Return True if the underlying stream has a release callback set."""
        return self._stream.release != NULL

    def __dealloc__(self):
        # Duplicating release() to avoid Python API calls in the deallocator
        if self._stream.release != NULL:
            self._stream.release(&self._stream)

    def release(self):
        """Release the stream; return True if it was released by this call."""
        if self._stream.release != NULL:
            self._stream.release(&self._stream)
            return True
        else:
            return False

    @staticmethod
    def from_writable(obj, close_obj=False):
        """Wrap an object implementing ``write()`` as an output stream.

        If ``close_obj`` is true, ``obj.close()`` is called when the stream
        is released.
        """
        cdef CIpcOutputStream stream = CIpcOutputStream()
        cdef PyStreamPrivate private_data = PyStreamPrivate(
            obj,
            buffer_readonly=True,
            close_obj=close_obj
        )

        # The stream holds its own reference to private_data; it is dropped
        # by py_output_stream_release()
        stream._stream.private_data = <PyObject*>private_data
        Py_INCREF(private_data)
        stream._stream.write = &py_output_stream_write
        stream._stream.release = &py_output_stream_release
        return stream


cdef class CIpcWriter:
    """Owner of an ArrowIpcWriter that writes to a CIpcOutputStream."""
    cdef ArrowIpcWriter _writer

    def __cinit__(self, CIpcOutputStream stream):
        self._writer.private_data = NULL
        if not stream.is_valid():
            raise ValueError("Can't create writer from released stream")

        cdef int code = ArrowIpcWriterInit(&self._writer, &stream._stream)
        Error.raise_error_not_ok("ArrowIpcWriterInit()", code)

    def is_valid(self):
        """Return True if the writer has been initialized and not yet released."""
        return self._writer.private_data != NULL

    def _assert_valid(self):
        # Writing through a released writer would hand a writer with NULL
        # private_data to the C library; fail with a Python exception instead.
        if self._writer.private_data == NULL:
            raise RuntimeError("CIpcWriter is released")

    def __dealloc__(self):
        if self._writer.private_data != NULL:
            ArrowIpcWriterReset(&self._writer)

    def release(self):
        """Reset the underlying writer if it has not already been released."""
        if self._writer.private_data != NULL:
            ArrowIpcWriterReset(&self._writer)

    def write_schema(self, CSchema schema not None):
        """Write ``schema`` to the output stream; raise on a non-OK result."""
        cdef Error error = Error()
        cdef int code

        self._assert_valid()
        code = ArrowIpcWriterWriteSchema(&self._writer, schema._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteSchema()", code)

    def write_array_view(self, CArrayView array_view not None):
        """Write ``array_view`` to the output stream; raise on a non-OK result."""
        cdef Error error = Error()
        cdef int code

        self._assert_valid()
        code = ArrowIpcWriterWriteArrayView(&self._writer, array_view._ptr, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)

    def write_array_stream(self, uintptr_t stream_addr):
        """Write the ArrowArrayStream located at ``stream_addr`` to the output stream."""
        cdef ArrowArrayStream* array_stream = <ArrowArrayStream*>stream_addr
        cdef Error error = Error()
        cdef int code

        self._assert_valid()
        code = ArrowIpcWriterWriteArrayStream(&self._writer, array_stream, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteArrayStream()", code)

    def write_end_of_stream(self):
        """Write the end-of-stream marker by passing a NULL array view."""
        cdef Error error = Error()
        cdef int code

        self._assert_valid()
        code = ArrowIpcWriterWriteArrayView(&self._writer, NULL, &error.c_error)
        error.raise_message_not_ok("ArrowIpcWriterWriteArrayView()", code)