extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c (1,335 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include <errno.h> #include <stdio.h> #include <string.h> // For thread safe shared buffers we need C11 + stdatomic.h // Can compile with -DNANOARROW_IPC_USE_STDATOMIC=0 or 1 to override // automatic detection #if !defined(NANOARROW_IPC_USE_STDATOMIC) #define NANOARROW_IPC_USE_STDATOMIC 0 // Check for C11 #if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L // Check for GCC 4.8, which doesn't include stdatomic.h but does // not define __STDC_NO_ATOMICS__ #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ >= 5 #if !defined(__STDC_NO_ATOMICS__) #include <stdatomic.h> #undef NANOARROW_IPC_USE_STDATOMIC #define NANOARROW_IPC_USE_STDATOMIC 1 #endif #endif #endif #endif #include "nanoarrow.h" #include "nanoarrow_ipc.h" #include "nanoarrow_ipc_flatcc_generated.h" // Internal representation of a parsed "Field" from flatbuffers. This // represents a field in a depth-first walk of column arrays and their // children. struct ArrowIpcField { // Pointer to the ArrowIpcDecoderPrivate::array_view or child for this node struct ArrowArrayView* array_view; // Pointer to the ArrowIpcDecoderPrivate::array or child for this node. 
This // array is scratch space for any intermediary allocations (i.e., it is never moved // to the user). struct ArrowArray* array; // The cumulative number of buffers preceeding this node. int64_t buffer_offset; }; // Internal data specific to the read/decode process struct ArrowIpcDecoderPrivate { // The endianness that will be assumed for decoding future RecordBatch messages enum ArrowIpcEndianness endianness; // A cached system endianness value enum ArrowIpcEndianness system_endianness; // An ArrowArrayView whose length/null_count/buffers are set directly from the // deserialized flatbuffer message (i.e., no fully underlying ArrowArray exists, // although some buffers may be temporarily owned by ArrowIpcDecoderPrivate::array). struct ArrowArrayView array_view; // An ArrowArray with the same structure as the ArrowArrayView whose ArrowArrayBuffer() // values are used to allocate or store memory when this is required. This ArrowArray // is never moved to the caller; however, its buffers may be moved to the final output // ArrowArray if the caller requests one. struct ArrowArray array; // The number of fields in the flattened depth-first walk of columns and their children int64_t n_fields; // Array of cached information such that given a field index it is possible to locate // the ArrowArrayView/ArrowArray where the depth-first buffer/field walk should start. struct ArrowIpcField* fields; // The number of buffers that future RecordBatch messages must have to match the schema // that has been set. int64_t n_buffers; // A pointer to the last flatbuffers message. 
const void* last_message; }; ArrowErrorCode ArrowIpcCheckRuntime(struct ArrowError* error) { const char* nanoarrow_runtime_version = ArrowNanoarrowVersion(); const char* nanoarrow_ipc_build_time_version = NANOARROW_VERSION; if (strcmp(nanoarrow_runtime_version, nanoarrow_ipc_build_time_version) != 0) { ArrowErrorSet(error, "Expected nanoarrow runtime version '%s' but found version '%s'", nanoarrow_ipc_build_time_version, nanoarrow_runtime_version); return EINVAL; } return NANOARROW_OK; } static enum ArrowIpcEndianness ArrowIpcSystemEndianness(void) { uint32_t check = 1; char first_byte; memcpy(&first_byte, &check, sizeof(char)); if (first_byte) { return NANOARROW_IPC_ENDIANNESS_LITTLE; } else { return NANOARROW_IPC_ENDIANNESS_BIG; } } #if NANOARROW_IPC_USE_STDATOMIC struct ArrowIpcSharedBufferPrivate { struct ArrowBuffer src; atomic_long reference_count; }; static int64_t ArrowIpcSharedBufferUpdate( struct ArrowIpcSharedBufferPrivate* private_data, int delta) { int64_t old_count = atomic_fetch_add(&private_data->reference_count, delta); return old_count + delta; } static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data, int64_t count) { atomic_store(&private_data->reference_count, count); } int ArrowIpcSharedBufferIsThreadSafe(void) { return 1; } #else struct ArrowIpcSharedBufferPrivate { struct ArrowBuffer src; int64_t reference_count; }; static int64_t ArrowIpcSharedBufferUpdate( struct ArrowIpcSharedBufferPrivate* private_data, int delta) { private_data->reference_count += delta; return private_data->reference_count; } static void ArrowIpcSharedBufferSet(struct ArrowIpcSharedBufferPrivate* private_data, int64_t count) { private_data->reference_count = count; } int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; } #endif static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) { struct ArrowIpcSharedBufferPrivate* private_data = (struct 
ArrowIpcSharedBufferPrivate*)allocator->private_data; if (ArrowIpcSharedBufferUpdate(private_data, -1) == 0) { ArrowBufferReset(&private_data->src); ArrowFree(private_data); } } ArrowErrorCode ArrowIpcSharedBufferInit(struct ArrowIpcSharedBuffer* shared, struct ArrowBuffer* src) { if (src->data == NULL) { ArrowBufferMove(src, &shared->private_src); return NANOARROW_OK; } struct ArrowIpcSharedBufferPrivate* private_data = (struct ArrowIpcSharedBufferPrivate*)ArrowMalloc( sizeof(struct ArrowIpcSharedBufferPrivate)); if (private_data == NULL) { return ENOMEM; } ArrowBufferMove(src, &private_data->src); ArrowIpcSharedBufferSet(private_data, 1); ArrowBufferInit(&shared->private_src); shared->private_src.data = private_data->src.data; shared->private_src.size_bytes = private_data->src.size_bytes; // Don't expose any extra capcity from src so that any calls to ArrowBufferAppend // on this buffer will fail. shared->private_src.capacity_bytes = private_data->src.size_bytes; shared->private_src.allocator = ArrowBufferDeallocator(&ArrowIpcSharedBufferFree, private_data); return NANOARROW_OK; } static void ArrowIpcSharedBufferClone(struct ArrowIpcSharedBuffer* shared, struct ArrowBuffer* shared_out) { if (shared->private_src.data == NULL) { ArrowBufferInit(shared_out); shared_out->size_bytes = shared_out->size_bytes; shared_out->capacity_bytes = shared_out->capacity_bytes; return; } struct ArrowIpcSharedBufferPrivate* private_data = (struct ArrowIpcSharedBufferPrivate*)shared->private_src.allocator.private_data; ArrowIpcSharedBufferUpdate(private_data, 1); memcpy(shared_out, shared, sizeof(struct ArrowBuffer)); } void ArrowIpcSharedBufferReset(struct ArrowIpcSharedBuffer* shared) { ArrowBufferReset(&shared->private_src); } static int ArrowIpcDecoderNeedsSwapEndian(struct ArrowIpcDecoder* decoder) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; switch (private_data->endianness) { case NANOARROW_IPC_ENDIANNESS_LITTLE: case 
NANOARROW_IPC_ENDIANNESS_BIG: return private_data->endianness != NANOARROW_IPC_ENDIANNESS_UNINITIALIZED && private_data->endianness != private_data->system_endianness; default: return 0; } } ArrowErrorCode ArrowIpcDecoderInit(struct ArrowIpcDecoder* decoder) { memset(decoder, 0, sizeof(struct ArrowIpcDecoder)); struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)ArrowMalloc(sizeof(struct ArrowIpcDecoderPrivate)); if (private_data == NULL) { return ENOMEM; } memset(private_data, 0, sizeof(struct ArrowIpcDecoderPrivate)); private_data->system_endianness = ArrowIpcSystemEndianness(); decoder->private_data = private_data; return NANOARROW_OK; } void ArrowIpcDecoderReset(struct ArrowIpcDecoder* decoder) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; if (private_data != NULL) { ArrowArrayViewReset(&private_data->array_view); if (private_data->array.release != NULL) { private_data->array.release(&private_data->array); } if (private_data->fields != NULL) { ArrowFree(private_data->fields); private_data->n_fields = 0; } ArrowFree(private_data); memset(decoder, 0, sizeof(struct ArrowIpcDecoder)); } } static inline uint32_t ArrowIpcReadContinuationBytes(struct ArrowBufferView* data) { uint32_t value; memcpy(&value, data->data.as_uint8, sizeof(uint32_t)); data->data.as_uint8 += sizeof(uint32_t); data->size_bytes -= sizeof(uint32_t); return value; } static inline int32_t ArrowIpcReadInt32LE(struct ArrowBufferView* data, int swap_endian) { int32_t value; memcpy(&value, data->data.as_uint8, sizeof(int32_t)); if (swap_endian) { value = bswap32(value); } data->data.as_uint8 += sizeof(int32_t); data->size_bytes -= sizeof(int32_t); return value; } #define ns(x) FLATBUFFERS_WRAP_NAMESPACE(org_apache_arrow_flatbuf, x) static int ArrowIpcDecoderSetMetadata(struct ArrowSchema* schema, ns(KeyValue_vec_t) kv_vec, struct ArrowError* error) { int64_t n_pairs = ns(KeyValue_vec_len(kv_vec)); if (n_pairs == 0) { 
return NANOARROW_OK; } if (n_pairs > 2147483647) { ArrowErrorSet(error, "Expected between 0 and 2147483647 key/value pairs but found %ld", (long)n_pairs); return EINVAL; } struct ArrowBuffer buf; struct ArrowStringView key; struct ArrowStringView value; ns(KeyValue_table_t) kv; int result = ArrowMetadataBuilderInit(&buf, NULL); if (result != NANOARROW_OK) { ArrowBufferReset(&buf); ArrowErrorSet(error, "ArrowMetadataBuilderInit() failed"); return result; } for (int64_t i = 0; i < n_pairs; i++) { kv = ns(KeyValue_vec_at(kv_vec, i)); key.data = ns(KeyValue_key(kv)); key.size_bytes = strlen(key.data); value.data = ns(KeyValue_value(kv)); value.size_bytes = strlen(value.data); result = ArrowMetadataBuilderAppend(&buf, key, value); if (result != NANOARROW_OK) { ArrowBufferReset(&buf); ArrowErrorSet(error, "ArrowMetadataBuilderAppend() failed"); return result; } } result = ArrowSchemaSetMetadata(schema, (const char*)buf.data); ArrowBufferReset(&buf); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetMetadata() failed"); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeSimple(struct ArrowSchema* schema, int nanoarrow_type, struct ArrowError* error) { int result = ArrowSchemaSetType(schema, nanoarrow_type); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetType() failed for type %s", ArrowTypeString(nanoarrow_type)); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeInt(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Int_table_t) type = (ns(Int_table_t))type_generic; int is_signed = ns(Int_is_signed_get(type)); int bitwidth = ns(Int_bitWidth_get(type)); int nanoarrow_type = NANOARROW_TYPE_UNINITIALIZED; if (is_signed) { switch (bitwidth) { case 8: nanoarrow_type = NANOARROW_TYPE_INT8; break; case 16: nanoarrow_type = NANOARROW_TYPE_INT16; break; case 32: nanoarrow_type = NANOARROW_TYPE_INT32; break; case 64: nanoarrow_type = NANOARROW_TYPE_INT64; 
break; default: ArrowErrorSet(error, "Expected signed int bitwidth of 8, 16, 32, or 64 but got %d", (int)bitwidth); return EINVAL; } } else { switch (bitwidth) { case 8: nanoarrow_type = NANOARROW_TYPE_UINT8; break; case 16: nanoarrow_type = NANOARROW_TYPE_UINT16; break; case 32: nanoarrow_type = NANOARROW_TYPE_UINT32; break; case 64: nanoarrow_type = NANOARROW_TYPE_UINT64; break; default: ArrowErrorSet(error, "Expected unsigned int bitwidth of 8, 16, 32, or 64 but got %d", (int)bitwidth); return EINVAL; } } return ArrowIpcDecoderSetTypeSimple(schema, nanoarrow_type, error); } static int ArrowIpcDecoderSetTypeFloatingPoint(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(FloatingPoint_table_t) type = (ns(FloatingPoint_table_t))type_generic; int precision = ns(FloatingPoint_precision(type)); switch (precision) { case ns(Precision_HALF): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_HALF_FLOAT, error); case ns(Precision_SINGLE): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_FLOAT, error); case ns(Precision_DOUBLE): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DOUBLE, error); default: ArrowErrorSet(error, "Unexpected FloatingPoint Precision value: %d", (int)precision); return EINVAL; } } static int ArrowIpcDecoderSetTypeDecimal(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Decimal_table_t) type = (ns(Decimal_table_t))type_generic; int scale = ns(Decimal_scale(type)); int precision = ns(Decimal_precision(type)); int bitwidth = ns(Decimal_bitWidth(type)); int result; switch (bitwidth) { case 128: result = ArrowSchemaSetTypeDecimal(schema, NANOARROW_TYPE_DECIMAL128, precision, scale); break; case 256: result = ArrowSchemaSetTypeDecimal(schema, NANOARROW_TYPE_DECIMAL256, precision, scale); break; default: ArrowErrorSet(error, "Unexpected Decimal bitwidth value: %d", (int)bitwidth); return EINVAL; } if (result != NANOARROW_OK) { 
ArrowErrorSet(error, "ArrowSchemaSetTypeDecimal() failed"); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeFixedSizeBinary(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(FixedSizeBinary_table_t) type = (ns(FixedSizeBinary_table_t))type_generic; int fixed_size = ns(FixedSizeBinary_byteWidth(type)); return ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_size); } static int ArrowIpcDecoderSetTypeDate(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Date_table_t) type = (ns(Date_table_t))type_generic; int date_unit = ns(Date_unit(type)); switch (date_unit) { case ns(DateUnit_DAY): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE32, error); case ns(DateUnit_MILLISECOND): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_DATE64, error); default: ArrowErrorSet(error, "Unexpected Date DateUnit value: %d", (int)date_unit); return EINVAL; } } static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Time_table_t) type = (ns(Time_table_t))type_generic; int time_unit = ns(Time_unit(type)); int bitwidth = ns(Time_bitWidth(type)); int nanoarrow_type; switch (time_unit) { case ns(TimeUnit_SECOND): case ns(TimeUnit_MILLISECOND): if (bitwidth != 32) { ArrowErrorSet(error, "Expected bitwidth of 32 for Time TimeUnit %s but found %d", ns(TimeUnit_name(time_unit)), bitwidth); return EINVAL; } nanoarrow_type = NANOARROW_TYPE_TIME32; break; case ns(TimeUnit_MICROSECOND): case ns(TimeUnit_NANOSECOND): if (bitwidth != 64) { ArrowErrorSet(error, "Expected bitwidth of 64 for Time TimeUnit %s but found %d", ns(TimeUnit_name(time_unit)), bitwidth); return EINVAL; } nanoarrow_type = NANOARROW_TYPE_TIME64; break; default: ArrowErrorSet(error, "Unexpected Time TimeUnit value: %d", (int)time_unit); return EINVAL; } int result = 
ArrowSchemaSetTypeDateTime(schema, nanoarrow_type, time_unit, NULL); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetTypeDateTime() failed"); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeTimestamp(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Timestamp_table_t) type = (ns(Timestamp_table_t))type_generic; int time_unit = ns(Timestamp_unit(type)); const char* timezone = ""; if (ns(Timestamp_timezone_is_present(type))) { timezone = ns(Timestamp_timezone_get(type)); } int result = ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_TIMESTAMP, time_unit, timezone); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetTypeDateTime() failed"); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeDuration(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Duration_table_t) type = (ns(Duration_table_t))type_generic; int time_unit = ns(Duration_unit(type)); int result = ArrowSchemaSetTypeDateTime(schema, NANOARROW_TYPE_DURATION, time_unit, NULL); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetTypeDateTime() failed"); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeInterval(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Interval_table_t) type = (ns(Interval_table_t))type_generic; int interval_unit = ns(Interval_unit(type)); switch (interval_unit) { case ns(IntervalUnit_YEAR_MONTH): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_INTERVAL_MONTHS, error); case ns(IntervalUnit_DAY_TIME): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_INTERVAL_DAY_TIME, error); case ns(IntervalUnit_MONTH_DAY_NANO): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO, error); default: ArrowErrorSet(error, "Unexpected Interval unit value: %d", (int)interval_unit); return EINVAL; } } // We 
can't quite use nanoarrow's built-in SchemaSet functions for nested types // because the IPC format allows modifying some of the defaults those functions assume. // In particular, the allocate + initialize children step is handled outside these // setters. static int ArrowIpcDecoderSetTypeSimpleNested(struct ArrowSchema* schema, const char* format, struct ArrowError* error) { int result = ArrowSchemaSetFormat(schema, format); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetFormat('%s') failed", format); return result; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeFixedSizeList(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(FixedSizeList_table_t) type = (ns(FixedSizeList_table_t))type_generic; int32_t fixed_size = ns(FixedSizeList_listSize(type)); char fixed_size_str[128]; int n_chars = snprintf(fixed_size_str, 128, "+w:%d", fixed_size); fixed_size_str[n_chars] = '\0'; return ArrowIpcDecoderSetTypeSimpleNested(schema, fixed_size_str, error); } static int ArrowIpcDecoderSetTypeMap(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, struct ArrowError* error) { ns(Map_table_t) type = (ns(Map_table_t))type_generic; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetTypeSimpleNested(schema, "+m", error)); if (ns(Map_keysSorted(type))) { schema->flags |= ARROW_FLAG_MAP_KEYS_SORTED; } else { schema->flags &= ~ARROW_FLAG_MAP_KEYS_SORTED; } return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeUnion(struct ArrowSchema* schema, flatbuffers_generic_t type_generic, int64_t n_children, struct ArrowError* error) { ns(Union_table_t) type = (ns(Union_table_t))type_generic; int union_mode = ns(Union_mode(type)); if (n_children < 0 || n_children > 127) { ArrowErrorSet(error, "Expected between 0 and 127 children for Union type but found %ld", (long)n_children); return EINVAL; } // Max valid typeIds size is 127; the longest single ID that could be present here // is -INT_MIN (11 chars). 
With commas and the prefix the max size would be // 1527 characters. (Any ids outside the range 0...127 are unlikely to be valid // elsewhere but they could in theory be present here). char union_types_str[2048]; memset(union_types_str, 0, sizeof(union_types_str)); char* format_cursor = union_types_str; int format_out_size = sizeof(union_types_str); int n_chars = 0; const char* format_prefix; switch (union_mode) { case ns(UnionMode_Sparse): n_chars = snprintf(format_cursor, format_out_size, "+us:"); format_cursor += n_chars; format_out_size -= n_chars; break; case ns(UnionMode_Dense): n_chars = snprintf(format_cursor, format_out_size, "+ud:"); format_cursor += n_chars; format_out_size -= n_chars; break; default: ArrowErrorSet(error, "Unexpected Union UnionMode value: %d", (int)union_mode); return EINVAL; } if (ns(Union_typeIds_is_present(type))) { flatbuffers_int32_vec_t type_ids = ns(Union_typeIds(type)); int64_t n_type_ids = flatbuffers_int32_vec_len(type_ids); if (n_type_ids != n_children) { ArrowErrorSet( error, "Expected between %ld children for Union type with %ld typeIds but found %ld", (long)n_type_ids, (long)n_type_ids, (long)n_children); return EINVAL; } if (n_type_ids > 0) { n_chars = snprintf(format_cursor, format_out_size, "%d", flatbuffers_int32_vec_at(type_ids, 0)); format_cursor += n_chars; format_out_size -= n_chars; for (int64_t i = 1; i < n_type_ids; i++) { n_chars = snprintf(format_cursor, format_out_size, ",%d", (int)flatbuffers_int32_vec_at(type_ids, i)); format_cursor += n_chars; format_out_size -= n_chars; } } } else if (n_children > 0) { n_chars = snprintf(format_cursor, format_out_size, "0"); format_cursor += n_chars; format_out_size -= n_chars; for (int64_t i = 1; i < n_children; i++) { n_chars = snprintf(format_cursor, format_out_size, ",%d", (int)i); format_cursor += n_chars; format_out_size -= n_chars; } } return ArrowIpcDecoderSetTypeSimpleNested(schema, union_types_str, error); } static int ArrowIpcDecoderSetType(struct ArrowSchema* 
schema, ns(Field_table_t) field, int64_t n_children, struct ArrowError* error) { int type_type = ns(Field_type_type(field)); switch (type_type) { case ns(Type_Null): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_NA, error); case ns(Type_Bool): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BOOL, error); case ns(Type_Int): return ArrowIpcDecoderSetTypeInt(schema, ns(Field_type_get(field)), error); case ns(Type_FloatingPoint): return ArrowIpcDecoderSetTypeFloatingPoint(schema, ns(Field_type_get(field)), error); case ns(Type_Decimal): return ArrowIpcDecoderSetTypeDecimal(schema, ns(Field_type_get(field)), error); case ns(Type_Binary): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_BINARY, error); case ns(Type_LargeBinary): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_BINARY, error); case ns(Type_FixedSizeBinary): return ArrowIpcDecoderSetTypeFixedSizeBinary(schema, ns(Field_type_get(field)), error); case ns(Type_Utf8): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_STRING, error); case ns(Type_LargeUtf8): return ArrowIpcDecoderSetTypeSimple(schema, NANOARROW_TYPE_LARGE_STRING, error); case ns(Type_Date): return ArrowIpcDecoderSetTypeDate(schema, ns(Field_type_get(field)), error); case ns(Type_Time): return ArrowIpcDecoderSetTypeTime(schema, ns(Field_type_get(field)), error); case ns(Type_Timestamp): return ArrowIpcDecoderSetTypeTimestamp(schema, ns(Field_type_get(field)), error); case ns(Type_Duration): return ArrowIpcDecoderSetTypeDuration(schema, ns(Field_type_get(field)), error); case ns(Type_Interval): return ArrowIpcDecoderSetTypeInterval(schema, ns(Field_type_get(field)), error); case ns(Type_Struct_): return ArrowIpcDecoderSetTypeSimpleNested(schema, "+s", error); case ns(Type_List): return ArrowIpcDecoderSetTypeSimpleNested(schema, "+l", error); case ns(Type_LargeList): return ArrowIpcDecoderSetTypeSimpleNested(schema, "+L", error); case ns(Type_FixedSizeList): return 
ArrowIpcDecoderSetTypeFixedSizeList(schema, ns(Field_type_get(field)), error); case ns(Type_Map): return ArrowIpcDecoderSetTypeMap(schema, ns(Field_type_get(field)), error); case ns(Type_Union): return ArrowIpcDecoderSetTypeUnion(schema, ns(Field_type_get(field)), n_children, error); default: ArrowErrorSet(error, "Unrecognized Field type with value %d", (int)type_type); return EINVAL; } } static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, ns(Field_vec_t) fields, struct ArrowError* error); static int ArrowIpcDecoderSetField(struct ArrowSchema* schema, ns(Field_table_t) field, struct ArrowError* error) { // No dictionary support yet if (ns(Field_dictionary_is_present(field))) { ArrowErrorSet(error, "Schema message field with DictionaryEncoding not supported"); return ENOTSUP; } int result; if (ns(Field_name_is_present(field))) { result = ArrowSchemaSetName(schema, ns(Field_name_get(field))); } else { result = ArrowSchemaSetName(schema, ""); } if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaSetName() failed"); return result; } // Sets the schema->format and validates type-related inconsistencies // that might exist in the flatbuffer ns(Field_vec_t) children = ns(Field_children(field)); int64_t n_children = ns(Field_vec_len(children)); NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetType(schema, field, n_children, error)); // nanoarrow's type setters set the nullable flag by default, so we might // have to unset it here. if (ns(Field_nullable_get(field))) { schema->flags |= ARROW_FLAG_NULLABLE; } else { schema->flags &= ~ARROW_FLAG_NULLABLE; } // Children are defined separately in the flatbuffer, so we allocate, initialize // and set them separately as well. 
result = ArrowSchemaAllocateChildren(schema, n_children); if (result != NANOARROW_OK) { ArrowErrorSet(error, "ArrowSchemaAllocateChildren() failed"); return result; } for (int64_t i = 0; i < n_children; i++) { ArrowSchemaInit(schema->children[i]); } NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetChildren(schema, children, error)); return ArrowIpcDecoderSetMetadata(schema, ns(Field_custom_metadata(field)), error); } static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, ns(Field_vec_t) fields, struct ArrowError* error) { int64_t n_fields = ns(Schema_vec_len(fields)); for (int64_t i = 0; i < n_fields; i++) { ns(Field_table_t) field = ns(Field_vec_at(fields, i)); NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderSetField(schema->children[i], field, error)); } return NANOARROW_OK; } static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder, flatbuffers_generic_t message_header, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; ns(Schema_table_t) schema = (ns(Schema_table_t))message_header; int endianness = ns(Schema_endianness(schema)); switch (endianness) { case ns(Endianness_Little): decoder->endianness = NANOARROW_IPC_ENDIANNESS_LITTLE; break; case ns(Endianness_Big): decoder->endianness = NANOARROW_IPC_ENDIANNESS_BIG; break; default: ArrowErrorSet(error, "Expected Schema endianness of 0 (little) or 1 (big) but got %d", (int)endianness); return EINVAL; } ns(Feature_vec_t) features = ns(Schema_features(schema)); int64_t n_features = ns(Feature_vec_len(features)); decoder->feature_flags = 0; for (int64_t i = 0; i < n_features; i++) { ns(Feature_enum_t) feature = ns(Feature_vec_at(features, i)); switch (feature) { case ns(Feature_COMPRESSED_BODY): decoder->feature_flags |= NANOARROW_IPC_FEATURE_COMPRESSED_BODY; break; case ns(Feature_DICTIONARY_REPLACEMENT): decoder->feature_flags |= NANOARROW_IPC_FEATURE_DICTIONARY_REPLACEMENT; break; default: ArrowErrorSet(error, 
"Unrecognized Schema feature with value %d", (int)feature); return EINVAL; } } return NANOARROW_OK; } static int ArrowIpcDecoderDecodeRecordBatchHeader(struct ArrowIpcDecoder* decoder, flatbuffers_generic_t message_header, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))message_header; ns(FieldNode_vec_t) fields = ns(RecordBatch_nodes(batch)); ns(Buffer_vec_t) buffers = ns(RecordBatch_buffers(batch)); int64_t n_fields = ns(FieldNode_vec_len(fields)); int64_t n_buffers = ns(Buffer_vec_len(buffers)); // Check field node and buffer count. We have one more field and buffer // because we count the root struct and the flatbuffer message does not. if ((n_fields + 1) != private_data->n_fields) { ArrowErrorSet(error, "Expected %ld field nodes in message but found %ld", (long)private_data->n_fields - 1, (long)n_fields); return EINVAL; } if ((n_buffers + 1) != private_data->n_buffers) { ArrowErrorSet(error, "Expected %ld buffers in message but found %ld", (long)private_data->n_buffers - 1, (long)n_buffers); return EINVAL; } if (ns(RecordBatch_compression_is_present(batch))) { ns(BodyCompression_table_t) compression = ns(RecordBatch_compression(batch)); ns(CompressionType_enum_t) codec = ns(BodyCompression_codec(compression)); switch (codec) { case ns(CompressionType_LZ4_FRAME): decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_LZ4_FRAME; break; case ns(CompressionType_ZSTD): decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_ZSTD; break; default: ArrowErrorSet(error, "Unrecognized RecordBatch BodyCompression codec value: %d", (int)codec); return EINVAL; } } else { decoder->codec = NANOARROW_IPC_COMPRESSION_TYPE_NONE; } // Copying field node and buffer information is separate so as only to pay for the // nodes that are actually accessed. 
return NANOARROW_OK; } // Wipes any "current message" fields before moving on to a new message static inline void ArrowIpcDecoderResetHeaderInfo(struct ArrowIpcDecoder* decoder) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; decoder->message_type = 0; decoder->metadata_version = 0; decoder->endianness = 0; decoder->feature_flags = 0; decoder->codec = 0; decoder->header_size_bytes = 0; decoder->body_size_bytes = 0; private_data->last_message = NULL; } // Returns NANOARROW_OK if data is large enough to read the message header, // ESPIPE if reading more data might help, or EINVAL if the content is not valid static inline int ArrowIpcDecoderCheckHeader(struct ArrowIpcDecoder* decoder, struct ArrowBufferView* data_mut, int32_t* message_size_bytes, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; if (data_mut->size_bytes < 8) { ArrowErrorSet(error, "Expected data of at least 8 bytes but only %ld bytes remain", (long)data_mut->size_bytes); return ESPIPE; } uint32_t continuation = ArrowIpcReadContinuationBytes(data_mut); if (continuation != 0xFFFFFFFF) { ArrowErrorSet(error, "Expected 0xFFFFFFFF at start of message but found 0x%08X", (unsigned int)continuation); return EINVAL; } int swap_endian = private_data->system_endianness == NANOARROW_IPC_ENDIANNESS_BIG; int32_t header_body_size_bytes = ArrowIpcReadInt32LE(data_mut, swap_endian); *message_size_bytes = header_body_size_bytes + (2 * sizeof(int32_t)); if (header_body_size_bytes < 0) { ArrowErrorSet( error, "Expected message body size > 0 but found message body size of %ld bytes", (long)header_body_size_bytes); return EINVAL; } else if (header_body_size_bytes > data_mut->size_bytes) { ArrowErrorSet(error, "Expected 0 <= message body size <= %ld bytes but found message " "body size of %ld bytes", (long)data_mut->size_bytes, (long)header_body_size_bytes); return ESPIPE; } if 
(header_body_size_bytes == 0) { ArrowErrorSet(error, "End of Arrow stream"); return ENODATA; } return NANOARROW_OK; } ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder, struct ArrowBufferView data, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; ArrowIpcDecoderResetHeaderInfo(decoder); NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); return NANOARROW_OK; } ArrowErrorCode ArrowIpcDecoderVerifyHeader(struct ArrowIpcDecoder* decoder, struct ArrowBufferView data, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; ArrowIpcDecoderResetHeaderInfo(decoder); NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); // Run flatbuffers verification if (ns(Message_verify_as_root(data.data.as_uint8, decoder->header_size_bytes - (2 * sizeof(int32_t)))) != flatcc_verify_ok) { ArrowErrorSet(error, "Message flatbuffer verification failed"); return EINVAL; } // Read some basic information from the message ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8)); decoder->metadata_version = ns(Message_version(message)); decoder->message_type = ns(Message_header_type(message)); decoder->body_size_bytes = ns(Message_bodyLength(message)); private_data->last_message = ns(Message_header_get(message)); return NANOARROW_OK; } ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, struct ArrowBufferView data, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; ArrowIpcDecoderResetHeaderInfo(decoder); NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderCheckHeader(decoder, &data, &decoder->header_size_bytes, error)); ns(Message_table_t) message = ns(Message_as_root(data.data.as_uint8)); if (!message) { return 
EINVAL; } // Read some basic information from the message int32_t metadata_version = ns(Message_version(message)); decoder->message_type = ns(Message_header_type(message)); decoder->body_size_bytes = ns(Message_bodyLength(message)); switch (decoder->metadata_version) { case ns(MetadataVersion_V5): break; case ns(MetadataVersion_V1): case ns(MetadataVersion_V2): case ns(MetadataVersion_V3): case ns(MetadataVersion_V4): ArrowErrorSet(error, "Expected metadata version V5 but found %s", ns(MetadataVersion_name(decoder->metadata_version))); break; default: ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)", decoder->metadata_version); return EINVAL; } flatbuffers_generic_t message_header = ns(Message_header_get(message)); switch (decoder->message_type) { case ns(MessageHeader_Schema): NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderDecodeSchemaHeader(decoder, message_header, error)); break; case ns(MessageHeader_RecordBatch): NANOARROW_RETURN_NOT_OK( ArrowIpcDecoderDecodeRecordBatchHeader(decoder, message_header, error)); break; case ns(MessageHeader_DictionaryBatch): case ns(MessageHeader_Tensor): case ns(MessageHeader_SparseTensor): ArrowErrorSet(error, "Unsupported message type: '%s'", ns(MessageHeader_type_name(decoder->message_type))); return ENOTSUP; default: ArrowErrorSet(error, "Unnown message type: %d", (int)(decoder->message_type)); return EINVAL; } private_data->last_message = message_header; return NANOARROW_OK; } ArrowErrorCode ArrowIpcDecoderDecodeSchema(struct ArrowIpcDecoder* decoder, struct ArrowSchema* out, struct ArrowError* error) { struct ArrowIpcDecoderPrivate* private_data = (struct ArrowIpcDecoderPrivate*)decoder->private_data; if (private_data->last_message == NULL || decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_SCHEMA) { ArrowErrorSet(error, "decoder did not just decode a Schema message"); return EINVAL; } ns(Schema_table_t) schema = (ns(Schema_table_t))private_data->last_message; ns(Field_vec_t) fields = 
      ns(Schema_fields(schema));
  int64_t n_fields = ns(Schema_vec_len(fields));

  // Build into a temporary so `out` is untouched on failure
  struct ArrowSchema tmp;
  ArrowSchemaInit(&tmp);
  int result = ArrowSchemaSetTypeStruct(&tmp, n_fields);
  if (result != NANOARROW_OK) {
    tmp.release(&tmp);
    ArrowErrorSet(error, "Failed to allocate struct schema with %ld children",
                  (long)n_fields);
    return result;
  }

  result = ArrowIpcDecoderSetChildren(&tmp, fields, error);
  if (result != NANOARROW_OK) {
    tmp.release(&tmp);
    return result;
  }

  result = ArrowIpcDecoderSetMetadata(&tmp, ns(Schema_custom_metadata(schema)), error);
  if (result != NANOARROW_OK) {
    tmp.release(&tmp);
    return result;
  }

  ArrowSchemaMove(&tmp, out);
  return NANOARROW_OK;
}

