python/pyfury/format/serializer.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pyarrow as pa

from pyfury.buffer import Buffer
from pyfury.serializer import BufferObject, CrossLanguageCompatibleSerializer


class ArrowRecordBatchSerializer(CrossLanguageCompatibleSerializer):
    def write(self, buffer, value: pa.RecordBatch):
        self.fury.write_buffer_object(buffer, ArrowRecordBatchBufferObject(value))

    def read(self, buffer: Buffer) -> pa.RecordBatch:
        fury_buf = self.fury.read_buffer_object(buffer)
        # If the input source supports zero-copy reads (e.g. a memory map or
        # pa.BufferReader), the returned batch is also zero-copy and no new
        # memory is allocated on read.
        reader = pa.ipc.open_stream(pa.py_buffer(fury_buf))
        [batch] = list(reader)
        return batch
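

# A minimal sketch of the pyarrow IPC round trip the serializer above relies
# on (pure pyarrow, no fury involved; `batch` stands for any pa.RecordBatch):
#
#     sink = pa.BufferOutputStream()
#     with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
#         writer.write_batch(batch)
#     payload = sink.getvalue()  # a pa.Buffer holding the IPC stream
#     [restored] = list(pa.ipc.open_stream(payload))
#     assert restored.equals(batch)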


class ArrowRecordBatchBufferObject(BufferObject):
    def __init__(self, batch: pa.RecordBatch):
        self.batch = batch
        # Write the batch to a MockOutputStream first: it discards the data
        # but tracks the number of bytes written, which yields the exact size
        # needed by `write_to` without buffering the payload twice.
        mock_sink = pa.MockOutputStream()
        self._write(batch, mock_sink)
        self.nbytes = mock_sink.size()

    def total_bytes(self) -> int:
        return self.nbytes

    def write_to(self, buffer: Buffer):
        assert isinstance(buffer, Buffer)
        sink = pa.FixedSizeBufferWriter(pa.py_buffer(buffer))
        self._write(self.batch, sink)

    def to_buffer(self) -> Buffer:
        sink = pa.BufferOutputStream()
        self._write(self.batch, sink)
        return Buffer(sink.getvalue())

    @staticmethod
    def _write(batch, sink):
        with pa.RecordBatchStreamWriter(sink, batch.schema) as stream_writer:
            stream_writer.write_batch(batch)
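

# Sketch of the MockOutputStream sizing pass used in __init__ above: the mock
# sink discards bytes but counts them, so its size matches a real write
# (the `from_pydict` data here is illustrative only):
#
#     batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
#     mock = pa.MockOutputStream()
#     with pa.RecordBatchStreamWriter(mock, batch.schema) as writer:
#         writer.write_batch(batch)
#     real = pa.BufferOutputStream()
#     with pa.RecordBatchStreamWriter(real, batch.schema) as writer:
#         writer.write_batch(batch)
#     assert mock.size() == real.getvalue().size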


class ArrowTableSerializer(CrossLanguageCompatibleSerializer):
    def write(self, buffer, value: pa.Table):
        self.fury.write_buffer_object(buffer, ArrowTableBufferObject(value))

    def read(self, buffer: Buffer) -> pa.Table:
        fury_buf = self.fury.read_buffer_object(buffer)
        # If the input source supports zero-copy reads (e.g. a memory map or
        # pa.BufferReader), the returned batches are also zero-copy and no new
        # memory is allocated on read.
        reader = pa.ipc.open_stream(pa.py_buffer(fury_buf))
        batches = list(reader)
        return pa.Table.from_batches(batches)
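

# Sketch of the table round trip above: a table with multiple chunks streams
# as several record batches and is reassembled losslessly with
# pa.Table.from_batches (pure pyarrow; `table` stands for any pa.Table):
#
#     sink = pa.BufferOutputStream()
#     with pa.RecordBatchStreamWriter(sink, table.schema) as writer:
#         for b in table.to_batches():
#             writer.write_batch(b)
#     batches = list(pa.ipc.open_stream(sink.getvalue()))
#     assert pa.Table.from_batches(batches).equals(table)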


class ArrowTableBufferObject(BufferObject):
    def __init__(self, table: pa.Table):
        self.table = table
        # Size the payload with a MockOutputStream (counts bytes without
        # storing them) so `total_bytes` is known before any real write.
        mock_sink = pa.MockOutputStream()
        self._write(table, mock_sink)
        self.nbytes = mock_sink.size()

    def total_bytes(self) -> int:
        return self.nbytes

    def write_to(self, buffer: Buffer):
        assert isinstance(buffer, Buffer)
        sink = pa.FixedSizeBufferWriter(pa.py_buffer(buffer))
        self._write(self.table, sink)

    def to_buffer(self) -> Buffer:
        sink = pa.BufferOutputStream()
        self._write(self.table, sink)
        return Buffer(sink.getvalue())

    @staticmethod
    def _write(table, sink):
        with pa.RecordBatchStreamWriter(sink, table.schema) as stream_writer:
            for batch in table.to_batches():
                stream_writer.write_batch(batch)
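

# The two BufferObject write paths above differ only in memory ownership:
# `to_buffer` allocates through pa.BufferOutputStream and returns a fresh
# Buffer, while `write_to` streams into caller-provided memory sized by
# `total_bytes()`. A hedged sketch (assumes, hypothetically, that pyfury's
# Buffer exposes an `allocate(size)` helper; adapt to however the caller
# actually obtains writable memory):
#
#     obj = ArrowTableBufferObject(table)
#     out = Buffer.allocate(obj.total_bytes())  # hypothetical helper
#     obj.write_to(out)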