src/nanoarrow/ipc/writer.c (315 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include "flatcc/flatcc_builder.h"
#include "nanoarrow/ipc/flatcc_generated.h"
#include "nanoarrow/nanoarrow.h"
#include "nanoarrow/nanoarrow_ipc.h"
#define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x)
// Transfers the contents of `src` into `dst` with a bytewise copy and then
// clears src->release so that `src` no longer appears to hold a live stream.
// Both pointers must be non-NULL (checked in debug builds only).
void ArrowIpcOutputStreamMove(struct ArrowIpcOutputStream* src,
                              struct ArrowIpcOutputStream* dst) {
  NANOARROW_DCHECK(src != NULL && dst != NULL);

  memcpy(dst, src, sizeof(*src));

  // Mark the source as released; `dst` now carries the callbacks and state.
  src->release = NULL;
}
ArrowErrorCode ArrowIpcOutputStreamWrite(struct ArrowIpcOutputStream* stream,
struct ArrowBufferView data,
struct ArrowError* error) {
while (data.size_bytes != 0) {
int64_t bytes_written = 0;
NANOARROW_RETURN_NOT_OK(stream->write(stream, data.data.as_uint8, data.size_bytes,
&bytes_written, error));
data.size_bytes -= bytes_written;
data.data.as_uint8 += bytes_written;
}
return NANOARROW_OK;
}
// Private state of an output stream created by ArrowIpcOutputStreamInitBuffer().
struct ArrowIpcOutputStreamBufferPrivate {
// Destination buffer that written bytes are appended to. The stream's release
// callback frees only this struct; it does not reset the buffer.
struct ArrowBuffer* output;
};
// write() callback for buffer-backed output streams.
//
// Appends `buf_size_bytes` bytes from `buf` to the ArrowBuffer stored in the
// stream's private data. On success the full size is reported through
// `size_written_out`; if the append fails its error code is returned and
// `size_written_out` is left untouched.
static ArrowErrorCode ArrowIpcOutputStreamBufferWrite(struct ArrowIpcOutputStream* stream,
                                                      const void* buf,
                                                      int64_t buf_size_bytes,
                                                      int64_t* size_written_out,
                                                      struct ArrowError* error) {
  struct ArrowIpcOutputStreamBufferPrivate* private_data =
      (struct ArrowIpcOutputStreamBufferPrivate*)stream->private_data;

  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
      ArrowBufferAppend(private_data->output, buf, buf_size_bytes), error);

  // The append is all-or-nothing, so a success means every byte was taken.
  *size_written_out = buf_size_bytes;
  return NANOARROW_OK;
}
// release() callback for buffer-backed output streams.
//
// Frees the private struct allocated by ArrowIpcOutputStreamInitBuffer() and
// marks the stream as released. The ArrowBuffer the stream was writing into
// is not reset here.
static void ArrowIpcOutputStreamBufferRelease(struct ArrowIpcOutputStream* stream) {
  ArrowFree(stream->private_data);
  stream->release = NULL;
}
// Initializes `stream` as an output stream whose writes are appended to
// `output`.
//
// Returns ENOMEM if the private state cannot be allocated (in which case
// `stream` is not modified), otherwise NANOARROW_OK.
ArrowErrorCode ArrowIpcOutputStreamInitBuffer(struct ArrowIpcOutputStream* stream,
                                              struct ArrowBuffer* output) {
  NANOARROW_DCHECK(stream != NULL && output != NULL);

  struct ArrowIpcOutputStreamBufferPrivate* state =
      (struct ArrowIpcOutputStreamBufferPrivate*)ArrowMalloc(sizeof(*state));
  if (state == NULL) {
    return ENOMEM;
  }
  state->output = output;

  // Wire up the callbacks only after the allocation has succeeded.
  stream->private_data = state;
  stream->write = &ArrowIpcOutputStreamBufferWrite;
  stream->release = &ArrowIpcOutputStreamBufferRelease;
  return NANOARROW_OK;
}
// Private state of an output stream created by ArrowIpcOutputStreamInitFile().
struct ArrowIpcOutputStreamFilePrivate {
// Destination FILE*; may be set to NULL once the write path has closed it.
FILE* file_ptr;
// Set to 1 after a short write; later writes then report 0 bytes written.
int stream_finished;
// When non-zero, file_ptr is fclose()d by the stream (on release, or right
// after a short write).
int close_on_release;
};
// release() callback for file-backed output streams.
//
// Closes the underlying FILE* when the stream was asked to close on release
// and the file has not been cleared already, then frees the private state
// and marks the stream as released.
static void ArrowIpcOutputStreamFileRelease(struct ArrowIpcOutputStream* stream) {
  struct ArrowIpcOutputStreamFilePrivate* state =
      (struct ArrowIpcOutputStreamFilePrivate*)stream->private_data;

  FILE* fp = state->file_ptr;
  if (state->close_on_release && fp != NULL) {
    fclose(fp);
  }

  ArrowFree(state);
  stream->release = NULL;
}
// write() callback for file-backed output streams.
//
// Attempts to fwrite() `buf_size_bytes` bytes from `buf` and reports the
// number of bytes actually written through `size_written_out`.
//
// A short write marks the stream as finished: the file is closed immediately
// when close_on_release is set, and EIO is returned if the FILE reports an
// error. Once finished, further calls write nothing and report 0 bytes with
// NANOARROW_OK.
static ArrowErrorCode ArrowIpcOutputStreamFileWrite(struct ArrowIpcOutputStream* stream,
                                                    const void* buf,
                                                    int64_t buf_size_bytes,
                                                    int64_t* size_written_out,
                                                    struct ArrowError* error) {
  struct ArrowIpcOutputStreamFilePrivate* private_data =
      (struct ArrowIpcOutputStreamFilePrivate*)stream->private_data;

  if (private_data->stream_finished) {
    *size_written_out = 0;
    return NANOARROW_OK;
  }

  // Do the write
  int64_t bytes_written =
      (int64_t)fwrite(buf, 1, (size_t)buf_size_bytes, private_data->file_ptr);
  *size_written_out = bytes_written;
  if (bytes_written == buf_size_bytes) {
    return NANOARROW_OK;
  }

  // Short write: no further writes will be attempted on this stream.
  private_data->stream_finished = 1;

  // Inspect the error state before closing; the FILE must not be touched
  // after fclose().
  int has_error = !feof(private_data->file_ptr) && ferror(private_data->file_ptr);

  // Try to close the file now. fclose() disassociates the stream whether or
  // not it succeeds, so the pointer is always cleared; otherwise the release
  // callback would call fclose() a second time on an invalid FILE*.
  if (private_data->close_on_release) {
    fclose(private_data->file_ptr);
    private_data->file_ptr = NULL;
  }

  // Maybe return error
  if (has_error) {
    ArrowErrorSet(error, "ArrowIpcOutputStreamFile IO error");
    return EIO;
  }

  return NANOARROW_OK;
}
// Initializes `stream` as an output stream writing to `file_ptr`, which is
// treated as a FILE*.
//
// A NULL `file_ptr` is rejected with the current errno if it is non-zero,
// or EINVAL otherwise. Returns ENOMEM if the private state cannot be
// allocated. `close_on_release` is stored and controls whether the stream
// closes the file itself.
ArrowErrorCode ArrowIpcOutputStreamInitFile(struct ArrowIpcOutputStream* stream,
                                            void* file_ptr, int close_on_release) {
  NANOARROW_DCHECK(stream != NULL);

  if (file_ptr == NULL) {
    if (errno != 0) {
      return errno;
    }
    return EINVAL;
  }

  struct ArrowIpcOutputStreamFilePrivate* state =
      (struct ArrowIpcOutputStreamFilePrivate*)ArrowMalloc(sizeof(*state));
  if (state == NULL) {
    return ENOMEM;
  }

  state->file_ptr = (FILE*)file_ptr;
  state->stream_finished = 0;
  state->close_on_release = close_on_release;

  stream->private_data = state;
  stream->write = &ArrowIpcOutputStreamFileWrite;
  stream->release = &ArrowIpcOutputStreamFileRelease;
  return NANOARROW_OK;
}
// Internal state behind struct ArrowIpcWriter (stored in writer->private_data).
struct ArrowIpcWriterPrivate {
// Encoder used to produce schema, record batch, and footer messages.
struct ArrowIpcEncoder encoder;
// Destination stream; moved out of the caller's stream by ArrowIpcWriterInit().
struct ArrowIpcOutputStream output_stream;
// Scratch buffer that receives each finalized message before it is written.
struct ArrowBuffer buffer;
// Scratch buffer that receives the body of an encoded record batch.
struct ArrowBuffer body_buffer;
// Non-zero once ArrowIpcWriterStartFile() has written the leading magic.
int writing_file;
// Running byte count; used as the offset of each record batch block when
// writing a file.
int64_t bytes_written;
// Accumulates the schema and record batch blocks encoded by
// ArrowIpcWriterFinalizeFile().
struct ArrowIpcFooter footer;
};
// Initializes `writer` so that it writes to `output_stream`.
//
// On success the contents of `output_stream` are moved into the writer (its
// release callback is cleared) and NANOARROW_OK is returned. Returns ENOMEM
// if the private state cannot be allocated, or the error from
// ArrowIpcEncoderInit() if the encoder cannot be initialized; in both failure
// cases nothing is leaked and `output_stream` is left untouched.
ArrowErrorCode ArrowIpcWriterInit(struct ArrowIpcWriter* writer,
                                  struct ArrowIpcOutputStream* output_stream) {
  NANOARROW_DCHECK(writer != NULL && output_stream != NULL);

  struct ArrowIpcWriterPrivate* private =
      (struct ArrowIpcWriterPrivate*)ArrowMalloc(sizeof(struct ArrowIpcWriterPrivate));
  if (private == NULL) {
    return ENOMEM;
  }

  // Free the allocation if the encoder cannot be set up; returning directly
  // here would leak `private` because nothing else references it yet.
  const ArrowErrorCode encoder_status = ArrowIpcEncoderInit(&private->encoder);
  if (encoder_status != NANOARROW_OK) {
    ArrowFree(private);
    return encoder_status;
  }

  ArrowIpcOutputStreamMove(output_stream, &private->output_stream);
  ArrowBufferInit(&private->buffer);
  ArrowBufferInit(&private->body_buffer);
  private->writing_file = 0;
  private->bytes_written = 0;
  ArrowIpcFooterInit(&private->footer);

  writer->private_data = private;
  return NANOARROW_OK;
}
// Releases everything held by `writer` and zeroes the struct.
//
// When private state is present, the encoder, output stream, both scratch
// buffers, and the footer are released before the state itself is freed.
// A writer whose private_data is NULL is simply zeroed.
void ArrowIpcWriterReset(struct ArrowIpcWriter* writer) {
  NANOARROW_DCHECK(writer != NULL);

  struct ArrowIpcWriterPrivate* impl =
      (struct ArrowIpcWriterPrivate*)writer->private_data;
  if (impl != NULL) {
    ArrowIpcEncoderReset(&impl->encoder);
    impl->output_stream.release(&impl->output_stream);
    ArrowBufferReset(&impl->buffer);
    ArrowBufferReset(&impl->body_buffer);
    ArrowIpcFooterReset(&impl->footer);
    ArrowFree(impl);
  }

  memset(writer, 0, sizeof(*writer));
}
static struct ArrowBufferView ArrowBufferToBufferView(const struct ArrowBuffer* buffer) {
struct ArrowBufferView buffer_view = {
.data.as_uint8 = buffer->data,
.size_bytes = buffer->size_bytes,
};
return buffer_view;
}
// Eventually, it may be necessary to construct an ArrowIpcWriter which doesn't rely on
// blocking writes (ArrowIpcOutputStreamWrite). For example an ArrowIpcOutputStream
// might wrap a socket which is not always able to transmit all bytes of a Message. In
// that case users of ArrowIpcWriter might prefer to do other work until a socket is
// ready rather than blocking, or timeout, or otherwise respond to partial transmission.
//
// This could be handled by:
// - keeping partially sent buffers internal and signalling incomplete transmission by
// raising EAGAIN, returning "bytes actually written", ...
// - when the caller is ready to try again, call ArrowIpcWriterWriteSome()
// - exposing internal buffers which have not been completely sent, deferring
// follow-up transmission to the caller
ArrowErrorCode ArrowIpcWriterWriteSchema(struct ArrowIpcWriter* writer,
const struct ArrowSchema* in,
struct ArrowError* error) {
NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL);
struct ArrowIpcWriterPrivate* private =
(struct ArrowIpcWriterPrivate*)writer->private_data;
NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSchema(&private->encoder, in, error));
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
&private->buffer),
error);
if (private->writing_file) {
NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowSchemaDeepCopy(in, &private->footer.schema),
error);
}
private->bytes_written += private->buffer.size_bytes;
return ArrowIpcOutputStreamWrite(&private->output_stream,
ArrowBufferToBufferView(&private->buffer), error);
}
// Writes one record batch, or the end-of-stream marker when `in` is NULL.
//
// For a non-NULL `in`, the batch is encoded into an encapsulated message plus
// a body, and both are written to the output stream in that order. In file
// mode a block describing the batch (offset, metadata length, body length) is
// appended to the footer before the write. The writer's byte counter is
// advanced before the write is attempted.
ArrowErrorCode ArrowIpcWriterWriteArrayView(struct ArrowIpcWriter* writer,
                                            const struct ArrowArrayView* in,
                                            struct ArrowError* error) {
  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
  struct ArrowIpcWriterPrivate* private =
      (struct ArrowIpcWriterPrivate*)writer->private_data;

  if (in == NULL) {
    // End of stream: the two 32-bit words -1 and 0.
    int32_t eos[2] = {-1, 0};
    struct ArrowBufferView eos_view = {.data.as_int32 = eos, .size_bytes = sizeof(eos)};
    private->bytes_written += sizeof(eos);
    return ArrowIpcOutputStreamWrite(&private->output_stream, eos_view, error);
  }

  // Both scratch buffers are reused between calls; empty them first.
  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->body_buffer, 0, 0));

  NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeSimpleRecordBatch(
      &private->encoder, in, &private->body_buffer, error));
  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
      ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/1,
                                    &private->buffer),
      error);

  if (private->writing_file) {
    // The block stores the metadata length as a 32-bit integer.
    _NANOARROW_CHECK_RANGE(private->buffer.size_bytes, 0, INT32_MAX);
    struct ArrowIpcFileBlock block = {
        .offset = private->bytes_written,
        .metadata_length = (int32_t)private->buffer.size_bytes,
        .body_length = private->body_buffer.size_bytes,
    };
    NANOARROW_RETURN_NOT_OK_WITH_ERROR(
        ArrowBufferAppend(&private->footer.record_batch_blocks, &block, sizeof(block)),
        error);
  }

  private->bytes_written += private->buffer.size_bytes;
  private->bytes_written += private->body_buffer.size_bytes;

  // Message metadata first, then the body.
  NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
      &private->output_stream, ArrowBufferToBufferView(&private->buffer), error));
  NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
      &private->output_stream, ArrowBufferToBufferView(&private->body_buffer), error));
  return NANOARROW_OK;
}
// Body of ArrowIpcWriterWriteArrayStream().
//
// Writes the schema of `in`, then every array the stream yields, then the
// end-of-stream marker. `schema`, `array`, and `array_view` are storage
// supplied by the caller; this function returns early on any error without
// releasing them, so the caller is expected to clean them up.
static ArrowErrorCode ArrowIpcWriterWriteArrayStreamImpl(
    struct ArrowIpcWriter* writer, struct ArrowArrayStream* in,
    struct ArrowSchema* schema, struct ArrowArray* array,
    struct ArrowArrayView* array_view, struct ArrowError* error) {
  NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetSchema(in, schema, error));
  NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteSchema(writer, schema, error));
  NANOARROW_RETURN_NOT_OK(ArrowArrayViewInitFromSchema(array_view, schema, error));

  // A released array (release == NULL) marks the end of the stream.
  NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(in, array, error));
  while (array->release != NULL) {
    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(array_view, array, error));
    NANOARROW_RETURN_NOT_OK(ArrowIpcWriterWriteArrayView(writer, array_view, error));
    ArrowArrayRelease(array);

    NANOARROW_RETURN_NOT_OK(ArrowArrayStreamGetNext(in, array, error));
  }

  // The stream is complete, signal the end to the caller
  return ArrowIpcWriterWriteArrayView(writer, NULL, error);
}
// Writes the schema and all arrays of `in`, followed by the end-of-stream
// marker.
//
// The temporaries used while iterating (schema, array, array view) live on
// this function's stack and are released here whether or not the
// implementation succeeded. Returns the implementation's result.
ArrowErrorCode ArrowIpcWriterWriteArrayStream(struct ArrowIpcWriter* writer,
                                              struct ArrowArrayStream* in,
                                              struct ArrowError* error) {
  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL && in != NULL);

  // Start every temporary in a state that is safe to clean up below.
  struct ArrowSchema schema = {.release = NULL};
  struct ArrowArray array = {.release = NULL};
  struct ArrowArrayView array_view;
  ArrowArrayViewInitFromType(&array_view, NANOARROW_TYPE_UNINITIALIZED);

  const ArrowErrorCode status =
      ArrowIpcWriterWriteArrayStreamImpl(writer, in, &schema, &array, &array_view, error);

  if (schema.release != NULL) {
    ArrowSchemaRelease(&schema);
  }
  if (array.release != NULL) {
    ArrowArrayRelease(&array);
  }
  ArrowArrayViewReset(&array_view);

  return status;
}
// Magic string of the Arrow IPC file format. The explicit "\0" plus the
// literal's implicit terminator make sizeof() equal to 8, which is the padded
// form written at the start of a file; taken as a C string it is the 6
// characters "ARROW1", which is the unpadded form appended at the end.
#define NANOARROW_IPC_FILE_PADDED_MAGIC "ARROW1\0"
// Begins an IPC file by writing the padded magic to the output stream.
//
// Must be called before anything else has been written and at most once
// (checked in debug builds only). On success the writer switches to file
// mode and its byte counter is set to the size of the magic. On failure the
// writer's state is left unchanged and the write error is returned.
ArrowErrorCode ArrowIpcWriterStartFile(struct ArrowIpcWriter* writer,
                                       struct ArrowError* error) {
  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
  struct ArrowIpcWriterPrivate* private =
      (struct ArrowIpcWriterPrivate*)writer->private_data;
  NANOARROW_DCHECK(!private->writing_file && private->bytes_written == 0);

  // sizeof() includes the padding bytes of the literal.
  struct ArrowBufferView magic = {
      .data.data = NANOARROW_IPC_FILE_PADDED_MAGIC,
      .size_bytes = sizeof(NANOARROW_IPC_FILE_PADDED_MAGIC),
  };
  NANOARROW_RETURN_NOT_OK(
      ArrowIpcOutputStreamWrite(&private->output_stream, magic, error));

  // Only flip into file mode once the magic is actually out.
  private->bytes_written = magic.size_bytes;
  private->writing_file = 1;
  return NANOARROW_OK;
}
// Finishes an IPC file: encodes the footer (not encapsulated), appends the
// 32-bit footer size and the unpadded magic, and writes all of it to the
// output stream with a single write.
//
// The writer must be in file mode (checked in debug builds only). The
// writer's byte counter is advanced only after the write succeeds.
ArrowErrorCode ArrowIpcWriterFinalizeFile(struct ArrowIpcWriter* writer,
                                          struct ArrowError* error) {
  NANOARROW_DCHECK(writer != NULL && writer->private_data != NULL);
  struct ArrowIpcWriterPrivate* private =
      (struct ArrowIpcWriterPrivate*)writer->private_data;
  NANOARROW_DCHECK(private->writing_file);

  // Encode the footer into an empty scratch buffer.
  NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffer, 0, 0));
  NANOARROW_RETURN_NOT_OK(
      ArrowIpcEncoderEncodeFooter(&private->encoder, &private->footer, error));
  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
      ArrowIpcEncoderFinalizeBuffer(&private->encoder, /*encapsulate=*/0,
                                    &private->buffer),
      error);

  // The footer size is stored as a 32-bit integer, so it must fit in one.
  _NANOARROW_CHECK_RANGE(private->buffer.size_bytes, 0, INT32_MAX);
  int32_t size = (int32_t)private->buffer.size_bytes;

  // we don't pad the magic at the end of the file
  struct ArrowStringView unpadded_magic = ArrowCharView(NANOARROW_IPC_FILE_PADDED_MAGIC);
  NANOARROW_DCHECK(unpadded_magic.size_bytes == 6);

  // just append to private->buffer instead of queueing two more tiny writes
  NANOARROW_RETURN_NOT_OK_WITH_ERROR(
      ArrowBufferReserve(&private->buffer, sizeof(size) + unpadded_magic.size_bytes),
      error);

  // Byte-swap on big-endian hosts so the size is emitted little-endian.
  if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) {
    size = (int32_t)bswap32((uint32_t)size);
  }

  // Space was reserved above, so these appends are not expected to fail.
  NANOARROW_ASSERT_OK(ArrowBufferAppendInt32(&private->buffer, size));
  NANOARROW_ASSERT_OK(ArrowBufferAppendStringView(&private->buffer, unpadded_magic));

  NANOARROW_RETURN_NOT_OK(ArrowIpcOutputStreamWrite(
      &private->output_stream, ArrowBufferToBufferView(&private->buffer), error));
  private->bytes_written += private->buffer.size_bytes;
  return NANOARROW_OK;
}