// Count schema nodes in depth-first order (including the root)
static void ArrowIpcDecoderCountFields(struct ArrowSchema* schema, int64_t* n_fields) {
  *n_fields += 1;
  for (int64_t i = 0; i < schema->n_children; i++) {
    ArrowIpcDecoderCountFields(schema->children[i], n_fields);
  }
}

// Populate the flat fields array in depth-first order, recording for each
// node its array view/array pointers and the index of its first buffer in
// the flattened buffer sequence (*n_buffers is the running total).
static void ArrowIpcDecoderInitFields(struct ArrowIpcField* fields,
                                      struct ArrowArrayView* array_view,
                                      struct ArrowArray* array, int64_t* n_fields,
                                      int64_t* n_buffers) {
  struct ArrowIpcField* field = fields + (*n_fields);
  field->array_view = array_view;
  field->array = array;
  field->buffer_offset = *n_buffers;

  // Count this node's buffers (at most 3 per the nanoarrow layout)
  for (int i = 0; i < 3; i++) {
    *n_buffers += array_view->layout.buffer_type[i] != NANOARROW_BUFFER_TYPE_NONE;
  }

  *n_fields += 1;

  for (int64_t i = 0; i < array_view->n_children; i++) {
    ArrowIpcDecoderInitFields(fields, array_view->children[i], array->children[i],
                              n_fields, n_buffers);
  }
}

