arrow/cdata/cdata.go (437 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build cgo
// +build cgo
package cdata
// implement handling of the Arrow C Data Interface. At least from a consuming side.
// #include "arrow/c/abi.h"
// #include "arrow/c/helpers.h"
// #include <stdlib.h>
// int stream_get_schema(struct ArrowArrayStream* st, struct ArrowSchema* out) { return st->get_schema(st, out); }
// int stream_get_next(struct ArrowArrayStream* st, struct ArrowArray* out) { return st->get_next(st, out); }
// const char* stream_get_last_error(struct ArrowArrayStream* st) { return st->get_last_error(st); }
// struct ArrowArray* get_arr() { return (struct ArrowArray*)(malloc(sizeof(struct ArrowArray))); }
// struct ArrowArrayStream* get_stream() { return (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); }
//
import "C"
import (
"io"
"reflect"
"runtime"
"strconv"
"strings"
"syscall"
"unsafe"
"github.com/aliyun/aliyun-odps-go-sdk/arrow"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/array"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/bitutil"
"github.com/aliyun/aliyun-odps-go-sdk/arrow/memory"
"golang.org/x/xerrors"
)
// CArrowSchema aliases struct ArrowSchema, the C Data Interface schema
// structure declared in abi.h.
type CArrowSchema = C.struct_ArrowSchema

// CArrowArray aliases struct ArrowArray, the C Data Interface array
// structure declared in abi.h.
type CArrowArray = C.struct_ArrowArray

// CArrowArrayStream aliases struct ArrowArrayStream, the experimental API
// for handling streams of record batches through the C Data Interface.
type CArrowArrayStream = C.struct_ArrowArrayStream
// formatToSimpleType maps C Data Interface format strings to the matching
// arrow.DataType instance, for the types whose format string carries no
// parameters. importSchema consults this table first and only falls back to
// parsing when the format string is not found here.
var formatToSimpleType = map[string]arrow.DataType{
// null and boolean
"n": arrow.Null,
"b": arrow.FixedWidthTypes.Boolean,
// integers: lowercase letters are signed, uppercase are unsigned
"c": arrow.PrimitiveTypes.Int8,
"C": arrow.PrimitiveTypes.Uint8,
"s": arrow.PrimitiveTypes.Int16,
"S": arrow.PrimitiveTypes.Uint16,
"i": arrow.PrimitiveTypes.Int32,
"I": arrow.PrimitiveTypes.Uint32,
"l": arrow.PrimitiveTypes.Int64,
"L": arrow.PrimitiveTypes.Uint64,
// floating point
"e": arrow.FixedWidthTypes.Float16,
"f": arrow.PrimitiveTypes.Float32,
"g": arrow.PrimitiveTypes.Float64,
// variable-length binary and string
"z": arrow.BinaryTypes.Binary,
"u": arrow.BinaryTypes.String,
// dates
"tdD": arrow.FixedWidthTypes.Date32,
"tdm": arrow.FixedWidthTypes.Date64,
// times of day
"tts": arrow.FixedWidthTypes.Time32s,
"ttm": arrow.FixedWidthTypes.Time32ms,
"ttu": arrow.FixedWidthTypes.Time64us,
"ttn": arrow.FixedWidthTypes.Time64ns,
// durations
"tDs": arrow.FixedWidthTypes.Duration_s,
"tDm": arrow.FixedWidthTypes.Duration_ms,
"tDu": arrow.FixedWidthTypes.Duration_us,
"tDn": arrow.FixedWidthTypes.Duration_ns,
// intervals
"tiM": arrow.FixedWidthTypes.MonthInterval,
"tiD": arrow.FixedWidthTypes.DayTimeInterval,
"tin": arrow.FixedWidthTypes.MonthDayNanoInterval,
}
// decodeCMetadata parses the metadata blob attached to a C schema and returns
// it as an arrow.Metadata. A nil pointer or a blob holding zero pairs yields
// an empty arrow.Metadata.
//
// The blob layout is:
//
//	[int32] -> number of metadata pairs
//	for each pair
//	  [int32]   -> number of bytes in key
//	  [n bytes] -> key
//	  [int32]   -> number of bytes in value
//	  [n bytes] -> value
//
// The int32 fields are read straight out of memory, and every key and value
// is copied into a Go string.
func decodeCMetadata(md *C.char) arrow.Metadata {
	if md == nil {
		return arrow.Metadata{}
	}

	// View the C memory as a byte slice without copying it; pos is the read
	// cursor into that view.
	const maxlen = 0x7fffffff
	raw := (*[maxlen]byte)(unsafe.Pointer(md))[:]
	pos := 0

	nextInt32 := func() int32 {
		n := *(*int32)(unsafe.Pointer(&raw[pos]))
		pos += arrow.Int32SizeBytes
		return n
	}
	nextString := func() string {
		size := int(nextInt32())
		str := string(raw[pos : pos+size])
		pos += size
		return str
	}

	count := nextInt32()
	if count == 0 {
		return arrow.Metadata{}
	}

	keys := make([]string, count)
	vals := make([]string, count)
	for i := range keys {
		keys[i] = nextString()
		vals[i] = nextString()
	}
	return arrow.NewMetadata(keys, vals)
}
// importSchema converts a C.ArrowSchema into an arrow.Field so that the field
// name, nullability and metadata are kept together with the data type.
//
// The schema is always released before returning, even on error. Child
// schemas are imported recursively first because nested types are built from
// their child fields.
//
// Malformed input is reported as an error instead of a panic: an empty format
// string, a parameterized format without its parameters, unparseable decimal
// parameters, or a nested type that lacks the children it requires.
func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) {
	// always release, even on error
	defer C.ArrowSchemaRelease(schema)

	var childFields []arrow.Field
	if schema.n_children > 0 {
		// call ourselves recursively if there are children.
		var schemaChildren []*CArrowSchema
		// view the C array of child pointers as a Go slice, without copying
		s := (*reflect.SliceHeader)(unsafe.Pointer(&schemaChildren))
		s.Data = uintptr(unsafe.Pointer(schema.children))
		s.Len = int(schema.n_children)
		s.Cap = int(schema.n_children)

		childFields = make([]arrow.Field, schema.n_children)
		for i, c := range schemaChildren {
			childFields[i], err = importSchema((*CArrowSchema)(c))
			if err != nil {
				return ret, err
			}
		}
	}

	// copy the schema name from the c-string
	ret.Name = C.GoString(schema.name)
	ret.Nullable = (schema.flags & C.ARROW_FLAG_NULLABLE) != 0
	ret.Metadata = decodeCMetadata(schema.metadata)

	// copies the c-string here, but it's very small
	f := C.GoString(schema.format)

	// handle our non-parameterized simple types.
	dt, ok := formatToSimpleType[f]
	if ok {
		ret.Type = dt
		return ret, nil
	}
	if f == "" {
		return ret, xerrors.New("invalid schema: empty format string")
	}

	// Parameterized types put their parameters after the first colon. Split
	// only once: a timestamp timezone such as "+05:30" contains colons itself.
	typs := strings.SplitN(f, ":", 2)
	hasParams := len(typs) == 2
	params := ""
	if hasParams {
		params = typs[1]
	}

	const defaulttz = "UTC"
	switch typs[0] {
	case "tss", "tsm", "tsu", "tsn": // timestamps are "ts<unit>:<timezone>"
		if !hasParams {
			return ret, xerrors.Errorf("invalid timestamp format string %q: missing ':' before timezone", f)
		}
		unit := arrow.Second
		switch typs[0] {
		case "tsm":
			unit = arrow.Millisecond
		case "tsu":
			unit = arrow.Microsecond
		case "tsn":
			unit = arrow.Nanosecond
		}
		tz := params
		if tz == "" {
			tz = defaulttz
		}
		dt = &arrow.TimestampType{Unit: unit, TimeZone: tz}
	case "w": // fixed size binary is "w:##" where ## is the byteWidth
		byteWidth, convErr := strconv.Atoi(params)
		if convErr != nil {
			return ret, convErr
		}
		dt = &arrow.FixedSizeBinaryType{ByteWidth: byteWidth}
	case "d": // decimal types are d:<precision>,<scale>[,<bitsize>] size is assumed 128 if left out
		propList := strings.Split(params, ",")
		if len(propList) == 3 {
			return ret, xerrors.New("only decimal128 is supported")
		}
		if len(propList) < 2 {
			return ret, xerrors.Errorf("invalid decimal format string %q: expected d:<precision>,<scale>", f)
		}
		precision, convErr := strconv.Atoi(propList[0])
		if convErr != nil {
			return ret, convErr
		}
		scale, convErr := strconv.Atoi(propList[1])
		if convErr != nil {
			return ret, convErr
		}
		dt = &arrow.Decimal128Type{Precision: int32(precision), Scale: int32(scale)}
	}

	if f[0] == '+' { // types with children
		if len(f) < 2 {
			return ret, xerrors.Errorf("invalid nested format string %q", f)
		}
		// list, fixed size list and map are all built from their first child
		switch f[1] {
		case 'l', 'w', 'm':
			if len(childFields) == 0 {
				return ret, xerrors.Errorf("format %q requires a child schema, but schema has no children", f)
			}
		}

		switch f[1] {
		case 'l': // list
			dt = arrow.ListOfField(childFields[0])
		case 'w': // fixed size list is w:# where # is the list size.
			listSize, convErr := strconv.Atoi(params)
			if convErr != nil {
				return ret, convErr
			}
			dt = arrow.FixedSizeListOfField(int32(listSize), childFields[0])
		case 's': // struct
			dt = arrow.StructOf(childFields...)
		case 'm': // map type is basically a list of structs.
			st, isStruct := childFields[0].Type.(*arrow.StructType)
			if !isStruct || len(st.Fields()) < 2 {
				return ret, xerrors.Errorf("invalid map schema: child must be a struct with key and value fields, got %s", childFields[0].Type)
			}
			dt = arrow.MapOf(st.Field(0).Type, st.Field(1).Type)
			dt.(*arrow.MapType).KeysSorted = (schema.flags & C.ARROW_FLAG_MAP_KEYS_SORTED) != 0
		}
	}

	if dt == nil {
		// if we didn't find a type, then it's something we haven't implemented.
		return ret, xerrors.New("unimplemented type")
	}
	ret.Type = dt
	return ret, nil
}
// cimporter keeps track of the state needed while importing a C ArrowArray
// (and, recursively, its children) into an array.Data.
type cimporter struct {
// dt is the arrow data type the C array is imported as.
dt arrow.DataType
// arr is the C array structure held by this importer; initarr allocates it
// and doImport moves the source array into it.
arr *CArrowArray
// data is the resulting array.Data; it stays nil until an import succeeds.
data *array.Data
// parent is the importer of the enclosing array, set by importChild.
parent *cimporter
// children holds one importer per child array, filled in by doImportChildren.
children []cimporter
// cbuffers is a zero-copy view of arr.buffers, set by doImport when the
// array has at least one buffer.
cbuffers []*C.void
}
// importChild records parent as this importer's parent and then imports src
// through doImport, returning whatever error doImport reports.
func (imp *cimporter) importChild(parent *cimporter, src *CArrowArray) error {
	imp.parent = parent
	if err := imp.doImport(src); err != nil {
		return err
	}
	return nil
}
// doImportChildren imports the child arrays of nested types (list, fixed size
// list, struct and map) into imp.children. Types without children are left
// untouched.
//
// The number of children is validated before any of them is accessed, so a
// malformed array yields an error instead of an out-of-range panic, and a
// failure while importing any child (including struct fields) is returned to
// the caller.
func (imp *cimporter) doImportChildren() error {
	var children []*CArrowArray
	// view the C array of child pointers as a Go slice, without copying
	s := (*reflect.SliceHeader)(unsafe.Pointer(&children))
	s.Data = uintptr(unsafe.Pointer(imp.arr.children))
	s.Len = int(imp.arr.n_children)
	s.Cap = int(imp.arr.n_children)

	if len(children) > 0 {
		imp.children = make([]cimporter, len(children))
	}

	// elem is the type of the single child for the list-like cases
	var elem arrow.DataType
	switch imp.dt.ID() {
	case arrow.LIST:
		elem = imp.dt.(*arrow.ListType).Elem()
	case arrow.FIXED_SIZE_LIST:
		elem = imp.dt.(*arrow.FixedSizeListType).Elem()
	case arrow.MAP: // the single child is a struct array
		elem = imp.dt.(*arrow.MapType).ValueType()
	case arrow.STRUCT: // import all the children, one per struct field
		st := imp.dt.(*arrow.StructType)
		if err := imp.checkNumChildren(int64(len(st.Fields()))); err != nil {
			return err
		}
		for i, c := range children {
			imp.children[i].dt = st.Field(i).Type
			if err := imp.children[i].importChild(imp, c); err != nil {
				return err
			}
		}
		return nil
	default:
		return nil
	}

	// list, fixed size list and map: exactly one child to import
	if err := imp.checkNumChildren(1); err != nil {
		return err
	}
	imp.children[0].dt = elem
	return imp.children[0].importChild(imp, children[0])
}
// initarr points imp.arr at a new C ArrowArray structure obtained from the
// get_arr C helper, which allocates it with malloc.
func (imp *cimporter) initarr() {
	fresh := C.get_arr()
	imp.arr = fresh
}
// doImport is called recursively as needed to import an array and its
// children, producing the array.Data stored in imp.data.
//
// The contents of src are moved into a C structure held by this importer, so
// src is marked released once this function has been called. On success a
// finalizer on imp.data releases the C array and frees that structure when the
// data is garbage collected. On failure the moved array is released and the
// structure freed before returning, so the producer's resources are not
// leaked.
func (imp *cimporter) doImport(src *CArrowArray) error {
	imp.initarr()
	// move the array from the src object passed in to the one referenced by
	// this importer. That way we can set up a finalizer on the created
	// *array.Data object so we clean up our Array's memory when garbage collected.
	C.ArrowArrayMove(src, imp.arr)
	defer func(arr *CArrowArray) {
		if imp.data != nil {
			runtime.SetFinalizer(imp.data, func(*array.Data) {
				defer C.free(unsafe.Pointer(arr))
				C.ArrowArrayRelease(arr)
				if C.ArrowArrayIsReleased(arr) != 1 {
					panic("did not release C mem")
				}
			})
			return
		}
		// The import failed. After the move nobody else can release this
		// array, so invoke its release callback before freeing the struct;
		// ArrowArrayRelease is a no-op if it has already been released.
		C.ArrowArrayRelease(arr)
		C.free(unsafe.Pointer(arr))
	}(imp.arr)

	// import any children
	if err := imp.doImportChildren(); err != nil {
		return err
	}

	if imp.arr.n_buffers > 0 {
		// get a view of the buffers, zero-copy. we're just looking at the pointers
		const maxlen = 0x7fffffff
		imp.cbuffers = (*[maxlen]*C.void)(unsafe.Pointer(imp.arr.buffers))[:imp.arr.n_buffers:imp.arr.n_buffers]
	}

	// handle each of our type cases; the order matters because
	// arrow.FixedWidthDataType is an interface matched after NullType.
	switch dt := imp.dt.(type) {
	case *arrow.NullType:
		if err := imp.checkNoChildren(); err != nil {
			return err
		}
		imp.data = array.NewData(dt, int(imp.arr.length), nil, nil, int(imp.arr.null_count), int(imp.arr.offset))
	case arrow.FixedWidthDataType:
		return imp.importFixedSizePrimitive()
	case *arrow.StringType, *arrow.BinaryType:
		return imp.importStringLike()
	case *arrow.ListType, *arrow.MapType:
		return imp.importListLike()
	case *arrow.FixedSizeListType:
		if err := imp.checkNumChildren(1); err != nil {
			return err
		}
		if err := imp.checkNumBuffers(1); err != nil {
			return err
		}
		nulls, err := imp.importNullBitmap(0)
		if err != nil {
			return err
		}
		imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nulls}, []*array.Data{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset))
	case *arrow.StructType:
		if err := imp.checkNumBuffers(1); err != nil {
			return err
		}
		nulls, err := imp.importNullBitmap(0)
		if err != nil {
			return err
		}
		children := make([]*array.Data, len(imp.children))
		for i := range imp.children {
			children[i] = imp.children[i].data
		}
		imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nulls}, children, int(imp.arr.null_count), int(imp.arr.offset))
	default:
		return xerrors.Errorf("unimplemented type %s", dt)
	}
	return nil
}
// importStringLike imports a string or binary array. Such an array must have
// no children and exactly three buffers: the null bitmap, the int32 offsets
// and the value bytes. On success the result is stored in imp.data.
func (imp *cimporter) importStringLike() error {
	if err := imp.checkNoChildren(); err != nil {
		return err
	}
	if err := imp.checkNumBuffers(3); err != nil {
		return err
	}

	validity, err := imp.importNullBitmap(0)
	if err != nil {
		return err
	}

	offsetBuf := imp.importOffsetsBuffer(1)
	// the values buffer is sized from the offsets, one byte per unit
	typedOffsets := arrow.Int32Traits.CastFromBytes(offsetBuf.Bytes())
	valueBuf := imp.importVariableValuesBuffer(2, 1, typedOffsets)

	buffers := []*memory.Buffer{validity, offsetBuf, valueBuf}
	imp.data = array.NewData(imp.dt, int(imp.arr.length), buffers, nil, int(imp.arr.null_count), int(imp.arr.offset))
	return nil
}
// importListLike imports a list or map array. Such an array must have exactly
// one child and two buffers: the null bitmap and the int32 offsets. On
// success the result, which references the child's data, is stored in imp.data.
func (imp *cimporter) importListLike() error {
	if err := imp.checkNumChildren(1); err != nil {
		return err
	}
	if err := imp.checkNumBuffers(2); err != nil {
		return err
	}

	validity, err := imp.importNullBitmap(0)
	if err != nil {
		return err
	}

	buffers := []*memory.Buffer{validity, imp.importOffsetsBuffer(1)}
	childData := []*array.Data{imp.children[0].data}
	imp.data = array.NewData(imp.dt, int(imp.arr.length), buffers, childData, int(imp.arr.null_count), int(imp.arr.offset))
	return nil
}
// importFixedSizePrimitive imports an array of a fixed-width type. Such an
// array must have no children and exactly two buffers: the null bitmap and
// the values. Bit widths that are a multiple of 8 are imported as whole
// bytes per value, a bit width of 1 as a packed bit buffer, and any other
// width is rejected. On success the result is stored in imp.data.
func (imp *cimporter) importFixedSizePrimitive() error {
	if err := imp.checkNoChildren(); err != nil {
		return err
	}
	if err := imp.checkNumBuffers(2); err != nil {
		return err
	}

	validity, err := imp.importNullBitmap(0)
	if err != nil {
		return err
	}

	bitWidth := int64(imp.dt.(arrow.FixedWidthDataType).BitWidth())

	var valueBuf *memory.Buffer
	switch {
	case bitutil.IsMultipleOf8(bitWidth):
		valueBuf = imp.importFixedSizeBuffer(1, bitutil.BytesForBits(bitWidth))
	case bitWidth == 1:
		valueBuf = imp.importBitsBuffer(1)
	default:
		return xerrors.New("invalid bitwidth")
	}

	buffers := []*memory.Buffer{validity, valueBuf}
	imp.data = array.NewData(imp.dt, int(imp.arr.length), buffers, nil, int(imp.arr.null_count), int(imp.arr.offset))
	return nil
}
// checkNoChildren returns an error unless the C array has zero children.
func (imp *cimporter) checkNoChildren() error {
	const want = 0
	return imp.checkNumChildren(want)
}
// checkNumChildren returns an error unless the C array has exactly n children.
func (imp *cimporter) checkNumChildren(n int64) error {
	if int64(imp.arr.n_children) == n {
		return nil
	}
	return xerrors.Errorf("expected %d children, for imported type %s, ArrowArray has %d", n, imp.dt, imp.arr.n_children)
}
// checkNumBuffers returns an error unless the C array has exactly n buffers.
func (imp *cimporter) checkNumBuffers(n int64) error {
	if int64(imp.arr.n_buffers) == n {
		return nil
	}
	return xerrors.Errorf("expected %d buffers for imported type %s, ArrowArray has %d", n, imp.dt, imp.arr.n_buffers)
}
// importBuffer wraps the first sz bytes of the C buffer at index bufferID in
// a memory.Buffer without copying them.
//
// A NULL buffer pointer with sz == 0 (for example the data buffer of an empty
// array) yields an empty buffer rather than panicking. A NULL pointer with a
// non-zero size still panics, since this signature has no way to report an
// error.
func (imp *cimporter) importBuffer(bufferID int, sz int64) *memory.Buffer {
	ptr := unsafe.Pointer(imp.cbuffers[bufferID])
	if ptr == nil && sz == 0 {
		// slicing a nil array pointer dereferences it, even for length 0
		return memory.NewBufferBytes([]byte{})
	}

	// this is not a copy, we're just having a slice which points at the data
	// it's still owned by the C.ArrowArray object and its backing C++ object.
	const maxLen = 0x7fffffff
	data := (*[maxLen]byte)(ptr)[:sz:sz]
	return memory.NewBufferBytes(data)
}
// importBitsBuffer imports the buffer at index bufferID as a packed bit
// buffer holding one bit for each of the array's length+offset slots.
func (imp *cimporter) importBitsBuffer(bufferID int) *memory.Buffer {
	nbits := int64(imp.arr.length) + int64(imp.arr.offset)
	return imp.importBuffer(bufferID, bitutil.BytesForBits(nbits))
}
// importNullBitmap imports the validity bitmap stored at index bufferID.
// When the C buffer pointer is nil, a positive null_count is an error and a
// null_count of zero yields a nil buffer with no error. In every other case
// the bitmap is imported as a bit buffer.
func (imp *cimporter) importNullBitmap(bufferID int) (*memory.Buffer, error) {
	if imp.cbuffers[bufferID] == nil {
		if imp.arr.null_count > 0 {
			return nil, xerrors.Errorf("arrowarray struct has null bitmap buffer, but non-zero null_count %d", imp.arr.null_count)
		}
		if imp.arr.null_count == 0 {
			return nil, nil
		}
	}
	return imp.importBitsBuffer(bufferID), nil
}
// importFixedSizeBuffer imports the buffer at index bufferID, sized as
// byteWidth bytes for each of the array's length+offset slots.
func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) *memory.Buffer {
	slots := int64(imp.arr.length) + int64(imp.arr.offset)
	return imp.importBuffer(bufferID, byteWidth*slots)
}
// importOffsetsBuffer imports the buffer at index bufferID as an offsets
// buffer of length+offset+1 entries, each the size of an int32.
func (imp *cimporter) importOffsetsBuffer(bufferID int) *memory.Buffer {
	// go doesn't implement int64 offsets yet
	entries := int64(imp.arr.length) + int64(imp.arr.offset) + 1
	return imp.importBuffer(bufferID, entries*int64(arrow.Int32SizeBytes))
}
// importVariableValuesBuffer imports the values buffer of a variable-length
// array. offsets is the array's offsets buffer viewed as int32s and byteWidth
// is the number of bytes per offset unit.
//
// The buffer is sized from the final offset the array can reference, which
// sits at index offset+length: the offsets buffer covers the slots skipped by
// the array's offset as well as its length. Using index length alone would
// truncate the values of any array with a non-zero offset.
func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth int, offsets []int32) *memory.Buffer {
	last := offsets[imp.arr.offset+imp.arr.length]
	bufsize := byteWidth * int(last)
	return imp.importBuffer(bufferID, int64(bufsize))
}
// importCArrayAsType imports arr as an array of type dt. It returns the
// importer used together with any error reported by doImport; the importer
// is returned even when the import fails.
func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, err error) {
	imp = &cimporter{dt: dt}
	return imp, imp.doImport(arr)
}
// initReader moves stream into a newly allocated C stream structure held by
// rdr, and registers a finalizer on rdr that releases the stream and frees
// that structure.
func initReader(rdr *nativeCRecordBatchReader, stream *CArrowArrayStream) {
	owned := C.get_stream()
	C.ArrowArrayStreamMove(stream, owned)
	rdr.stream = owned

	cleanup := func(r *nativeCRecordBatchReader) {
		C.ArrowArrayStreamRelease(r.stream)
		C.free(unsafe.Pointer(r.stream))
	}
	runtime.SetFinalizer(rdr, cleanup)
}
// nativeCRecordBatchReader is a record batch reader backed by a C
// ArrowArrayStream; it is intended to conform to arrio.Reader.
type nativeCRecordBatchReader struct {
// stream is the C stream structure the reader pulls from; initReader
// allocates it and moves the caller's stream into it.
stream *CArrowArrayStream
// schema caches the stream's schema; Read fetches it on first use.
schema *arrow.Schema
}
// getError builds an error that wraps errno as a syscall.Errno and appends
// the message reported by the stream's get_last_error callback.
func (n *nativeCRecordBatchReader) getError(errno int) error {
	code := syscall.Errno(errno)
	detail := C.GoString(C.stream_get_last_error(n.stream))
	return xerrors.Errorf("%w: %s", code, detail)
}
// Read returns the next record batch from the stream. The stream's schema is
// fetched and cached on the first call. io.EOF is returned when the stream
// hands back an already-released array, and a non-zero status from a stream
// callback is reported through getError.
func (n *nativeCRecordBatchReader) Read() (array.Record, error) {
	if n.schema == nil {
		var cschema CArrowSchema
		if rc := C.stream_get_schema(n.stream, &cschema); rc != 0 {
			return nil, n.getError(int(rc))
		}
		defer C.ArrowSchemaRelease(&cschema)

		imported, err := ImportCArrowSchema((*CArrowSchema)(&cschema))
		if err != nil {
			return nil, err
		}
		n.schema = imported
	}

	// scratch C struct that receives the next batch; freed when Read returns
	batch := C.get_arr()
	defer C.free(unsafe.Pointer(batch))

	if rc := C.stream_get_next(n.stream, batch); rc != 0 {
		return nil, n.getError(int(rc))
	}
	if C.ArrowArrayIsReleased(batch) == 1 {
		return nil, io.EOF
	}
	return ImportCRecordBatchWithSchema(batch, n.schema)
}