src/nanoarrow/ipc/encoder.c (529 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <errno.h> #include <stdio.h> #include <string.h> #include "flatcc/flatcc_builder.h" #include "nanoarrow/ipc/flatcc_generated.h" #include "nanoarrow/nanoarrow.h" #include "nanoarrow/nanoarrow_ipc.h" #define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) #define FLATCC_RETURN_UNLESS_0_NO_NS(x, error) \ if ((x) != 0) { \ ArrowErrorSet(error, "%s:%d: %s failed", __FILE__, __LINE__, #x); \ return ENOMEM; \ } #define FLATCC_RETURN_UNLESS_0(x, error) FLATCC_RETURN_UNLESS_0_NO_NS(ns(x), error) #define FLATCC_RETURN_IF_NULL(x, error) \ if (!(x)) { \ ArrowErrorSet(error, "%s:%d: %s was null", __FILE__, __LINE__, #x); \ return ENOMEM; \ } struct ArrowIpcEncoderPrivate { flatcc_builder_t builder; struct ArrowBuffer buffers; struct ArrowBuffer nodes; int encoding_footer; }; ArrowErrorCode ArrowIpcEncoderInit(struct ArrowIpcEncoder* encoder) { NANOARROW_DCHECK(encoder != NULL); memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); encoder->private_data = ArrowMalloc(sizeof(struct ArrowIpcEncoderPrivate)); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; if (private == NULL) { return ENOMEM; } if (flatcc_builder_init(&private->builder) == -1) { ArrowFree(private); return ESPIPE; } private->encoding_footer = 0; ArrowBufferInit(&private->buffers); ArrowBufferInit(&private->nodes); return NANOARROW_OK; } void ArrowIpcEncoderReset(struct ArrowIpcEncoder* encoder) { NANOARROW_DCHECK(encoder != NULL); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; if (private != NULL) { flatcc_builder_clear(&private->builder); ArrowBufferReset(&private->nodes); ArrowBufferReset(&private->buffers); ArrowFree(private); } memset(encoder, 0, sizeof(struct ArrowIpcEncoder)); } static ArrowErrorCode ArrowIpcEncoderWriteContinuationAndSize(struct ArrowBuffer* out, size_t size) { _NANOARROW_CHECK_UPPER_LIMIT(size, INT32_MAX); NANOARROW_RETURN_NOT_OK(ArrowBufferAppendInt32(out, -1)); if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_BIG) { return ArrowBufferAppendInt32(out, (int32_t)bswap32((uint32_t)size)); } else { return ArrowBufferAppendInt32(out, (int32_t)size); } } ArrowErrorCode ArrowIpcEncoderFinalizeBuffer(struct ArrowIpcEncoder* encoder, char encapsulate, struct ArrowBuffer* out) { NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && out != NULL); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; size_t size = flatcc_builder_get_buffer_size(&private->builder); if (encapsulate) { int64_t padded_size = _ArrowRoundUpToMultipleOf8(size); NANOARROW_RETURN_NOT_OK( ArrowBufferReserve(out, sizeof(int32_t) + sizeof(int32_t) + padded_size)); NANOARROW_ASSERT_OK(ArrowIpcEncoderWriteContinuationAndSize(out, padded_size)); } else { NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(out, size)); } if (size == 0) { // Finalizing an empty flatcc_builder_t triggers an assertion return NANOARROW_OK; } void* data = flatcc_builder_copy_buffer(&private->builder, out->data + out->size_bytes, size); NANOARROW_DCHECK(data != NULL); NANOARROW_UNUSED(data); out->size_bytes += size; while (encapsulate && out->size_bytes % 8 != 0) { // zero padding bytes, if any out->data[out->size_bytes++] = 0; } // don't deallocate yet, just wipe the builder's current Message flatcc_builder_reset(&private->builder); return NANOARROW_OK; } static ArrowErrorCode ArrowIpcEncodeFieldType(flatcc_builder_t* builder, const struct ArrowSchemaView* schema_view, struct ArrowError* error) { switch (schema_view->type) { case NANOARROW_TYPE_NA: FLATCC_RETURN_UNLESS_0(Field_type_Null_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_BOOL: FLATCC_RETURN_UNLESS_0(Field_type_Bool_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_UINT8: case NANOARROW_TYPE_INT8: FLATCC_RETURN_UNLESS_0( Field_type_Int_create(builder, 8, schema_view->type == NANOARROW_TYPE_INT8), error); return NANOARROW_OK; case NANOARROW_TYPE_UINT16: case NANOARROW_TYPE_INT16: FLATCC_RETURN_UNLESS_0( Field_type_Int_create(builder, 16, schema_view->type == NANOARROW_TYPE_INT16), error); return NANOARROW_OK; case NANOARROW_TYPE_UINT32: case NANOARROW_TYPE_INT32: FLATCC_RETURN_UNLESS_0( Field_type_Int_create(builder, 32, schema_view->type == NANOARROW_TYPE_INT32), error); return NANOARROW_OK; case NANOARROW_TYPE_UINT64: case NANOARROW_TYPE_INT64: FLATCC_RETURN_UNLESS_0( Field_type_Int_create(builder, 64, schema_view->type == NANOARROW_TYPE_INT64), error); return NANOARROW_OK; case NANOARROW_TYPE_HALF_FLOAT: FLATCC_RETURN_UNLESS_0(Field_type_FloatingPoint_create(builder, ns(Precision_HALF)), error); return NANOARROW_OK; case NANOARROW_TYPE_FLOAT: FLATCC_RETURN_UNLESS_0( Field_type_FloatingPoint_create(builder, ns(Precision_SINGLE)), error); return NANOARROW_OK; case NANOARROW_TYPE_DOUBLE: FLATCC_RETURN_UNLESS_0( Field_type_FloatingPoint_create(builder, ns(Precision_DOUBLE)), error); return NANOARROW_OK; case NANOARROW_TYPE_DECIMAL32: case NANOARROW_TYPE_DECIMAL64: case NANOARROW_TYPE_DECIMAL128: case NANOARROW_TYPE_DECIMAL256: FLATCC_RETURN_UNLESS_0( Field_type_Decimal_create(builder, schema_view->decimal_precision, schema_view->decimal_scale, schema_view->decimal_bitwidth), error); return NANOARROW_OK; case NANOARROW_TYPE_STRING: FLATCC_RETURN_UNLESS_0(Field_type_Utf8_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_LARGE_STRING: FLATCC_RETURN_UNLESS_0(Field_type_LargeUtf8_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_BINARY: FLATCC_RETURN_UNLESS_0(Field_type_Binary_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_LARGE_BINARY: FLATCC_RETURN_UNLESS_0(Field_type_LargeBinary_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_DATE32: FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_DAY)), error); return NANOARROW_OK; case NANOARROW_TYPE_DATE64: FLATCC_RETURN_UNLESS_0(Field_type_Date_create(builder, ns(DateUnit_MILLISECOND)), error); return NANOARROW_OK; case NANOARROW_TYPE_INTERVAL_MONTHS: FLATCC_RETURN_UNLESS_0( Field_type_Interval_create(builder, ns(IntervalUnit_YEAR_MONTH)), error); return NANOARROW_OK; case NANOARROW_TYPE_INTERVAL_DAY_TIME: FLATCC_RETURN_UNLESS_0( Field_type_Interval_create(builder, ns(IntervalUnit_DAY_TIME)), error); return NANOARROW_OK; case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: FLATCC_RETURN_UNLESS_0( Field_type_Interval_create(builder, ns(IntervalUnit_MONTH_DAY_NANO)), error); return NANOARROW_OK; case NANOARROW_TYPE_TIMESTAMP: FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_start(builder), error); FLATCC_RETURN_UNLESS_0( Timestamp_unit_add(builder, (ns(TimeUnit_enum_t))schema_view->time_unit), error); if (schema_view->timezone && schema_view->timezone[0] != 0) { FLATCC_RETURN_UNLESS_0( Timestamp_timezone_create_str(builder, schema_view->timezone), error); } FLATCC_RETURN_UNLESS_0(Field_type_Timestamp_end(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_TIME32: FLATCC_RETURN_UNLESS_0( Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit, 32), error); return NANOARROW_OK; case NANOARROW_TYPE_TIME64: FLATCC_RETURN_UNLESS_0( Field_type_Time_create(builder, (ns(TimeUnit_enum_t))schema_view->time_unit, 64), error); return NANOARROW_OK; case NANOARROW_TYPE_DURATION: FLATCC_RETURN_UNLESS_0(Field_type_Duration_create( builder, (ns(TimeUnit_enum_t))schema_view->time_unit), error); return NANOARROW_OK; case NANOARROW_TYPE_FIXED_SIZE_BINARY: FLATCC_RETURN_UNLESS_0( Field_type_FixedSizeBinary_create(builder, schema_view->fixed_size), error); return NANOARROW_OK; case NANOARROW_TYPE_LIST: FLATCC_RETURN_UNLESS_0(Field_type_List_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_LARGE_LIST: FLATCC_RETURN_UNLESS_0(Field_type_LargeList_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_FIXED_SIZE_LIST: FLATCC_RETURN_UNLESS_0( Field_type_FixedSizeList_create(builder, schema_view->fixed_size), error); return NANOARROW_OK; case NANOARROW_TYPE_RUN_END_ENCODED: FLATCC_RETURN_UNLESS_0(Field_type_RunEndEncoded_create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_STRUCT: FLATCC_RETURN_UNLESS_0(Field_type_Struct__create(builder), error); return NANOARROW_OK; case NANOARROW_TYPE_SPARSE_UNION: case NANOARROW_TYPE_DENSE_UNION: { FLATCC_RETURN_UNLESS_0(Field_type_Union_start(builder), error); FLATCC_RETURN_UNLESS_0( Union_mode_add(builder, schema_view->type == NANOARROW_TYPE_DENSE_UNION), error); if (schema_view->union_type_ids) { int8_t type_ids[128]; int n = _ArrowParseUnionTypeIds(schema_view->union_type_ids, type_ids); if (n != 0) { FLATCC_RETURN_UNLESS_0(Union_typeIds_start(builder), error); int32_t* type_ids_32 = (int32_t*)ns(Union_typeIds_extend(builder, n)); FLATCC_RETURN_IF_NULL(type_ids_32, error); for (int i = 0; i < n; i++) { type_ids_32[i] = type_ids[i]; } FLATCC_RETURN_UNLESS_0(Union_typeIds_end(builder), error); } } FLATCC_RETURN_UNLESS_0(Field_type_Union_end(builder), error); return NANOARROW_OK; } case NANOARROW_TYPE_MAP: FLATCC_RETURN_UNLESS_0( Field_type_Map_create(builder, schema_view->schema->flags & ARROW_FLAG_MAP_KEYS_SORTED), error); return NANOARROW_OK; case NANOARROW_TYPE_DICTIONARY: ArrowErrorSet(error, "IPC encoding of dictionary types unsupported"); return ENOTSUP; default: ArrowErrorSet(error, "Expected a valid enum ArrowType value but found %d", schema_view->type); return EINVAL; } } static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, const struct ArrowSchema* schema, struct ArrowError* error); static ArrowErrorCode ArrowIpcEncodeMetadata(flatcc_builder_t* builder, const struct ArrowSchema* schema, int (*push_start)(flatcc_builder_t*), ns(KeyValue_ref_t) * (*push_end)(flatcc_builder_t*), struct ArrowError* error) { struct ArrowMetadataReader metadata; NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowMetadataReaderInit(&metadata, schema->metadata), error); while (metadata.remaining_keys > 0) { struct ArrowStringView key, value; NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowMetadataReaderRead(&metadata, &key, &value), error); FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder), error); FLATCC_RETURN_UNLESS_0(KeyValue_key_create_strn(builder, key.data, key.size_bytes), error); FLATCC_RETURN_UNLESS_0( KeyValue_value_create_strn(builder, value.data, value.size_bytes), error); FLATCC_RETURN_IF_NULL(push_end(builder), error); } return NANOARROW_OK; } static ArrowErrorCode ArrowIpcEncodeFields(flatcc_builder_t* builder, const struct ArrowSchema* schema, int (*push_start)(flatcc_builder_t*), ns(Field_ref_t) * (*push_end)(flatcc_builder_t*), struct ArrowError* error) { for (int i = 0; i < schema->n_children; i++) { FLATCC_RETURN_UNLESS_0_NO_NS(push_start(builder), error); NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeField(builder, schema->children[i], error)); FLATCC_RETURN_IF_NULL(push_end(builder), error); } return NANOARROW_OK; } static ArrowErrorCode ArrowIpcEncodeField(flatcc_builder_t* builder, const struct ArrowSchema* schema, struct ArrowError* error) { FLATCC_RETURN_UNLESS_0(Field_name_create_str(builder, schema->name), error); FLATCC_RETURN_UNLESS_0( Field_nullable_add(builder, (schema->flags & ARROW_FLAG_NULLABLE) != 0), error); struct ArrowSchemaView schema_view; NANOARROW_RETURN_NOT_OK(ArrowSchemaViewInit(&schema_view, schema, error)); NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFieldType(builder, &schema_view, error)); if (schema->n_children != 0) { FLATCC_RETURN_UNLESS_0(Field_children_start(builder), error); NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, &ns(Field_children_push_start), &ns(Field_children_push_end), error)); FLATCC_RETURN_UNLESS_0(Field_children_end(builder), error); } if (schema->metadata) { FLATCC_RETURN_UNLESS_0(Field_custom_metadata_start(builder), error); NANOARROW_RETURN_NOT_OK( ArrowIpcEncodeMetadata(builder, schema, &ns(Field_custom_metadata_push_start), &ns(Field_custom_metadata_push_end), error)); FLATCC_RETURN_UNLESS_0(Field_custom_metadata_end(builder), error); } return NANOARROW_OK; } static ArrowErrorCode ArrowIpcEncodeSchema(flatcc_builder_t* builder, const struct ArrowSchema* schema, struct ArrowError* error) { NANOARROW_DCHECK(schema->release != NULL); if (strcmp(schema->format, "+s") != 0) { ArrowErrorSet( error, "Cannot encode schema with format '%s'; top level schema must have struct type", schema->format); return EINVAL; } if (ArrowIpcSystemEndianness() == NANOARROW_IPC_ENDIANNESS_LITTLE) { FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, ns(Endianness_Little)), error); } else { FLATCC_RETURN_UNLESS_0(Schema_endianness_add(builder, ns(Endianness_Big)), error); } FLATCC_RETURN_UNLESS_0(Schema_fields_start(builder), error); NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeFields(builder, schema, &ns(Schema_fields_push_start), &ns(Schema_fields_push_end), error)); FLATCC_RETURN_UNLESS_0(Schema_fields_end(builder), error); FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_start(builder), error); if (schema->metadata) { NANOARROW_RETURN_NOT_OK( ArrowIpcEncodeMetadata(builder, schema, &ns(Schema_custom_metadata_push_start), &ns(Schema_custom_metadata_push_end), error)); } FLATCC_RETURN_UNLESS_0(Schema_custom_metadata_end(builder), error); FLATCC_RETURN_UNLESS_0(Schema_features_start(builder), error); FLATCC_RETURN_UNLESS_0(Schema_features_end(builder), error); return NANOARROW_OK; } ArrowErrorCode ArrowIpcEncoderEncodeSchema(struct ArrowIpcEncoder* encoder, const struct ArrowSchema* schema, struct ArrowError* error) { NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && schema != NULL); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; flatcc_builder_t* builder = &private->builder; FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder), error); FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)), error); FLATCC_RETURN_UNLESS_0(Message_header_Schema_start(builder), error); NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeSchema(builder, schema, error)); FLATCC_RETURN_UNLESS_0(Message_header_Schema_end(builder), error); FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, 0), error); FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)), error); return NANOARROW_OK; } struct ArrowIpcBufferEncoder { /// \brief Callback invoked against each buffer to be encoded /// /// Encoding of buffers is left as a callback to accommodate dissociated data storage. /// One implementation of this callback might copy all buffers into a contiguous body /// for use in an arrow IPC stream, another implementation might store offsets and /// lengths relative to a known arena. ArrowErrorCode (*encode_buffer)(struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* buffer_encoder, int64_t* offset, int64_t* length, struct ArrowError* error); /// \brief Pointer to arbitrary data used by encode_buffer() void* encode_buffer_state; /// \brief Finalized body length of the most recently encoded RecordBatch message /// /// encode_buffer() is expected to update this while encoding each buffer. After all /// buffers are encoded, this will be written to the RecordBatch's .bodyLength int64_t body_length; }; static ArrowErrorCode ArrowIpcEncoderBuildContiguousBodyBufferCallback( struct ArrowBufferView buffer_view, struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* buffer_encoder, int64_t* offset, int64_t* length, struct ArrowError* error) { NANOARROW_UNUSED(encoder); struct ArrowBuffer* body_buffer = (struct ArrowBuffer*)buffer_encoder->encode_buffer_state; int64_t old_size = body_buffer->size_bytes; int64_t buffer_begin = _ArrowRoundUpToMultipleOf8(old_size); int64_t buffer_end = buffer_begin + buffer_view.size_bytes; int64_t new_size = _ArrowRoundUpToMultipleOf8(buffer_end); // reserve all the memory we'll need now NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferReserve(body_buffer, new_size - old_size), error); // zero padding up to the start of the buffer NANOARROW_ASSERT_OK(ArrowBufferAppendFill(body_buffer, 0, buffer_begin - old_size)); // store offset and length of the buffer *offset = buffer_begin; *length = buffer_view.size_bytes; NANOARROW_ASSERT_OK( ArrowBufferAppend(body_buffer, buffer_view.data.data, buffer_view.size_bytes)); // zero padding after writing the buffer NANOARROW_DCHECK(body_buffer->size_bytes == buffer_end); NANOARROW_ASSERT_OK(ArrowBufferAppendFill(body_buffer, 0, new_size - buffer_end)); buffer_encoder->body_length = body_buffer->size_bytes; return NANOARROW_OK; } static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatchImpl( struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* buffer_encoder, const struct ArrowArrayView* array_view, struct ArrowBuffer* buffers, struct ArrowBuffer* nodes, struct ArrowError* error) { if (array_view->offset != 0) { ArrowErrorSet(error, "Cannot encode arrays with nonzero offset"); return ENOTSUP; } for (int64_t c = 0; c < array_view->n_children; ++c) { const struct ArrowArrayView* child = array_view->children[c]; struct ns(FieldNode) node = {child->length, child->null_count}; NANOARROW_RETURN_NOT_OK_WITH_ERROR(ArrowBufferAppend(nodes, &node, sizeof(node)), error); for (int64_t b = 0; b < child->array->n_buffers; ++b) { struct ns(Buffer) buffer; NANOARROW_RETURN_NOT_OK( buffer_encoder->encode_buffer(child->buffer_views[b], encoder, buffer_encoder, &buffer.offset, &buffer.length, error)); NANOARROW_RETURN_NOT_OK_WITH_ERROR( ArrowBufferAppend(buffers, &buffer, sizeof(buffer)), error); } NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl( encoder, buffer_encoder, child, buffers, nodes, error)); } return NANOARROW_OK; } static ArrowErrorCode ArrowIpcEncoderEncodeRecordBatch( struct ArrowIpcEncoder* encoder, struct ArrowIpcBufferEncoder* buffer_encoder, const struct ArrowArrayView* array_view, struct ArrowError* error) { NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && buffer_encoder != NULL && buffer_encoder->encode_buffer != NULL); if (array_view->null_count != 0 && ArrowArrayViewComputeNullCount(array_view) != 0) { ArrowErrorSet(error, "RecordBatches cannot be constructed from arrays with top level nulls"); return EINVAL; } if (array_view->storage_type != NANOARROW_TYPE_STRUCT) { ArrowErrorSet( error, "RecordBatches cannot be constructed from arrays of type other than struct"); return EINVAL; } struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; flatcc_builder_t* builder = &private->builder; FLATCC_RETURN_UNLESS_0(Message_start_as_root(builder), error); FLATCC_RETURN_UNLESS_0(Message_version_add(builder, ns(MetadataVersion_V5)), error); FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_start(builder), error); FLATCC_RETURN_UNLESS_0(RecordBatch_length_add(builder, array_view->length), error); NANOARROW_ASSERT_OK(ArrowBufferResize(&private->buffers, 0, 0)); NANOARROW_ASSERT_OK(ArrowBufferResize(&private->nodes, 0, 0)); NANOARROW_RETURN_NOT_OK(ArrowIpcEncoderEncodeRecordBatchImpl( encoder, buffer_encoder, array_view, &private->buffers, &private->nodes, error)); FLATCC_RETURN_UNLESS_0(RecordBatch_nodes_create( // builder, (struct ns(FieldNode)*)private->nodes.data, private->nodes.size_bytes / sizeof(struct ns(FieldNode))), error); FLATCC_RETURN_UNLESS_0(RecordBatch_buffers_create( // builder, (struct ns(Buffer)*)private->buffers.data, private->buffers.size_bytes / sizeof(struct ns(Buffer))), error); FLATCC_RETURN_UNLESS_0(Message_header_RecordBatch_end(builder), error); FLATCC_RETURN_UNLESS_0(Message_bodyLength_add(builder, buffer_encoder->body_length), error); FLATCC_RETURN_IF_NULL(ns(Message_end_as_root(builder)), error); return NANOARROW_OK; } ArrowErrorCode ArrowIpcEncoderEncodeSimpleRecordBatch( struct ArrowIpcEncoder* encoder, const struct ArrowArrayView* array_view, struct ArrowBuffer* body_buffer, struct ArrowError* error) { NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && body_buffer != NULL); struct ArrowIpcBufferEncoder buffer_encoder = { .encode_buffer = &ArrowIpcEncoderBuildContiguousBodyBufferCallback, .encode_buffer_state = body_buffer, .body_length = 0, }; return ArrowIpcEncoderEncodeRecordBatch(encoder, &buffer_encoder, array_view, error); } void ArrowIpcFooterInit(struct ArrowIpcFooter* footer) { footer->schema.release = NULL; ArrowBufferInit(&footer->record_batch_blocks); } void ArrowIpcFooterReset(struct ArrowIpcFooter* footer) { if (footer->schema.release != NULL) { ArrowSchemaRelease(&footer->schema); } ArrowBufferReset(&footer->record_batch_blocks); } ArrowErrorCode ArrowIpcEncoderEncodeFooter(struct ArrowIpcEncoder* encoder, const struct ArrowIpcFooter* footer, struct ArrowError* error) { NANOARROW_DCHECK(encoder != NULL && encoder->private_data != NULL && footer != NULL); struct ArrowIpcEncoderPrivate* private = (struct ArrowIpcEncoderPrivate*)encoder->private_data; flatcc_builder_t* builder = &private->builder; FLATCC_RETURN_UNLESS_0(Footer_start_as_root(builder), error); FLATCC_RETURN_UNLESS_0(Footer_version_add(builder, ns(MetadataVersion_V5)), error); FLATCC_RETURN_UNLESS_0(Footer_schema_start(builder), error); NANOARROW_RETURN_NOT_OK(ArrowIpcEncodeSchema(builder, &footer->schema, error)); FLATCC_RETURN_UNLESS_0(Footer_schema_end(builder), error); const struct ArrowIpcFileBlock* blocks = (struct ArrowIpcFileBlock*)footer->record_batch_blocks.data; int64_t n_blocks = footer->record_batch_blocks.size_bytes / sizeof(struct ArrowIpcFileBlock); FLATCC_RETURN_UNLESS_0(Footer_recordBatches_start(builder), error); struct ns(Block)* flatcc_RecordBatch_blocks = ns(Footer_recordBatches_extend(builder, n_blocks)); FLATCC_RETURN_IF_NULL(flatcc_RecordBatch_blocks, error); for (int64_t i = 0; i < n_blocks; i++) { struct ns(Block) block = { blocks[i].offset, blocks[i].metadata_length, blocks[i].body_length, }; flatcc_RecordBatch_blocks[i] = block; } FLATCC_RETURN_UNLESS_0(Footer_recordBatches_end(builder), error); FLATCC_RETURN_IF_NULL(ns(Footer_end_as_root(builder)), error); return NANOARROW_OK; }