// Prepare the decoder to decode RecordBatch messages matching `schema`.
// The schema is not moved; the decoder builds its own array/array view
// scaffolding plus a flat depth-first field index from it.
ArrowErrorCode ArrowIpcDecoderSetSchema(struct ArrowIpcDecoder* decoder,
                                        struct ArrowSchema* schema,
                                        struct ArrowError* error) {
  struct ArrowIpcDecoderPrivate* private_data =
      (struct ArrowIpcDecoderPrivate*)decoder->private_data;

  // Reset previously allocated schema-specific resources
  private_data->n_buffers = 0;
  private_data->n_fields = 0;
  ArrowArrayViewReset(&private_data->array_view);
  if (private_data->array.release != NULL) {
    private_data->array.release(&private_data->array);
  }
  if (private_data->fields != NULL) {
    ArrowFree(private_data->fields);
  }

  // Allocate Array and ArrayView based on schema without moving the schema.
  // This will fail if the schema is not valid.
  NANOARROW_RETURN_NOT_OK(
      ArrowArrayViewInitFromSchema(&private_data->array_view, schema, error));
  NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromArrayView(&private_data->array,
                                                      &private_data->array_view, error));

  // Root must be a struct
  if (private_data->array_view.storage_type != NANOARROW_TYPE_STRUCT) {
    ArrowErrorSet(error, "schema must be a struct type");
    return EINVAL;
  }

  // Walk tree and calculate how many fields we need to allocate
  ArrowIpcDecoderCountFields(schema, &private_data->n_fields);
  private_data->fields = (struct ArrowIpcField*)ArrowMalloc(private_data->n_fields *
                                                            sizeof(struct ArrowIpcField));
  if (private_data->fields == NULL) {
    ArrowErrorSet(error, "Failed to allocate decoder->fields");
    return ENOMEM;
  }
  memset(private_data->fields, 0, private_data->n_fields * sizeof(struct ArrowIpcField));

  // Init field information and calculate starting buffer offset for each
  int64_t field_i = 0;
  ArrowIpcDecoderInitFields(private_data->fields, &private_data->array_view,
                            &private_data->array, &field_i, &private_data->n_buffers);

  return NANOARROW_OK;
}

