arrow/cdata/cdata_test_framework.go (335 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build test
// +build test
package cdata
// #include <stdlib.h>
// #include <stdint.h>
// #include <string.h>
// #include "abi.h"
// #include "helpers.h"
//
// void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out);
// static struct ArrowArray* get_test_arr() {
// struct ArrowArray* array = (struct ArrowArray*)malloc(sizeof(struct ArrowArray));
// memset(array, 0, sizeof(*array));
// return array;
// }
// static struct ArrowArrayStream* get_test_stream() {
// struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream));
// memset(out, 0, sizeof(struct ArrowArrayStream));
// return out;
// }
//
// static struct ArrowAsyncDeviceStreamHandler* get_test_async_handler() {
// struct ArrowAsyncDeviceStreamHandler* handler =
// (struct ArrowAsyncDeviceStreamHandler*)malloc(sizeof(struct ArrowAsyncDeviceStreamHandler));
// memset(handler, 0, sizeof(*handler));
// return handler;
// }
//
// void release_test_arr(struct ArrowArray* arr);
//
// static int32_t* get_data() {
// int32_t* data = malloc(sizeof(int32_t)*10);
// for (int i = 0; i < 10; ++i) { data[i] = i+1; }
// return data;
// }
// void export_int32_type(struct ArrowSchema* schema);
// void export_int32_array(const int32_t*, int64_t, struct ArrowArray*);
// int test1_is_released();
// void test_primitive(struct ArrowSchema* schema, const char* fmt);
// void free_malloced_schemas(struct ArrowSchema**);
// struct ArrowSchema** test_lists(const char** fmts, const char** names, const int* nullflags, const int n);
// struct ArrowSchema** test_struct(const char** fmts, const char** names, int64_t* flags, const int n);
// struct ArrowSchema** test_map(const char** fmts, const char** names, int64_t* flags, const int n);
// struct ArrowSchema** test_schema(const char** fmts, const char** names, int64_t* flags, const int n);
// struct ArrowSchema** test_union(const char** fmts, const char** names, int64_t* flags, const int n);
// int test_exported_stream(struct ArrowArrayStream* stream);
// void test_stream_schema_fallible(struct ArrowArrayStream* stream);
// int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed);
// extern void releaseTestArr(struct ArrowArray* array);
// extern void goReleaseTestArray(struct ArrowArray* array);
import "C"
import (
"errors"
"fmt"
"io"
"math/rand"
"runtime/cgo"
"unsafe"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/internal"
"github.com/apache/arrow-go/v18/arrow/memory/mallocator"
)
const (
flagIsNullable = C.ARROW_FLAG_NULLABLE
flagMapKeysSorted = C.ARROW_FLAG_MAP_KEYS_SORTED
)
var (
metadata1 = arrow.NewMetadata([]string{"key1", "key2"}, []string{"", "bar"})
metadata2 = arrow.NewMetadata([]string{"key"}, []string{"abcde"})
)
func exportInt32TypeSchema() CArrowSchema {
var s CArrowSchema
C.export_int32_type(&s)
return s
}
func releaseStream(s *CArrowArrayStream) {
C.ArrowArrayStreamRelease(s)
}
func schemaIsReleased(s *CArrowSchema) bool {
return C.ArrowSchemaIsReleased(s) == 1
}
func getMetadataKeys() ([]string, []string) {
return []string{"key1", "key2"}, []string{"key"}
}
func getMetadataValues() ([]string, []string) {
return []string{"", "bar"}, []string{"abcde"}
}
func exportInt32Array() *CArrowArray {
arr := C.get_test_arr()
C.export_int32_array(C.get_data(), C.int64_t(10), arr)
return arr
}
func isReleased(arr *CArrowArray) bool {
return C.ArrowArrayIsReleased(arr) == 1
}
func test1IsReleased() bool {
return C.test1_is_released() == 1
}
func testPrimitive(fmtstr string) CArrowSchema {
var s CArrowSchema
fmt := C.CString(fmtstr)
C.test_primitive(&s, fmt)
return s
}
func freeMallocedSchemas(schemas **CArrowSchema) {
C.free_malloced_schemas(schemas)
}
func testAsyncHandler() *CArrowAsyncDeviceStreamHandler {
return C.get_test_async_handler()
}
func freeAsyncHandler(h *CArrowAsyncDeviceStreamHandler) {
C.free(unsafe.Pointer(h))
}
func testNested(fmts, names []string, isnull []bool) **CArrowSchema {
if len(fmts) != len(names) {
panic("testing nested lists must have same size fmts and names")
}
cfmts := make([]*C.char, len(fmts))
cnames := make([]*C.char, len(names))
nulls := make([]C.int, len(isnull))
for i := range fmts {
cfmts[i] = C.CString(fmts[i])
cnames[i] = C.CString(names[i])
}
for i, v := range isnull {
if v {
nulls[i] = C.ARROW_FLAG_NULLABLE
} else {
nulls[i] = 0
}
}
return C.test_lists((**C.char)(unsafe.Pointer(&cfmts[0])), (**C.char)(unsafe.Pointer(&cnames[0])), (*C.int)(unsafe.Pointer(&nulls[0])), C.int(len(fmts)))
}
func testStruct(fmts, names []string, flags []int64) **CArrowSchema {
if len(fmts) != len(names) || len(names) != len(flags) {
panic("testing structs must all have the same size slices in args")
}
cfmts := make([]*C.char, len(fmts))
cnames := make([]*C.char, len(names))
cflags := make([]C.int64_t, len(flags))
for i := range fmts {
cfmts[i] = C.CString(fmts[i])
cnames[i] = C.CString(names[i])
cflags[i] = C.int64_t(flags[i])
}
return C.test_struct((**C.char)(unsafe.Pointer(&cfmts[0])), (**C.char)(unsafe.Pointer(&cnames[0])), (*C.int64_t)(unsafe.Pointer(&cflags[0])), C.int(len(fmts)))
}
func testMap(fmts, names []string, flags []int64) **CArrowSchema {
if len(fmts) != len(names) || len(names) != len(flags) {
panic("testing maps must all have the same size slices in args")
}
cfmts := make([]*C.char, len(fmts))
cnames := make([]*C.char, len(names))
cflags := make([]C.int64_t, len(flags))
for i := range fmts {
cfmts[i] = C.CString(fmts[i])
cnames[i] = C.CString(names[i])
cflags[i] = C.int64_t(flags[i])
}
return C.test_map((**C.char)(unsafe.Pointer(&cfmts[0])), (**C.char)(unsafe.Pointer(&cnames[0])), (*C.int64_t)(unsafe.Pointer(&cflags[0])), C.int(len(fmts)))
}
func testUnion(fmts, names []string, flags []int64) **CArrowSchema {
if len(fmts) != len(names) || len(names) != len(flags) {
panic("testing unions must all have the same size slices in args")
}
cfmts := make([]*C.char, len(fmts))
cnames := make([]*C.char, len(names))
cflags := make([]C.int64_t, len(flags))
for i := range fmts {
cfmts[i] = C.CString(fmts[i])
cnames[i] = C.CString(names[i])
cflags[i] = C.int64_t(flags[i])
}
return C.test_union((**C.char)(unsafe.Pointer(&cfmts[0])), (**C.char)(unsafe.Pointer(&cnames[0])), (*C.int64_t)(unsafe.Pointer(&cflags[0])), C.int(len(fmts)))
}
func testSchema(fmts, names []string, flags []int64) **CArrowSchema {
if len(fmts) != len(names) || len(names) != len(flags) {
panic("testing structs must all have the same size slices in args")
}
cfmts := make([]*C.char, len(fmts))
cnames := make([]*C.char, len(names))
cflags := make([]C.int64_t, len(flags))
for i := range fmts {
cfmts[i] = C.CString(fmts[i])
cnames[i] = C.CString(names[i])
cflags[i] = C.int64_t(flags[i])
}
return C.test_schema((**C.char)(unsafe.Pointer(&cfmts[0])), (**C.char)(unsafe.Pointer(&cnames[0])), (*C.int64_t)(unsafe.Pointer(&cflags[0])), C.int(len(fmts)))
}
func freeAny[T any](alloc *mallocator.Mallocator, p *T, n int) {
raw := unsafe.Slice((*byte)(unsafe.Pointer(p)), int(unsafe.Sizeof(*p))*n)
alloc.Free(raw)
}
func freeTestMallocatorArr(carr *CArrowArray, alloc *mallocator.Mallocator) {
freeAny(alloc, carr, 1)
}
func getTestArr(alloc *mallocator.Mallocator) *CArrowArray {
raw := alloc.Allocate(C.sizeof_struct_ArrowArray)
return (*CArrowArray)(unsafe.Pointer(&raw[0]))
}
type testReleaser struct {
alloc *mallocator.Mallocator
bufs [][]byte
}
//export releaseTestArr
func releaseTestArr(arr *CArrowArray) {
if C.ArrowArrayIsReleased(arr) == 1 {
return
}
defer C.ArrowArrayMarkReleased(arr)
h := getHandle(arr.private_data)
tr := h.Value().(*testReleaser)
alloc := tr.alloc
for _, b := range tr.bufs {
alloc.Free(b)
}
if arr.n_buffers > 0 {
freeAny(alloc, arr.buffers, int(arr.n_buffers))
}
if arr.dictionary != nil {
C.ArrowArrayRelease(arr.dictionary)
freeAny(alloc, arr.dictionary, 1)
}
if arr.n_children > 0 {
children := unsafe.Slice(arr.children, arr.n_children)
for _, c := range children {
C.ArrowArrayRelease(c)
freeTestMallocatorArr(c, alloc)
}
freeAny(alloc, arr.children, int(arr.n_children))
}
h.Delete()
C.free(unsafe.Pointer(arr.private_data))
}
func allocateBufferMallocatorPtrArr(alloc *mallocator.Mallocator, n int) []*C.void {
raw := alloc.Allocate(int(unsafe.Sizeof((*C.void)(nil))) * n)
return unsafe.Slice((**C.void)(unsafe.Pointer(&raw[0])), n)
}
func allocateChildrenPtrArr(alloc *mallocator.Mallocator, n int) []*CArrowArray {
raw := alloc.Allocate(int(unsafe.Sizeof((*CArrowArray)(nil))) * n)
return unsafe.Slice((**CArrowArray)(unsafe.Pointer(&raw[0])), n)
}
func createCArr(arr arrow.Array, alloc *mallocator.Mallocator) *CArrowArray {
var (
carr = getTestArr(alloc)
children = (**CArrowArray)(nil)
nchildren = C.int64_t(0)
)
switch arr := arr.(type) {
case array.ListLike:
clist := allocateChildrenPtrArr(alloc, 1)
clist[0] = createCArr(arr.ListValues(), alloc)
children = (**CArrowArray)(unsafe.Pointer(&clist[0]))
nchildren += 1
case *array.Struct:
if arr.NumField() == 0 {
break
}
clist := allocateChildrenPtrArr(alloc, arr.NumField())
for i := 0; i < arr.NumField(); i++ {
clist[i] = createCArr(arr.Field(i), alloc)
nchildren += 1
}
children = (**CArrowArray)(unsafe.Pointer(&clist[0]))
case *array.RunEndEncoded:
clist := allocateChildrenPtrArr(alloc, 2)
clist[0] = createCArr(arr.RunEndsArr(), alloc)
clist[1] = createCArr(arr.Values(), alloc)
children = (**CArrowArray)(unsafe.Pointer(&clist[0]))
nchildren += 2
case array.Union:
if arr.NumFields() == 0 {
break
}
clist := allocateChildrenPtrArr(alloc, arr.NumFields())
for i := 0; i < arr.NumFields(); i++ {
clist[i] = createCArr(arr.Field(i), alloc)
nchildren += 1
}
children = (**CArrowArray)(unsafe.Pointer(&clist[0]))
}
carr.children = children
carr.n_children = nchildren
carr.dictionary = nil
carr.length = C.int64_t(arr.Len())
carr.null_count = C.int64_t(arr.NullN())
carr.offset = C.int64_t(arr.Data().Offset())
carr.release = (*[0]byte)(C.goReleaseTestArray)
tr := &testReleaser{alloc: alloc}
h := cgo.NewHandle(tr)
carr.private_data = createHandle(h)
buffers := arr.Data().Buffers()
bufOffset, nbuffers := 0, len(buffers)
hasValidityBitmap := internal.DefaultHasValidityBitmap(arr.DataType().ID())
if nbuffers > 0 && !hasValidityBitmap {
nbuffers--
bufOffset++
}
if nbuffers == 0 {
return carr
}
tr.bufs = make([][]byte, 0, nbuffers)
cbufs := allocateBufferMallocatorPtrArr(alloc, nbuffers)
for i, b := range buffers[bufOffset:] {
if b != nil && b.Len() > 0 {
raw := alloc.Allocate(b.Len())
copy(raw, b.Bytes())
tr.bufs = append(tr.bufs, raw)
cbufs[i] = (*C.void)(unsafe.Pointer(&raw[0]))
} else {
cbufs[i] = nil
}
}
carr.n_buffers = C.int64_t(len(cbufs))
if len(cbufs) > 0 {
carr.buffers = (*unsafe.Pointer)(unsafe.Pointer(&cbufs[0]))
}
return carr
}
func createTestStreamObj() *CArrowArrayStream {
return C.get_test_stream()
}
func arrayStreamTest() *CArrowArrayStream {
st := C.get_test_stream()
C.setup_array_stream_test(2, st)
return st
}
func exportedStreamTest(reader array.RecordReader) error {
out := C.get_test_stream()
ExportRecordReader(reader, out)
rc := C.test_exported_stream(out)
C.free(unsafe.Pointer(out))
if rc == 0 {
return nil
}
return fmt.Errorf("Exported stream test failed with return code %d", int(rc))
}
func roundTripStreamTest(reader array.RecordReader) error {
out := C.get_test_stream()
ExportRecordReader(reader, out)
rdr, err := ImportCRecordReader(out, nil)
if err != nil {
return err
}
for {
_, err = rdr.Read()
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
}
}
return nil
}
func fallibleSchemaTestDeprecated() (err error) {
stream := CArrowArrayStream{}
C.test_stream_schema_fallible(&stream)
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Panicked: %#v", r)
}
}()
_ = ImportCArrayStream(&stream, nil)
return nil
}
func fallibleSchemaTest() error {
stream := CArrowArrayStream{}
C.test_stream_schema_fallible(&stream)
_, err := ImportCRecordReader(&stream, nil)
if err != nil {
return err
}
return nil
}
func confuseGoGc(reader array.RecordReader) error {
out := C.get_test_stream()
ExportRecordReader(reader, out)
rc := C.confuse_go_gc(out, C.uint(rand.Int()))
C.free(unsafe.Pointer(out))
if rc == 0 {
return nil
}
return fmt.Errorf("Exported stream test failed with return code %d", int(rc))
}