arrow/cdata/cdata.go (817 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build cgo // +build cgo package cdata // implement handling of the Arrow C Data Interface. At least from a consuming side. // #include "abi.h" // #include "helpers.h" // #include <stdlib.h> // int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); } // int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); } // const char* stream_get_last_error(struct ArrowArrayStream* st) { return st->get_last_error(st); } // struct ArrowArray* get_arr() { // struct ArrowArray* out = (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); // memset(out, 0, sizeof(struct ArrowArray)); // return out; // } // struct ArrowArrayStream* get_stream() { // struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); // memset(out, 0, sizeof(struct ArrowArrayStream)); // return out; // } // import "C" import ( "errors" "fmt" "io" "runtime" "strconv" "strings" "syscall" "unsafe" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/bitutil" "github.com/apache/arrow-go/v18/arrow/memory" "golang.org/x/xerrors" ) type ( // CArrowSchema is the C Data Interface for ArrowSchemas defined in abi.h CArrowSchema = C.struct_ArrowSchema // CArrowArray is the C Data Interface object for Arrow Arrays as defined in abi.h CArrowArray = C.struct_ArrowArray // CArrowArrayStream is the C Stream Interface object for handling streams of record batches. CArrowArrayStream = C.struct_ArrowArrayStream CArrowAsyncDeviceStreamHandler = C.struct_ArrowAsyncDeviceStreamHandler CArrowAsyncProducer = C.struct_ArrowAsyncProducer CArrowAsyncTask = C.struct_ArrowAsyncTask CArrowDeviceArray = C.struct_ArrowDeviceArray ) // Map from the defined strings to their corresponding arrow.DataType interface // object instances, for types that don't require params. var formatToSimpleType = map[string]arrow.DataType{ "n": arrow.Null, "b": arrow.FixedWidthTypes.Boolean, "c": arrow.PrimitiveTypes.Int8, "C": arrow.PrimitiveTypes.Uint8, "s": arrow.PrimitiveTypes.Int16, "S": arrow.PrimitiveTypes.Uint16, "i": arrow.PrimitiveTypes.Int32, "I": arrow.PrimitiveTypes.Uint32, "l": arrow.PrimitiveTypes.Int64, "L": arrow.PrimitiveTypes.Uint64, "e": arrow.FixedWidthTypes.Float16, "f": arrow.PrimitiveTypes.Float32, "g": arrow.PrimitiveTypes.Float64, "z": arrow.BinaryTypes.Binary, "Z": arrow.BinaryTypes.LargeBinary, "u": arrow.BinaryTypes.String, "U": arrow.BinaryTypes.LargeString, "vz": arrow.BinaryTypes.BinaryView, "vu": arrow.BinaryTypes.StringView, "tdD": arrow.FixedWidthTypes.Date32, "tdm": arrow.FixedWidthTypes.Date64, "tts": arrow.FixedWidthTypes.Time32s, "ttm": arrow.FixedWidthTypes.Time32ms, "ttu": arrow.FixedWidthTypes.Time64us, "ttn": arrow.FixedWidthTypes.Time64ns, "tDs": arrow.FixedWidthTypes.Duration_s, "tDm": arrow.FixedWidthTypes.Duration_ms, "tDu": arrow.FixedWidthTypes.Duration_us, "tDn": arrow.FixedWidthTypes.Duration_ns, "tiM": arrow.FixedWidthTypes.MonthInterval, "tiD": arrow.FixedWidthTypes.DayTimeInterval, "tin": arrow.FixedWidthTypes.MonthDayNanoInterval, } // decode metadata from C which is encoded as // // [int32] -> number of metadata pairs // for 0..n // [int32] -> number of bytes in key // [n bytes] -> key value // [int32] -> number of bytes in value // [n bytes] -> value func decodeCMetadata(md *C.char) arrow.Metadata { if md == nil { return arrow.Metadata{} } // don't copy the bytes, just reference them directly const maxlen = 0x7fffffff data := (*[maxlen]byte)(unsafe.Pointer(md))[:] readint32 := func() int32 { v := *(*int32)(unsafe.Pointer(&data[0])) data = data[arrow.Int32SizeBytes:] return v } readstr := func() string { l := readint32() s := string(data[:l]) data = data[l:] return s } npairs := readint32() if npairs == 0 { return arrow.Metadata{} } keys := make([]string, npairs) vals := make([]string, npairs) for i := int32(0); i < npairs; i++ { keys[i] = readstr() vals[i] = readstr() } return arrow.NewMetadata(keys, vals) } // convert a C.ArrowSchema to an arrow.Field to maintain metadata with the schema func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { // always release, even on error defer C.ArrowSchemaRelease(schema) var childFields []arrow.Field if schema.n_children > 0 { // call ourselves recursively if there are children. // set up a slice to reference safely schemaChildren := unsafe.Slice(schema.children, schema.n_children) childFields = make([]arrow.Field, schema.n_children) for i, c := range schemaChildren { childFields[i], err = importSchema((*CArrowSchema)(c)) if err != nil { return } } } // copy the schema name from the c-string ret.Name = C.GoString(schema.name) ret.Nullable = (schema.flags & C.ARROW_FLAG_NULLABLE) != 0 ret.Metadata = decodeCMetadata(schema.metadata) // copies the c-string here, but it's very small f := C.GoString(schema.format) // handle our non-parameterized simple types. dt, ok := formatToSimpleType[f] if ok { ret.Type = dt if schema.dictionary != nil { valueField, err := importSchema(schema.dictionary) if err != nil { return ret, err } ret.Type = &arrow.DictionaryType{ IndexType: ret.Type, ValueType: valueField.Type, Ordered: schema.dictionary.flags&C.ARROW_FLAG_DICTIONARY_ORDERED != 0, } } return } // handle types with params via colon typs := strings.Split(f, ":") defaulttz := "" switch typs[0] { case "tss": tz := typs[1] if len(typs[1]) == 0 { tz = defaulttz } dt = &arrow.TimestampType{Unit: arrow.Second, TimeZone: tz} case "tsm": tz := typs[1] if len(typs[1]) == 0 { tz = defaulttz } dt = &arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: tz} case "tsu": tz := typs[1] if len(typs[1]) == 0 { tz = defaulttz } dt = &arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: tz} case "tsn": tz := typs[1] if len(typs[1]) == 0 { tz = defaulttz } dt = &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: tz} case "w": // fixed size binary is "w:##" where ## is the byteWidth byteWidth, err := strconv.Atoi(typs[1]) if err != nil { return ret, err } dt = &arrow.FixedSizeBinaryType{ByteWidth: byteWidth} case "d": // decimal types are d:<precision>,<scale>[,<bitsize>] size is assumed 128 if left out props := typs[1] propList := strings.Split(props, ",") bitwidth := 128 var precision, scale int if len(propList) < 2 || len(propList) > 3 { return ret, xerrors.Errorf("invalid decimal spec '%s': wrong number of properties", f) } else if len(propList) == 3 { bitwidth, err = strconv.Atoi(propList[2]) if err != nil { return ret, xerrors.Errorf("could not parse decimal bitwidth in '%s': %s", f, err.Error()) } } precision, err = strconv.Atoi(propList[0]) if err != nil { return ret, xerrors.Errorf("could not parse decimal precision in '%s': %s", f, err.Error()) } scale, err = strconv.Atoi(propList[1]) if err != nil { return ret, xerrors.Errorf("could not parse decimal scale in '%s': %s", f, err.Error()) } switch bitwidth { case 32: dt = &arrow.Decimal32Type{Precision: int32(precision), Scale: int32(scale)} case 64: dt = &arrow.Decimal64Type{Precision: int32(precision), Scale: int32(scale)} case 128: dt = &arrow.Decimal128Type{Precision: int32(precision), Scale: int32(scale)} case 256: dt = &arrow.Decimal256Type{Precision: int32(precision), Scale: int32(scale)} default: return ret, xerrors.Errorf("unsupported decimal bitwidth, got '%s'", f) } } if f[0] == '+' { // types with children switch f[1] { case 'l': // list dt = arrow.ListOfField(childFields[0]) case 'L': // large list dt = arrow.LargeListOfField(childFields[0]) case 'v': // list view/large list view if f[2] == 'l' { dt = arrow.ListViewOfField(childFields[0]) } else if f[2] == 'L' { dt = arrow.LargeListViewOfField(childFields[0]) } case 'w': // fixed size list is w:# where # is the list size. listSize, err := strconv.Atoi(strings.Split(f, ":")[1]) if err != nil { return ret, err } dt = arrow.FixedSizeListOfField(int32(listSize), childFields[0]) case 's': // struct dt = arrow.StructOf(childFields...) case 'r': // run-end encoded if len(childFields) != 2 { return ret, fmt.Errorf("%w: run-end encoded arrays must have 2 children", arrow.ErrInvalid) } dt = arrow.RunEndEncodedOf(childFields[0].Type, childFields[1].Type) case 'm': // map type is basically a list of structs. st := childFields[0].Type.(*arrow.StructType) dt = arrow.MapOf(st.Field(0).Type, st.Field(1).Type) dt.(*arrow.MapType).KeysSorted = (schema.flags & C.ARROW_FLAG_MAP_KEYS_SORTED) != 0 case 'u': // union var mode arrow.UnionMode switch f[2] { case 'd': mode = arrow.DenseMode case 's': mode = arrow.SparseMode default: err = fmt.Errorf("%w: invalid union type", arrow.ErrInvalid) return } codes := strings.Split(strings.Split(f, ":")[1], ",") typeCodes := make([]arrow.UnionTypeCode, 0, len(codes)) for _, i := range codes { v, e := strconv.ParseInt(i, 10, 8) if e != nil { err = fmt.Errorf("%w: invalid type code: %s", arrow.ErrInvalid, e) return } if v < 0 { err = fmt.Errorf("%w: negative type code in union: format string %s", arrow.ErrInvalid, f) return } typeCodes = append(typeCodes, arrow.UnionTypeCode(v)) } if len(childFields) != len(typeCodes) { err = fmt.Errorf("%w: ArrowArray struct number of children incompatible with format string", arrow.ErrInvalid) return } dt = arrow.UnionOf(mode, childFields, typeCodes) } } if dt == nil { // if we didn't find a type, then it's something we haven't implemented. err = xerrors.New("unimplemented type") } else { ret.Type = dt } return } // importer to keep track when importing C ArrowArray objects. type cimporter struct { dt arrow.DataType arr *CArrowArray data arrow.ArrayData parent *cimporter children []cimporter cbuffers []*C.void alloc *importAllocator } func (imp *cimporter) importChild(parent *cimporter, src *CArrowArray) error { imp.parent, imp.arr, imp.alloc = parent, src, parent.alloc return imp.doImport() } // import any child arrays for lists, structs, and so on. func (imp *cimporter) doImportChildren() error { children := unsafe.Slice(imp.arr.children, imp.arr.n_children) if len(children) > 0 { imp.children = make([]cimporter, len(children)) } // handle the cases switch imp.dt.ID() { case arrow.LIST: // only one child to import imp.children[0].dt = imp.dt.(*arrow.ListType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } case arrow.LARGE_LIST: // only one child to import imp.children[0].dt = imp.dt.(*arrow.LargeListType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } case arrow.LIST_VIEW: // only one child to import imp.children[0].dt = imp.dt.(*arrow.ListViewType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } case arrow.LARGE_LIST_VIEW: // only one child to import imp.children[0].dt = imp.dt.(*arrow.LargeListViewType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } case arrow.FIXED_SIZE_LIST: // only one child to import imp.children[0].dt = imp.dt.(*arrow.FixedSizeListType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } case arrow.STRUCT: // import all the children st := imp.dt.(*arrow.StructType) for i, c := range children { imp.children[i].dt = st.Field(i).Type imp.children[i].importChild(imp, c) } case arrow.RUN_END_ENCODED: // import run-ends and values st := imp.dt.(*arrow.RunEndEncodedType) imp.children[0].dt = st.RunEnds() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } imp.children[1].dt = st.Encoded() if err := imp.children[1].importChild(imp, children[1]); err != nil { return err } case arrow.MAP: // only one child to import, it's a struct array imp.children[0].dt = imp.dt.(*arrow.MapType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } case arrow.DENSE_UNION: dt := imp.dt.(*arrow.DenseUnionType) for i, c := range children { imp.children[i].dt = dt.Fields()[i].Type imp.children[i].importChild(imp, c) } case arrow.SPARSE_UNION: dt := imp.dt.(*arrow.SparseUnionType) for i, c := range children { imp.children[i].dt = dt.Fields()[i].Type imp.children[i].importChild(imp, c) } } return nil } func (imp *cimporter) initarr() { imp.arr = C.get_arr() if imp.alloc == nil { imp.alloc = &importAllocator{arr: imp.arr} } } func (imp *cimporter) doImportArr(src *CArrowArray) error { imp.arr = C.get_arr() C.ArrowArrayMove(src, imp.arr) if imp.alloc == nil { imp.alloc = &importAllocator{arr: imp.arr} } // we tie the releasing of the array to when the buffers are // cleaned up, so if there are no buffers that we've imported // such as for a null array or a nested array with no bitmap // and only null columns, then we can release the CArrowArray // struct immediately after import, since we have no imported // memory that we have to track the lifetime of. defer func() { if imp.alloc.bufCount.Load() == 0 { C.ArrowArrayRelease(imp.arr) C.free(unsafe.Pointer(imp.arr)) } }() return imp.doImport() } // import is called recursively as needed for importing an array and its children // in order to generate array.Data objects func (imp *cimporter) doImport() error { // move the array from the src object passed in to the one referenced by // this importer. That way we can set up a finalizer on the created // arrow.ArrayData object so we clean up our Array's memory when garbage collected. defer func(arr *CArrowArray) { // this should only occur in the case of an error happening // during import, at which point we need to clean up the // ArrowArray struct we allocated. if imp.data == nil { C.free(unsafe.Pointer(arr)) } }(imp.arr) // import any children if err := imp.doImportChildren(); err != nil { return err } for _, c := range imp.children { if c.data != nil { defer c.data.Release() } } if imp.arr.n_buffers > 0 { // get a view of the buffers, zero-copy. we're just looking at the pointers imp.cbuffers = unsafe.Slice((**C.void)(unsafe.Pointer(imp.arr.buffers)), imp.arr.n_buffers) } // handle each of our type cases switch dt := imp.dt.(type) { case *arrow.NullType: if err := imp.checkNoChildren(); err != nil { return err } imp.data = array.NewData(dt, int(imp.arr.length), nil, nil, int(imp.arr.null_count), int(imp.arr.offset)) case arrow.FixedWidthDataType: return imp.importFixedSizePrimitive() case *arrow.StringType: return imp.importStringLike(int64(arrow.Int32SizeBytes)) case *arrow.BinaryType: return imp.importStringLike(int64(arrow.Int32SizeBytes)) case *arrow.LargeStringType: return imp.importStringLike(int64(arrow.Int64SizeBytes)) case *arrow.LargeBinaryType: return imp.importStringLike(int64(arrow.Int64SizeBytes)) case *arrow.StringViewType: return imp.importBinaryViewLike() case *arrow.BinaryViewType: return imp.importBinaryViewLike() case *arrow.ListType: return imp.importListLike() case *arrow.LargeListType: return imp.importListLike() case *arrow.ListViewType: return imp.importListViewLike() case *arrow.LargeListViewType: return imp.importListViewLike() case *arrow.MapType: return imp.importListLike() case *arrow.FixedSizeListType: if err := imp.checkNumChildren(1); err != nil { return err } if err := imp.checkNumBuffers(1); err != nil { return err } nulls, err := imp.importNullBitmap(0) if err != nil { return err } if nulls != nil { defer nulls.Release() } imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nulls}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) case *arrow.StructType: if err := imp.checkNumBuffers(1); err != nil { return err } nulls, err := imp.importNullBitmap(0) if err != nil { return err } if nulls != nil { defer nulls.Release() } children := make([]arrow.ArrayData, len(imp.children)) for i := range imp.children { children[i] = imp.children[i].data } imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nulls}, children, int(imp.arr.null_count), int(imp.arr.offset)) case *arrow.RunEndEncodedType: if err := imp.checkNumBuffers(0); err != nil { return err } if len(imp.children) != 2 { return fmt.Errorf("%w: run-end encoded array should have 2 children", arrow.ErrInvalid) } children := []arrow.ArrayData{imp.children[0].data, imp.children[1].data} imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{}, children, int(imp.arr.null_count), int(imp.arr.offset)) case *arrow.DenseUnionType: if err := imp.checkNoNulls(); err != nil { return err } bufs := []*memory.Buffer{nil, nil, nil} var err error if imp.arr.n_buffers == 3 { // legacy format exported by older arrow c++ versions if bufs[1], err = imp.importFixedSizeBuffer(1, 1); err != nil { return err } defer bufs[1].Release() if bufs[2], err = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes)); err != nil { return err } defer bufs[2].Release() } else { if err := imp.checkNumBuffers(2); err != nil { return err } if bufs[1], err = imp.importFixedSizeBuffer(0, 1); err != nil { return err } defer bufs[1].Release() if bufs[2], err = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes)); err != nil { return err } defer bufs[2].Release() } children := make([]arrow.ArrayData, len(imp.children)) for i := range imp.children { children[i] = imp.children[i].data } imp.data = array.NewData(dt, int(imp.arr.length), bufs, children, 0, int(imp.arr.offset)) case *arrow.SparseUnionType: if err := imp.checkNoNulls(); err != nil { return err } var buf *memory.Buffer var err error if imp.arr.n_buffers == 2 { // legacy format exported by older Arrow C++ versions if buf, err = imp.importFixedSizeBuffer(1, 1); err != nil { return err } defer buf.Release() } else { if err := imp.checkNumBuffers(1); err != nil { return err } if buf, err = imp.importFixedSizeBuffer(0, 1); err != nil { return err } defer buf.Release() } children := make([]arrow.ArrayData, len(imp.children)) for i := range imp.children { children[i] = imp.children[i].data } imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nil, buf}, children, 0, int(imp.arr.offset)) default: return fmt.Errorf("unimplemented type %s", dt) } return nil } func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) { if err = imp.checkNoChildren(); err != nil { return } if err = imp.checkNumBuffers(3); err != nil { return } var nulls, offsets, values *memory.Buffer if nulls, err = imp.importNullBitmap(0); err != nil { return } if nulls != nil { defer nulls.Release() } if offsets, err = imp.importOffsetsBuffer(1, offsetByteWidth); err != nil { return } defer offsets.Release() var nvals int64 switch offsetByteWidth { case 4: typedOffsets := arrow.Int32Traits.CastFromBytes(offsets.Bytes()) nvals = int64(typedOffsets[imp.arr.offset+imp.arr.length]) case 8: typedOffsets := arrow.Int64Traits.CastFromBytes(offsets.Bytes()) nvals = typedOffsets[imp.arr.offset+imp.arr.length] } if values, err = imp.importVariableValuesBuffer(2, 1, nvals); err != nil { return } defer values.Release() imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) return } func (imp *cimporter) importBinaryViewLike() (err error) { if err = imp.checkNoChildren(); err != nil { return } buffers := make([]*memory.Buffer, len(imp.cbuffers)-1) defer memory.ReleaseBuffers(buffers) if buffers[0], err = imp.importNullBitmap(0); err != nil { return } if buffers[1], err = imp.importFixedSizeBuffer(1, int64(arrow.ViewHeaderSizeBytes)); err != nil { return } dataBufferSizes := unsafe.Slice((*int64)(unsafe.Pointer(imp.cbuffers[len(buffers)])), len(buffers)-2) for i, size := range dataBufferSizes { if buffers[i+2], err = imp.importVariableValuesBuffer(i+2, 1, size); err != nil { return } } imp.data = array.NewData(imp.dt, int(imp.arr.length), buffers, nil, int(imp.arr.null_count), int(imp.arr.offset)) return } func (imp *cimporter) importListLike() (err error) { if err = imp.checkNumChildren(1); err != nil { return err } if err = imp.checkNumBuffers(2); err != nil { return err } var nulls, offsets *memory.Buffer if nulls, err = imp.importNullBitmap(0); err != nil { return } if nulls != nil { defer nulls.Release() } offsetSize := imp.dt.Layout().Buffers[1].ByteWidth if offsets, err = imp.importOffsetsBuffer(1, int64(offsetSize)); err != nil { return } if offsets != nil { defer offsets.Release() } imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) return } func (imp *cimporter) importListViewLike() (err error) { offsetSize := int64(imp.dt.Layout().Buffers[1].ByteWidth) if err = imp.checkNumChildren(1); err != nil { return err } if err = imp.checkNumBuffers(3); err != nil { return err } var nulls, offsets, sizes *memory.Buffer if nulls, err = imp.importNullBitmap(0); err != nil { return } if nulls != nil { defer nulls.Release() } if offsets, err = imp.importFixedSizeBuffer(1, offsetSize); err != nil { return } if offsets != nil { defer offsets.Release() } if sizes, err = imp.importFixedSizeBuffer(2, offsetSize); err != nil { return } if sizes != nil { defer sizes.Release() } imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, sizes}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) return } func (imp *cimporter) importFixedSizePrimitive() error { if err := imp.checkNoChildren(); err != nil { return err } if err := imp.checkNumBuffers(2); err != nil { return err } nulls, err := imp.importNullBitmap(0) if err != nil { return err } var values *memory.Buffer fw := imp.dt.(arrow.FixedWidthDataType) if bitutil.IsMultipleOf8(int64(fw.BitWidth())) { values, err = imp.importFixedSizeBuffer(1, bitutil.BytesForBits(int64(fw.BitWidth()))) } else { if fw.BitWidth() != 1 { return xerrors.New("invalid bitwidth") } values, err = imp.importBitsBuffer(1) } if err != nil { return err } var dict *array.Data if dt, ok := imp.dt.(*arrow.DictionaryType); ok { dictImp := &cimporter{dt: dt.ValueType} if err := dictImp.importChild(imp, imp.arr.dictionary); err != nil { return err } defer dictImp.data.Release() dict = dictImp.data.(*array.Data) } if nulls != nil { defer nulls.Release() } if values != nil { defer values.Release() } imp.data = array.NewDataWithDictionary(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, int(imp.arr.null_count), int(imp.arr.offset), dict) return nil } func (imp *cimporter) checkNoChildren() error { return imp.checkNumChildren(0) } func (imp *cimporter) checkNoNulls() error { if imp.arr.null_count != 0 { return fmt.Errorf("%w: unexpected non-zero null count for imported type %s", arrow.ErrInvalid, imp.dt) } return nil } func (imp *cimporter) checkNumChildren(n int64) error { if int64(imp.arr.n_children) != n { return fmt.Errorf("expected %d children, for imported type %s, ArrowArray has %d", n, imp.dt, imp.arr.n_children) } return nil } func (imp *cimporter) checkNumBuffers(n int64) error { if int64(imp.arr.n_buffers) != n { return fmt.Errorf("expected %d buffers for imported type %s, ArrowArray has %d", n, imp.dt, imp.arr.n_buffers) } return nil } func (imp *cimporter) importBuffer(bufferID int, sz int64) (*memory.Buffer, error) { // this is not a copy, we're just having a slice which points at the data // it's still owned by the C.ArrowArray object and its backing C++ object. if imp.cbuffers[bufferID] == nil { if sz != 0 { return nil, errors.New("invalid buffer") } return memory.NewBufferBytes([]byte{}), nil } data := unsafe.Slice((*byte)(unsafe.Pointer(imp.cbuffers[bufferID])), sz) imp.alloc.addBuffer() return memory.NewBufferWithAllocator(data, imp.alloc), nil } func (imp *cimporter) importBitsBuffer(bufferID int) (*memory.Buffer, error) { bufsize := bitutil.BytesForBits(int64(imp.arr.length) + int64(imp.arr.offset)) return imp.importBuffer(bufferID, bufsize) } func (imp *cimporter) importNullBitmap(bufferID int) (*memory.Buffer, error) { if imp.arr.null_count > 0 && imp.cbuffers[bufferID] == nil { return nil, fmt.Errorf("arrowarray struct has null bitmap buffer, but non-zero null_count %d", imp.arr.null_count) } if imp.arr.null_count == 0 && imp.cbuffers[bufferID] == nil { return nil, nil } return imp.importBitsBuffer(bufferID) } func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) (*memory.Buffer, error) { bufsize := byteWidth * int64(imp.arr.length+imp.arr.offset) return imp.importBuffer(bufferID, bufsize) } func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) (*memory.Buffer, error) { bufsize := offsetsize * int64((imp.arr.length + imp.arr.offset + 1)) return imp.importBuffer(bufferID, bufsize) } func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals int64) (*memory.Buffer, error) { bufsize := byteWidth * nvals return imp.importBuffer(bufferID, int64(bufsize)) } func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, err error) { imp = &cimporter{dt: dt} err = imp.doImportArr(arr) return } func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) error { rdr.stream = C.get_stream() C.ArrowArrayStreamMove(stream, rdr.stream) rdr.arr = C.get_arr() runtime.SetFinalizer(rdr, func(r *nativeCRecordBatchReader) { if r.cur != nil { r.cur.Release() } C.ArrowArrayStreamRelease(r.stream) C.ArrowArrayRelease(r.arr) C.free(unsafe.Pointer(r.stream)) C.free(unsafe.Pointer(r.arr)) }) var sc CArrowSchema errno := C.stream_get_schema(rdr.stream, &sc) if errno != 0 { return rdr.getError(int(errno)) } defer C.ArrowSchemaRelease(&sc) s, err := ImportCArrowSchema((*CArrowSchema)(&sc)) if err != nil { return err } rdr.schema = s return nil } // Record Batch reader that conforms to arrio.Reader for the ArrowArrayStream interface type nativeCRecordBatchReader struct { stream *CArrowArrayStream arr *CArrowArray schema *arrow.Schema cur arrow.Record err error } // No need to implement retain and release here as we used runtime.SetFinalizer when constructing // the reader to free up the ArrowArrayStream memory when the garbage collector cleans it up. func (n *nativeCRecordBatchReader) Retain() {} func (n *nativeCRecordBatchReader) Release() {} func (n *nativeCRecordBatchReader) Err() error { return n.err } func (n *nativeCRecordBatchReader) Record() arrow.Record { return n.cur } func (n *nativeCRecordBatchReader) Next() bool { err := n.next() switch { case err == nil: return true case err == io.EOF: return false } n.err = err return false } func (n *nativeCRecordBatchReader) next() error { if n.schema == nil { var sc CArrowSchema errno := C.stream_get_schema(n.stream, &sc) if errno != 0 { return n.getError(int(errno)) } defer C.ArrowSchemaRelease(&sc) s, err := ImportCArrowSchema((*CArrowSchema)(&sc)) if err != nil { return err } n.schema = s } if n.cur != nil { n.cur.Release() n.cur = nil } errno := C.stream_get_next(n.stream, n.arr) if errno != 0 { return n.getError(int(errno)) } if C.ArrowArrayIsReleased(n.arr) == 1 { return io.EOF } rec, err := ImportCRecordBatchWithSchema(n.arr, n.schema) if err != nil { return err } n.cur = rec return nil } func (n *nativeCRecordBatchReader) Schema() *arrow.Schema { return n.schema } func (n *nativeCRecordBatchReader) getError(errno int) error { return fmt.Errorf("%w: %s", syscall.Errno(errno), C.GoString(C.stream_get_last_error(n.stream))) } func (n *nativeCRecordBatchReader) Read() (arrow.Record, error) { if err := n.next(); err != nil { n.err = err return nil, err } return n.cur, nil } func releaseArr(arr *CArrowArray) { C.ArrowArrayRelease(arr) } func releaseSchema(schema *CArrowSchema) { C.ArrowSchemaRelease(schema) }