// Override the endianness used to interpret record batch bodies. Accepts
// UNINITIALIZED (i.e., "use the schema's endianness") in addition to
// LITTLE/BIG; any other value is rejected.
ArrowErrorCode ArrowIpcDecoderSetEndianness(struct ArrowIpcDecoder* decoder,
                                            enum ArrowIpcEndianness endianness) {
  struct ArrowIpcDecoderPrivate* private_data =
      (struct ArrowIpcDecoderPrivate*)decoder->private_data;

  switch (endianness) {
    case NANOARROW_IPC_ENDIANNESS_UNINITIALIZED:
    case NANOARROW_IPC_ENDIANNESS_LITTLE:
    case NANOARROW_IPC_ENDIANNESS_BIG:
      private_data->endianness = endianness;
      return NANOARROW_OK;
    default:
      return EINVAL;
  }
}

/// \brief Information required to read and/or decompress a single buffer
///
/// The RecordBatch message header contains a description of each buffer
/// in the message body. The ArrowIpcBufferSource is the parsed result of
/// a single buffer with compression and endian information such that the
/// original buffer can be reconstructed.
struct ArrowIpcBufferSource {
  // Offset of this buffer relative to the start of the message body
  int64_t body_offset_bytes;
  // Length of this buffer within the message body
  int64_t buffer_length_bytes;
  // Compression codec declared by the message header
  enum ArrowIpcCompressionType codec;
  // Logical type of the buffer's elements (drives endian swapping)
  enum ArrowType data_type;
  // Element width in bits (drives endian swapping for fixed-width types)
  int32_t element_size_bits;
  // Nonzero if the buffer's byte order differs from this host's
  int swap_endian;
};

/// \brief Materializing ArrowBuffer objects
///
/// Given a description of where a buffer is located inside the message body, make
/// the ArrowBuffer that will be placed into the correct ArrowArray. The decoder
/// does not do any IO and does not make any assumptions about how or if the body
/// has been read into memory. This abstraction is currently internal and exists
/// to support the two obvious ways a user might go about this: (1) using a
/// non-owned view of memory that must be copied slice-wise or (2) adding a reference
/// to an ArrowIpcSharedBuffer and returning a slice of that memory.
struct ArrowIpcBufferFactory {
  /// \brief User-defined callback to populate a buffer view
  ///
  /// At the time that this callback is called, the ArrowIpcBufferSource has been checked
  /// to ensure that it is within the body size declared by the message header. A
  /// possibly preallocated ArrowBuffer (dst) is provided, which implementations must use
  /// if an allocation is required (in which case the view must be populated pointing to
  /// the contents of the ArrowBuffer) If NANOARROW_OK is not returned, error must contain
  /// a null-terminated message.
  ArrowErrorCode (*make_buffer)(struct ArrowIpcBufferFactory* factory,
                                struct ArrowIpcBufferSource* src,
                                struct ArrowBufferView* dst_view, struct ArrowBuffer* dst,
                                struct ArrowError* error);

  /// \brief Caller-defined private data to be used in the callback.
  ///
  /// Usually this would be a description of where the body has been read into memory or
  /// information required to do so.
  void* private_data;
};

// Factory callback for non-owned memory: the view points directly into the
// caller's body buffer; dst is not used (no allocation required).
static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory* factory,
                                                 struct ArrowIpcBufferSource* src,
                                                 struct ArrowBufferView* dst_view,
                                                 struct ArrowBuffer* dst,
                                                 struct ArrowError* error) {
  struct ArrowBufferView* body = (struct ArrowBufferView*)factory->private_data;
  dst_view->data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
  dst_view->size_bytes = src->buffer_length_bytes;
  return NANOARROW_OK;
}

static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromView(
    struct ArrowBufferView* buffer_view) {
  struct ArrowIpcBufferFactory out;
  out.make_buffer = &ArrowIpcMakeBufferFromView;
  out.private_data = buffer_view;
  return out;
}

// Factory callback for shared memory: dst becomes a reference-counted clone
// of the shared body whose data pointer/size are narrowed to this buffer's
// slice; the view aliases dst.
static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* factory,
                                                   struct ArrowIpcBufferSource* src,
                                                   struct ArrowBufferView* dst_view,
                                                   struct ArrowBuffer* dst,
                                                   struct ArrowError* error) {
  struct ArrowIpcSharedBuffer* shared =
      (struct ArrowIpcSharedBuffer*)factory->private_data;
  ArrowBufferReset(dst);
  ArrowIpcSharedBufferClone(shared, dst);
  dst->data += src->body_offset_bytes;
  dst->size_bytes = src->buffer_length_bytes;
  dst_view->data.data = dst->data;
  dst_view->size_bytes = dst->size_bytes;
  return NANOARROW_OK;
}

static struct ArrowIpcBufferFactory ArrowIpcBufferFactoryFromShared(
    struct ArrowIpcSharedBuffer* shared) {
  struct ArrowIpcBufferFactory out;
  out.make_buffer = &ArrowIpcMakeBufferFromShared;
  out.private_data = shared;
  return out;
}

// Just for the purposes of endian-swapping
struct ArrowIpcIntervalMonthDayNano {
  uint32_t months;
  uint32_t days;
  uint64_t ns;
};

// Byte-swap the contents of out_view into dst (allocating dst if needed) and
// repoint out_view at dst. Returns ENOTSUP for element widths it can't swap.
static int ArrowIpcDecoderSwapEndian(struct ArrowIpcBufferSource* src,
                                     struct ArrowBufferView* out_view,
                                     struct ArrowBuffer* dst, struct ArrowError* error) {
  // Some buffer data types don't need any endian swapping
  switch (src->data_type) {
    case NANOARROW_TYPE_BOOL:
    case NANOARROW_TYPE_INT8:
    case NANOARROW_TYPE_UINT8:
    case NANOARROW_TYPE_STRING:
    case NANOARROW_TYPE_BINARY:
      return NANOARROW_OK;
    default:
      break;
  }

  // Make sure dst is not a shared buffer that we can't modify.
  // tmp keeps the original (possibly shared) allocation alive while we read
  // from out_view, which may point into it; tmp is released at the end.
  struct ArrowBuffer tmp;
  ArrowBufferInit(&tmp);
  if (dst->allocator.private_data != NULL) {
    ArrowBufferMove(dst, &tmp);
    ArrowBufferInit(dst);
  }

  // Allocate output space if the factory didn't use dst
  if (dst->size_bytes == 0) {
    NANOARROW_RETURN_NOT_OK(ArrowBufferReserve(dst, out_view->size_bytes));
    dst->size_bytes = out_view->size_bytes;
  }

  switch (src->data_type) {
    case NANOARROW_TYPE_DECIMAL128:
    case NANOARROW_TYPE_DECIMAL256: {
      // Decimals are stored as 2 or 4 little-endian 64-bit words; swap both
      // the bytes within each word and the word order
      const uint64_t* ptr_src = out_view->data.as_uint64;
      uint64_t* ptr_dst = (uint64_t*)dst->data;
      uint64_t words[4];
      int n_words = src->element_size_bits / 64;
      for (int64_t i = 0; i < (dst->size_bytes / n_words / 8); i++) {
        for (int j = 0; j < n_words; j++) {
          words[j] = bswap64(ptr_src[i * n_words + j]);
        }
        for (int j = 0; j < n_words; j++) {
          ptr_dst[i * n_words + j] = words[n_words - j - 1];
        }
      }
      break;
    }
    case NANOARROW_TYPE_INTERVAL_DAY_TIME: {
      // Two 32-bit values per element; swap each independently
      uint32_t* ptr = (uint32_t*)dst->data;
      for (int64_t i = 0; i < (dst->size_bytes / 4); i++) {
        ptr[i] = bswap32(out_view->data.as_uint32[i]);
      }
      break;
    }
    case NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO: {
      // 16-byte struct of two 32-bit fields + one 64-bit field; copy through
      // a local to avoid misaligned access
      const uint8_t* ptr_src = out_view->data.as_uint8;
      uint8_t* ptr_dst = dst->data;
      int item_size_bytes = 16;
      struct ArrowIpcIntervalMonthDayNano item;
      for (int64_t i = 0; i < (dst->size_bytes / item_size_bytes); i++) {
        memcpy(&item, ptr_src + i * item_size_bytes, item_size_bytes);
        item.months = bswap32(item.months);
        item.days = bswap32(item.days);
        item.ns = bswap64(item.ns);
        memcpy(ptr_dst + i * item_size_bytes, &item, item_size_bytes);
      }
      break;
    }
    default:
      // All remaining fixed-width types swap uniformly by element width
      switch (src->element_size_bits) {
        case 16: {
          uint16_t* ptr = (uint16_t*)dst->data;
          for (int64_t i = 0; i < (dst->size_bytes / 2); i++) {
            ptr[i] = bswap16(out_view->data.as_uint16[i]);
          }
          break;
        }
        case 32: {
          uint32_t* ptr = (uint32_t*)dst->data;
          for (int64_t i = 0; i < (dst->size_bytes / 4); i++) {
            ptr[i] = bswap32(out_view->data.as_uint32[i]);
          }
          break;
        }
        case 64: {
          uint64_t* ptr = (uint64_t*)dst->data;
          for (int64_t i = 0; i < (dst->size_bytes / 8); i++) {
            ptr[i] = bswap64(out_view->data.as_uint64[i]);
          }
          break;
        }
        default:
          ArrowErrorSet(error,
                        "Endian swapping for element bitwidth %d is not supported",
                        (int)src->element_size_bits);
          return ENOTSUP;
      }
      break;
  }

  // Release the original allocation (if any) and point the view at the
  // swapped copy
  ArrowBufferReset(&tmp);
  out_view->data.data = dst->data;
  return NANOARROW_OK;
}

// Cursor state for walking FieldNode/Buffer vectors of a RecordBatch header
// in lock-step with the decoder's array views
struct ArrowIpcArraySetter {
  ns(FieldNode_vec_t) fields;
  int64_t field_i;
  ns(Buffer_vec_t) buffers;
  int64_t buffer_i;
  int64_t body_size_bytes;
  struct ArrowIpcBufferSource src;
  struct ArrowIpcBufferFactory factory;
};

// Materialize one buffer described by (offset, length) within the body,
// bounds-checking it against the declared body size, then applying the
// factory and (if needed) endian swapping.
static int ArrowIpcDecoderMakeBuffer(struct ArrowIpcArraySetter* setter, int64_t offset,
                                     int64_t length, struct ArrowBufferView* out_view,
                                     struct ArrowBuffer* out, struct ArrowError* error) {
  out_view->data.data = NULL;
  out_view->size_bytes = 0;

  if (length == 0) {
    return NANOARROW_OK;
  }

  // Check that this buffer fits within the body
  int64_t buffer_start = offset;
  int64_t buffer_end = buffer_start + length;
  if (buffer_start < 0 || buffer_end > setter->body_size_bytes) {
    ArrowErrorSet(error, "Buffer requires body offsets [%ld..%ld) but body has size %ld",
                  (long)buffer_start, (long)buffer_end, (long)setter->body_size_bytes);
    return EINVAL;
  }

  // If the ArrowIpcBufferFactory is made public, these should get moved (since then a
  // user could inject support for either one). More likely, by the time that happens,
  // this library will be able to support some of these features.
  if (setter->src.codec != NANOARROW_IPC_COMPRESSION_TYPE_NONE) {
    ArrowErrorSet(error, "The nanoarrow_ipc extension does not support compression");
    return ENOTSUP;
  }

  setter->src.body_offset_bytes = offset;
  setter->src.buffer_length_bytes = length;
  NANOARROW_RETURN_NOT_OK(
      setter->factory.make_buffer(&setter->factory, &setter->src, out_view, out, error));

  // Swap in place if the body's byte order differs from this host's
  if (setter->src.swap_endian) {
    NANOARROW_RETURN_NOT_OK(
        ArrowIpcDecoderSwapEndian(&setter->src, out_view, out, error));
  }

  return NANOARROW_OK;
}

// Recursively transfer decoded buffers from the decoder's scratch array
// (array) and views (array_view) into the output array, moving scratch
// allocations where possible and copying views otherwise.
static int ArrowIpcDecoderWalkGetArray(struct ArrowArrayView* array_view,
                                       struct ArrowArray* array, struct ArrowArray* out,
                                       struct ArrowError* error) {
  out->length = array_view->length;
  out->null_count = array_view->null_count;

  for (int64_t i = 0; i < array->n_buffers; i++) {
    struct ArrowBufferView view = array_view->buffer_views[i];
    struct ArrowBuffer* scratch_buffer = ArrowArrayBuffer(array, i);
    struct ArrowBuffer* buffer_out = ArrowArrayBuffer(out, i);

    // If the scratch buffer was used, move it to the final array. Otherwise,
    // copy the view.
    if (scratch_buffer->size_bytes == 0) {
      NANOARROW_RETURN_NOT_OK(ArrowBufferAppendBufferView(buffer_out, view));
    } else if (scratch_buffer->data == view.data.as_uint8) {
      ArrowBufferMove(scratch_buffer, buffer_out);
    } else {
      // A used scratch buffer must alias the view; anything else indicates a
      // logic error in the buffer factory / endian-swap path
      ArrowErrorSet(
          error,
          "Internal: scratch buffer was used but doesn't point to the same data as view");
      return EINVAL;
    }
  }

  for (int64_t i = 0; i < array->n_children; i++) {
    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderWalkGetArray(
        array_view->children[i], array->children[i], out->children[i], error));
  }

  return NANOARROW_OK;
}

// Recursively populate array views (lengths, null counts, buffer views) from
// the RecordBatch header's FieldNode/Buffer vectors, advancing the setter's
// cursors in depth-first order.
static int ArrowIpcDecoderWalkSetArrayView(struct ArrowIpcArraySetter* setter,
                                           struct ArrowArrayView* array_view,
                                           struct ArrowArray* array,
                                           struct ArrowError* error) {
  ns(FieldNode_struct_t) field =
      ns(FieldNode_vec_at(setter->fields, (size_t)setter->field_i));
  array_view->length = ns(FieldNode_length(field));
  array_view->null_count = ns(FieldNode_null_count(field));
  setter->field_i += 1;

  for (int64_t i = 0; i < 3; i++) {
    // Buffers are packed at the front of the layout; NONE means no more
    if (array_view->layout.buffer_type[i] == NANOARROW_BUFFER_TYPE_NONE) {
      break;
    }

    ns(Buffer_struct_t) buffer =
        ns(Buffer_vec_at(setter->buffers, (size_t)setter->buffer_i));
    int64_t buffer_offset = ns(Buffer_offset(buffer));
    int64_t buffer_length = ns(Buffer_length(buffer));
    setter->buffer_i += 1;

    // Provide a buffer that will be used if any allocation has to occur
    struct ArrowBuffer* buffer_dst = ArrowArrayBuffer(array, i);

    // Attempt to re-use any previous allocation unless this buffer is
    // wrapping a custom allocator.
    if (buffer_dst->allocator.private_data != NULL) {
      ArrowBufferReset(buffer_dst);
    } else {
      buffer_dst->size_bytes = 0;
    }

    setter->src.data_type = array_view->layout.buffer_data_type[i];
    setter->src.element_size_bits = array_view->layout.element_size_bits[i];
    NANOARROW_RETURN_NOT_OK(
        ArrowIpcDecoderMakeBuffer(setter, buffer_offset, buffer_length,
                                  &array_view->buffer_views[i], buffer_dst, error));
  }

  for (int64_t i = 0; i < array_view->n_children; i++) {
    NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderWalkSetArrayView(
        setter, array_view->children[i], array->children[i], error));
  }

  return NANOARROW_OK;
}

// Build a standalone ArrowArray for field_i (-1 means the root struct) from
// the decoder's already-populated array views, then finish building it.
static ArrowErrorCode ArrowIpcDecoderDecodeArrayInternal(
    struct ArrowIpcDecoder* decoder, int64_t field_i, struct ArrowArray* out,
    enum ArrowValidationLevel validation_level, struct ArrowError* error) {
  struct ArrowIpcDecoderPrivate* private_data =
      (struct ArrowIpcDecoderPrivate*)decoder->private_data;
  // fields[0] is the root, so field_i == -1 maps to index 0
  struct ArrowIpcField* root = private_data->fields + field_i + 1;

  if (field_i == -1) {
    // Root: copy length/null count ourselves and walk the children
    NANOARROW_RETURN_NOT_OK(
        ArrowArrayInitFromArrayView(out, &private_data->array_view, error));
    out->length = private_data->array_view.length;
    out->null_count = private_data->array_view.null_count;
    for (int64_t i = 0; i < private_data->array_view.n_children; i++) {
      NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderWalkGetArray(
          private_data->array_view.children[i], private_data->array.children[i],
          out->children[i], error));
    }
  } else {
    NANOARROW_RETURN_NOT_OK(ArrowArrayInitFromArrayView(out, root->array_view, error));
    NANOARROW_RETURN_NOT_OK(
        ArrowIpcDecoderWalkGetArray(root->array_view, root->array, out, error));
  }

  // If validation is going to happen it has already occurred; however, the part of
  // ArrowArrayFinishBuilding() that allocates a data buffer if the data buffer is
  // NULL (required for compatability with Arrow <= 9.0.0) assumes CPU data access
  // and thus needs a validation level >= default.
  if (validation_level >= NANOARROW_VALIDATION_LEVEL_DEFAULT) {
    NANOARROW_RETURN_NOT_OK(
        ArrowArrayFinishBuilding(out, NANOARROW_VALIDATION_LEVEL_DEFAULT, error));
  } else {
    NANOARROW_RETURN_NOT_OK(
        ArrowArrayFinishBuilding(out, NANOARROW_VALIDATION_LEVEL_NONE, error));
  }

  return NANOARROW_OK;
}

// Populate the decoder's array views for field_i (-1 for the root) from the
// most recently decoded RecordBatch message, materializing buffers via the
// provided factory. On success, *out_view points at the populated view.
static ArrowErrorCode ArrowIpcDecoderDecodeArrayViewInternal(
    struct ArrowIpcDecoder* decoder, struct ArrowIpcBufferFactory factory,
    int64_t field_i, struct ArrowArrayView** out_view, struct ArrowError* error) {
  struct ArrowIpcDecoderPrivate* private_data =
      (struct ArrowIpcDecoderPrivate*)decoder->private_data;

  if (private_data->last_message == NULL ||
      decoder->message_type != NANOARROW_IPC_MESSAGE_TYPE_RECORD_BATCH) {
    ArrowErrorSet(error, "decoder did not just decode a RecordBatch message");
    return EINVAL;
  }

  ns(RecordBatch_table_t) batch = (ns(RecordBatch_table_t))private_data->last_message;

  // RecordBatch messages don't count the root node but decoder->fields does
  struct ArrowIpcField* root = private_data->fields + field_i + 1;

  struct ArrowIpcArraySetter setter;
  setter.fields = ns(RecordBatch_nodes(batch));
  setter.field_i = field_i;
  setter.buffers = ns(RecordBatch_buffers(batch));
  setter.buffer_i = root->buffer_offset - 1;
  setter.body_size_bytes = decoder->body_size_bytes;
  setter.factory = factory;
  setter.src.codec = decoder->codec;
  setter.src.swap_endian = ArrowIpcDecoderNeedsSwapEndian(decoder);

  // The flatbuffers FieldNode doesn't count the root struct so we have to loop over the
  // children ourselves
  if (field_i == -1) {
    root->array_view->length = ns(RecordBatch_length(batch));
    root->array_view->null_count = 0;
    setter.field_i++;
    setter.buffer_i++;

    for (int64_t i = 0; i < root->array_view->n_children; i++) {
      NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderWalkSetArrayView(
          &setter, root->array_view->children[i], root->array->children[i], error));
    }
  } else {
    NANOARROW_RETURN_NOT_OK(
        ArrowIpcDecoderWalkSetArrayView(&setter, root->array_view, root->array, error));
  }

  *out_view =
root->array_view; return NANOARROW_OK; } ArrowErrorCode ArrowIpcDecoderDecodeArrayView(struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, struct ArrowArrayView** out, struct ArrowError* error) { return ArrowIpcDecoderDecodeArrayViewInternal( decoder, ArrowIpcBufferFactoryFromView(&body), i, out, error); } ArrowErrorCode ArrowIpcDecoderDecodeArray(struct ArrowIpcDecoder* decoder, struct ArrowBufferView body, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { struct ArrowArrayView* array_view; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal( decoder, ArrowIpcBufferFactoryFromView(&body), i, &array_view, error)); NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level, error)); struct ArrowArray temp; temp.release = NULL; int result = ArrowIpcDecoderDecodeArrayInternal(decoder, i, &temp, validation_level, error); if (result != NANOARROW_OK && temp.release != NULL) { temp.release(&temp); } else if (result != NANOARROW_OK) { return result; } ArrowArrayMove(&temp, out); return NANOARROW_OK; } ArrowErrorCode ArrowIpcDecoderDecodeArrayFromShared( struct ArrowIpcDecoder* decoder, struct ArrowIpcSharedBuffer* body, int64_t i, struct ArrowArray* out, enum ArrowValidationLevel validation_level, struct ArrowError* error) { struct ArrowArrayView* array_view; NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderDecodeArrayViewInternal( decoder, ArrowIpcBufferFactoryFromShared(body), i, &array_view, error)); NANOARROW_RETURN_NOT_OK(ArrowArrayViewValidate(array_view, validation_level, error)); struct ArrowArray temp; temp.release = NULL; int result = ArrowIpcDecoderDecodeArrayInternal(decoder, i, &temp, validation_level, error); if (result != NANOARROW_OK && temp.release != NULL) { temp.release(&temp); } else if (result != NANOARROW_OK) { return result; } ArrowArrayMove(&temp, out); return NANOARROW_OK; }