go/adbc/pkg/flightsql/driver.go (1,529 lines of code) (raw):
// Code generated by _tmpl/driver.go.tmpl. DO NOT EDIT.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//go:build driverlib
package main
// ADBC_EXPORTING is required on Windows, or else the symbols
// won't be accessible to the driver manager
// #cgo CFLAGS: -DADBC_EXPORTING
// #cgo CXXFLAGS: -std=c++11 -DADBC_EXPORTING
// #include "../../drivermgr/arrow-adbc/adbc.h"
// #include "utils.h"
// #include <errno.h>
// #include <stdint.h>
// #include <string.h>
//
// typedef const char cchar_t;
// typedef const uint8_t cuint8_t;
// typedef const uint32_t cuint32_t;
// typedef const struct AdbcError ConstAdbcError;
//
// int FlightSQLArrayStreamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*);
// int FlightSQLArrayStreamGetNext(struct ArrowArrayStream*, struct ArrowArray*);
// const char* FlightSQLArrayStreamGetLastError(struct ArrowArrayStream*);
// void FlightSQLArrayStreamRelease(struct ArrowArrayStream*);
//
// int FlightSQLArrayStreamGetSchemaTrampoline(struct ArrowArrayStream*, struct ArrowSchema*);
// int FlightSQLArrayStreamGetNextTrampoline(struct ArrowArrayStream*, struct ArrowArray*);
//
// void releasePartitions(struct AdbcPartitions* partitions);
//
import "C"
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"runtime"
"runtime/cgo"
"strings"
"sync/atomic"
"unsafe"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/cdata"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/arrow/memory/mallocator"
)
// Must use malloc() to respect CGO rules
var drv = flightsql.NewDriver(mallocator.NewMallocator())
// Flag set if any method panic()ed - afterwards all calls to driver will fail
// since internal state of driver is unknown
var globalPoison atomic.Bool
const errPrefix = "[FlightSQL] "
const logLevelEnvVar = "ADBC_DRIVER_FLIGHTSQL_LOG_LEVEL"
func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) {
if err == nil {
return
}
if err.release != nil {
C.FlightSQLerrRelease(err)
}
msg := errPrefix + fmt.Sprintf(format, vals...)
err.message = C.CString(msg)
err.release = (*[0]byte)(C.FlightSQL_release_error)
}
func setErrWithDetails(err *C.struct_AdbcError, adbcError adbc.Error) {
if err == nil {
return
}
if err.vendor_code != C.ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA {
setErr(err, adbcError.Msg)
return
}
cErrPtr := C.calloc(C.sizeof_struct_FlightSQLError, C.size_t(1))
cErr := (*C.struct_FlightSQLError)(cErrPtr)
cErr.message = C.CString(adbcError.Msg)
err.message = cErr.message
err.release = (*[0]byte)(C.FlightSQLReleaseErrWithDetails)
err.private_data = cErrPtr
numDetails := len(adbcError.Details)
if numDetails > 0 {
cErr.keys = (**C.cchar_t)(C.calloc(C.size_t(numDetails), C.size_t(unsafe.Sizeof((*C.cchar_t)(nil)))))
cErr.values = (**C.cuint8_t)(C.calloc(C.size_t(numDetails), C.size_t(unsafe.Sizeof((*C.cuint8_t)(nil)))))
cErr.lengths = (*C.size_t)(C.calloc(C.size_t(numDetails), C.sizeof_size_t))
keys := fromCArr[*C.cchar_t](cErr.keys, numDetails)
values := fromCArr[*C.cuint8_t](cErr.values, numDetails)
lengths := fromCArr[C.size_t](cErr.lengths, numDetails)
for i, detail := range adbcError.Details {
keys[i] = C.CString(detail.Key())
bytes, err := detail.Serialize()
if err != nil {
msg := err.Error()
values[i] = (*C.cuint8_t)(unsafe.Pointer(C.CString(msg)))
lengths[i] = C.size_t(len(msg))
} else {
values[i] = (*C.cuint8_t)(C.malloc(C.size_t(len(bytes))))
sink := fromCArr[byte]((*byte)(values[i]), len(bytes))
copy(sink, bytes)
lengths[i] = C.size_t(len(bytes))
}
}
} else {
cErr.keys = nil
cErr.values = nil
cErr.lengths = nil
}
cErr.count = C.int(numDetails)
}
func errToAdbcErr(adbcerr *C.struct_AdbcError, err error) adbc.Status {
if err == nil {
return adbc.StatusOK
}
var adbcError adbc.Error
if errors.As(err, &adbcError) {
setErrWithDetails(adbcerr, adbcError)
return adbcError.Code
}
setErr(adbcerr, err.Error())
return adbc.StatusUnknown
}
// We panicked; make all API functions error and dump stack traces
func poison(err *C.struct_AdbcError, fname string, e interface{}) C.AdbcStatusCode {
if !globalPoison.Swap(true) {
// Only print stack traces on the first occurrence
buf := make([]byte, 1<<20)
length := runtime.Stack(buf, true)
fmt.Fprintf(os.Stderr, "FlightSQL driver panicked, stack traces:\n%s", buf[:length])
}
setErr(err, "%s: Go panic in FlightSQL driver (see stderr): %#v", fname, e)
return C.ADBC_STATUS_INTERNAL
}
// Check environment variables and enable logging if possible.
func initLoggingFromEnv(db adbc.Database) {
logLevel := slog.LevelError
switch strings.ToLower(os.Getenv(logLevelEnvVar)) {
case "debug":
logLevel = slog.LevelDebug
case "info":
logLevel = slog.LevelInfo
case "warn":
case "warning":
logLevel = slog.LevelWarn
case "error":
logLevel = slog.LevelError
case "":
return
default:
printLoggingHelp()
return
}
h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
AddSource: false,
Level: logLevel,
})
logger := slog.New(h)
ext, ok := db.(adbc.DatabaseLogging)
if !ok {
logger.Error("FlightSQL does not support logging")
return
}
ext.SetLogger(logger)
}
func printLoggingHelp() {
fmt.Fprintf(os.Stderr, "FlightSQL: to enable logging, set %s to 'debug', 'info', 'warn', or 'error'", logLevelEnvVar)
}
// Allocate a new cgo.Handle and store its address in a heap-allocated
// uintptr_t. Experimentally, this was found to be necessary, else
// something (the Go runtime?) would corrupt (garbage-collect?) the
// handle.
func createHandle(hndl cgo.Handle) unsafe.Pointer {
// uintptr_t* hptr = malloc(sizeof(uintptr_t));
hptr := (*C.uintptr_t)(C.calloc(C.sizeof_uintptr_t, C.size_t(1)))
// *hptr = (uintptr)hndl;
*hptr = C.uintptr_t(uintptr(hndl))
return unsafe.Pointer(hptr)
}
func getFromHandle[T any](ptr unsafe.Pointer) *T {
// uintptr_t* hptr = (uintptr_t*)ptr;
hptr := (*C.uintptr_t)(ptr)
return cgo.Handle((uintptr)(*hptr)).Value().(*T)
}
func exportStringOption(val string, out *C.char, length *C.size_t) C.AdbcStatusCode {
lenWithTerminator := C.size_t(len(val) + 1)
if lenWithTerminator <= *length {
sink := fromCArr[byte]((*byte)(unsafe.Pointer(out)), int(*length))
copy(sink, val)
sink[lenWithTerminator] = 0
}
*length = lenWithTerminator
return C.ADBC_STATUS_OK
}
func exportBytesOption(val []byte, out *C.uint8_t, length *C.size_t) C.AdbcStatusCode {
if C.size_t(len(val)) <= *length {
sink := fromCArr[byte]((*byte)(out), int(*length))
copy(sink, val)
}
*length = C.size_t(len(val))
return C.ADBC_STATUS_OK
}
type cancellableContext struct {
ctx context.Context
cancel context.CancelFunc
}
func (c *cancellableContext) newContext() context.Context {
c.cancelContext()
c.ctx, c.cancel = context.WithCancel(context.Background())
return c.ctx
}
func (c *cancellableContext) cancelContext() {
if c.cancel != nil {
c.cancel()
}
c.ctx = nil
c.cancel = nil
}
func checkDBAlloc(db *C.struct_AdbcDatabase, err *C.struct_AdbcError, fname string) bool {
if globalPoison.Load() {
setErr(err, "%s: Go panicked, driver is in unknown state", fname)
return false
}
if db == nil {
setErr(err, "%s: database not allocated", fname)
return false
}
if db.private_data == nil {
setErr(err, "%s: database not allocated", fname)
return false
}
return true
}
func checkDBInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError, fname string) *cDatabase {
if !checkDBAlloc(db, err, fname) {
return nil
}
cdb := getFromHandle[cDatabase](db.private_data)
if cdb.db == nil {
setErr(err, "%s: database not initialized", fname)
return nil
}
return cdb
}
// Custom ArrowArrayStream export to support ADBC error data in ArrowArrayStream
type cArrayStream struct {
rdr array.RecordReader
// Must be C-allocated
adbcErr *C.struct_AdbcError
status C.AdbcStatusCode
}
func (cStream *cArrayStream) maybeError() C.int {
err := cStream.rdr.Err()
if err != nil {
if cStream.adbcErr != nil {
C.FlightSQLerrRelease(cStream.adbcErr)
} else {
cStream.adbcErr = (*C.struct_AdbcError)(C.calloc(1, C.ADBC_ERROR_1_1_0_SIZE))
}
cStream.adbcErr.vendor_code = C.ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA
cStream.status = C.AdbcStatusCode(errToAdbcErr(cStream.adbcErr, err))
switch adbc.Status(cStream.status) {
case adbc.StatusUnknown:
return C.EIO
case adbc.StatusNotImplemented:
return C.ENOTSUP
case adbc.StatusNotFound:
return C.ENOENT
case adbc.StatusAlreadyExists:
return C.EEXIST
case adbc.StatusInvalidArgument:
return C.EINVAL
case adbc.StatusInvalidState:
return C.EINVAL
case adbc.StatusInvalidData:
return C.EIO
case adbc.StatusIntegrity:
return C.EIO
case adbc.StatusInternal:
return C.EIO
case adbc.StatusIO:
return C.EIO
case adbc.StatusCancelled:
return C.ECANCELED
case adbc.StatusTimeout:
return C.ETIMEDOUT
case adbc.StatusUnauthenticated:
return C.EACCES
case adbc.StatusUnauthorized:
return C.EACCES
default:
return C.EIO
}
}
return 0
}
//export FlightSQLArrayStreamGetLastError
func FlightSQLArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cchar_t {
if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.adbcErr != nil {
return cStream.adbcErr.message
}
return nil
}
//export FlightSQLArrayStreamGetNext
func FlightSQLArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.struct_ArrowArray) C.int {
if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if cStream.rdr.Next() {
cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil)
return 0
}
array.release = nil
array.private_data = nil
return cStream.maybeError()
}
//export FlightSQLArrayStreamGetSchema
func FlightSQLArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C.struct_ArrowSchema) C.int {
if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return C.EINVAL
}
cStream := getFromHandle[cArrayStream](stream.private_data)
s := cStream.rdr.Schema()
if s == nil {
return cStream.maybeError()
}
cdata.ExportArrowSchema(s, toCdataSchema(schema))
return 0
}
//export FlightSQLArrayStreamRelease
func FlightSQLArrayStreamRelease(stream *C.struct_ArrowArrayStream) {
if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return
}
h := (*(*cgo.Handle)(stream.private_data))
cStream := h.Value().(*cArrayStream)
cStream.rdr.Release()
if cStream.adbcErr != nil {
C.FlightSQLerrRelease(cStream.adbcErr)
C.free(unsafe.Pointer(cStream.adbcErr))
}
C.free(unsafe.Pointer(stream.private_data))
stream.private_data = nil
h.Delete()
runtime.GC()
}
//export FlightSQLErrorFromArrayStream
func FlightSQLErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status *C.AdbcStatusCode) *C.struct_AdbcError {
if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil {
return nil
}
cStream := getFromHandle[cArrayStream](stream.private_data)
if status != nil {
*status = cStream.status
}
return cStream.adbcErr
}
func exportRecordReader(rdr array.RecordReader, stream *C.struct_ArrowArrayStream) {
cStream := &cArrayStream{rdr: rdr, status: C.ADBC_STATUS_OK}
stream.get_last_error = (*[0]byte)(C.FlightSQLArrayStreamGetLastError)
stream.get_next = (*[0]byte)(C.FlightSQLArrayStreamGetNextTrampoline)
stream.get_schema = (*[0]byte)(C.FlightSQLArrayStreamGetSchemaTrampoline)
stream.release = (*[0]byte)(C.FlightSQLArrayStreamRelease)
hndl := cgo.NewHandle(cStream)
stream.private_data = createHandle(hndl)
rdr.Retain()
}
type cDatabase struct {
opts map[string]string
db adbc.Database
}
//export FlightSQLDatabaseGetOption
func FlightSQLDatabaseGetOption(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.char, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseGetOption", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseGetOption")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseGetOption: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOption(C.GoString(key))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return exportStringOption(val, value, length)
}
//export FlightSQLDatabaseGetOptionBytes
func FlightSQLDatabaseGetOptionBytes(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.uint8_t, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseGetOptionBytes", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseGetOptionBytes")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseGetOptionBytes: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionBytes(C.GoString(key))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return exportBytesOption(val, value, length)
}
//export FlightSQLDatabaseGetOptionDouble
func FlightSQLDatabaseGetOptionDouble(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseGetOptionDouble", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseGetOptionDouble")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseGetOptionDouble: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionDouble(C.GoString(key))
*value = C.double(val)
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
//export FlightSQLDatabaseGetOptionInt
func FlightSQLDatabaseGetOptionInt(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseGetOptionInt", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseGetOptionInt")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseGetOptionInt: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionInt(C.GoString(key))
*value = C.int64_t(val)
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
//export FlightSQLDatabaseInit
func FlightSQLDatabaseInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseInit", e)
}
}()
if !checkDBAlloc(db, err, "AdbcDatabaseInit") {
return C.ADBC_STATUS_INVALID_STATE
}
cdb := getFromHandle[cDatabase](db.private_data)
if cdb.db != nil {
setErr(err, "AdbcDatabaseInit: database already initialized")
return C.ADBC_STATUS_INVALID_STATE
}
adb, aerr := drv.NewDatabase(cdb.opts)
if aerr != nil {
return C.AdbcStatusCode(errToAdbcErr(err, aerr))
}
initLoggingFromEnv(adb)
cdb.db = adb
return C.ADBC_STATUS_OK
}
//export FlightSQLDatabaseNew
func FlightSQLDatabaseNew(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseNew", e)
}
}()
if globalPoison.Load() {
setErr(err, "AdbcDatabaseNew: Go panicked, driver is in unknown state")
return C.ADBC_STATUS_INTERNAL
}
if db.private_data != nil {
setErr(err, "AdbcDatabaseNew: database already allocated")
return C.ADBC_STATUS_INVALID_STATE
}
dbobj := &cDatabase{opts: make(map[string]string)}
hndl := cgo.NewHandle(dbobj)
db.private_data = createHandle(hndl)
return C.ADBC_STATUS_OK
}
//export FlightSQLDatabaseRelease
func FlightSQLDatabaseRelease(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseInit", e)
}
}()
if !checkDBAlloc(db, err, "AdbcDatabaseRelease") {
return C.ADBC_STATUS_INVALID_STATE
}
h := (*(*cgo.Handle)(db.private_data))
cdb := h.Value().(*cDatabase)
if cdb.db != nil {
cdb.db.Close()
cdb.db = nil
}
cdb.opts = nil
if db.private_data != nil {
C.free(unsafe.Pointer(db.private_data))
db.private_data = nil
}
h.Delete()
// manually trigger GC for two reasons:
// 1. ASAN expects the release callback to be called before
// the process ends, but GC is not deterministic. So by manually
// triggering the GC we ensure the release callback gets called.
// 2. Creates deterministic GC behavior by all Release functions
// triggering a garbage collection
runtime.GC()
return C.ADBC_STATUS_OK
}
//export FlightSQLDatabaseSetOption
func FlightSQLDatabaseSetOption(db *C.struct_AdbcDatabase, key, value *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseSetOption", e)
}
}()
if !checkDBAlloc(db, err, "AdbcDatabaseSetOption") {
return C.ADBC_STATUS_INVALID_STATE
}
cdb := getFromHandle[cDatabase](db.private_data)
k, v := C.GoString(key), C.GoString(value)
if cdb.db != nil {
opts, ok := cdb.db.(adbc.PostInitOptions)
if !ok {
setErr(err, "AdbcDatabaseSetOption: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOption(k, v)))
} else {
cdb.opts[k] = v
}
return C.ADBC_STATUS_OK
}
//export FlightSQLDatabaseSetOptionBytes
func FlightSQLDatabaseSetOptionBytes(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseSetOptionBytes", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseSetOptionBytes")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseSetOptionBytes: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionBytes(C.GoString(key), fromCArr[byte](value, int(length)))))
}
//export FlightSQLDatabaseSetOptionDouble
func FlightSQLDatabaseSetOptionDouble(db *C.struct_AdbcDatabase, key *C.cchar_t, value C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseSetOptionDouble", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseSetOptionDouble")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseSetOptionDouble: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionDouble(C.GoString(key), float64(value))))
}
//export FlightSQLDatabaseSetOptionInt
func FlightSQLDatabaseSetOptionInt(db *C.struct_AdbcDatabase, key *C.cchar_t, value C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcDatabaseSetOptionInt", e)
}
}()
cdb := checkDBInit(db, err, "AdbcDatabaseSetOptionInt")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := cdb.db.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcDatabaseSetOptionInt: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionInt(C.GoString(key), int64(value))))
}
type cConn struct {
cancellableContext
cnxn adbc.Connection
initArgs map[string]string
}
func checkConnAlloc(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError, fname string) bool {
if globalPoison.Load() {
setErr(err, "%s: Go panicked, driver is in unknown state", fname)
return false
}
if cnxn == nil {
setErr(err, "%s: connection not allocated", fname)
return false
}
if cnxn.private_data == nil {
setErr(err, "%s: connection not allocated", fname)
return false
}
return true
}
func checkConnInit(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError, fname string) *cConn {
if !checkConnAlloc(cnxn, err, fname) {
return nil
}
conn := getFromHandle[cConn](cnxn.private_data)
if conn.cnxn == nil {
setErr(err, "%s: connection not initialized", fname)
return nil
}
return conn
}
//export FlightSQLConnectionGetOption
func FlightSQLConnectionGetOption(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.char, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetOption", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionGetOption")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionGetOption: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOption(C.GoString(key))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return exportStringOption(val, value, length)
}
//export FlightSQLConnectionGetOptionBytes
func FlightSQLConnectionGetOptionBytes(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.uint8_t, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetOptionBytes", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionGetOptionBytes")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionGetOptionBytes: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionBytes(C.GoString(key))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return exportBytesOption(val, value, length)
}
//export FlightSQLConnectionGetOptionDouble
func FlightSQLConnectionGetOptionDouble(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetOptionDouble", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionGetOptionDouble")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionGetOptionDouble: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionDouble(C.GoString(key))
*value = C.double(val)
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
//export FlightSQLConnectionGetOptionInt
func FlightSQLConnectionGetOptionInt(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetOptionInt", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionGetOptionInt")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionGetOptionInt: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionInt(C.GoString(key))
*value = C.int64_t(val)
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
//export FlightSQLConnectionNew
func FlightSQLConnectionNew(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionNew", e)
}
}()
if globalPoison.Load() {
setErr(err, "AdbcConnectionNew: Go panicked, driver is in unknown state")
return C.ADBC_STATUS_INTERNAL
}
if cnxn.private_data != nil {
setErr(err, "AdbcConnectionNew: connection already allocated")
return C.ADBC_STATUS_INVALID_STATE
}
hndl := cgo.NewHandle(&cConn{})
cnxn.private_data = createHandle(hndl)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionSetOption
func FlightSQLConnectionSetOption(cnxn *C.struct_AdbcConnection, key, val *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionSetOption", e)
}
}()
if !checkConnAlloc(cnxn, err, "AdbcConnectionSetOption") {
return C.ADBC_STATUS_INVALID_STATE
}
conn := getFromHandle[cConn](cnxn.private_data)
if conn.cnxn == nil {
// not yet initialized
k, v := C.GoString(key), C.GoString(val)
if conn.initArgs == nil {
conn.initArgs = map[string]string{}
}
conn.initArgs[k] = v
return C.ADBC_STATUS_OK
}
opts, ok := conn.cnxn.(adbc.PostInitOptions)
if !ok {
setErr(err, "AdbcConnectionSetOption: not supported post-init")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOption(C.GoString(key), C.GoString(val))))
}
//export FlightSQLConnectionSetOptionBytes
func FlightSQLConnectionSetOptionBytes(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionSetOptionBytes", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionSetOptionBytes")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionSetOptionBytes: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionBytes(C.GoString(key), fromCArr[byte](value, int(length)))))
}
//export FlightSQLConnectionSetOptionDouble
func FlightSQLConnectionSetOptionDouble(db *C.struct_AdbcConnection, key *C.cchar_t, value C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionSetOptionDouble", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionSetOptionDouble")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionSetOptionDouble: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionDouble(C.GoString(key), float64(value))))
}
//export FlightSQLConnectionSetOptionInt
func FlightSQLConnectionSetOptionInt(db *C.struct_AdbcConnection, key *C.cchar_t, value C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionSetOptionInt", e)
}
}()
conn := checkConnInit(db, err, "AdbcConnectionSetOptionInt")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := conn.cnxn.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcConnectionSetOptionInt: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionInt(C.GoString(key), int64(value))))
}
//export FlightSQLConnectionInit
func FlightSQLConnectionInit(cnxn *C.struct_AdbcConnection, db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionInit", e)
}
}()
if !checkConnAlloc(cnxn, err, "AdbcConnectionInit") {
return C.ADBC_STATUS_INVALID_STATE
}
conn := getFromHandle[cConn](cnxn.private_data)
if conn.cnxn != nil {
setErr(err, "AdbcConnectionInit: connection already initialized")
return C.ADBC_STATUS_INVALID_STATE
}
cdb := checkDBInit(db, err, "AdbcConnectionInit")
if cdb == nil {
return C.ADBC_STATUS_INVALID_STATE
}
c, e := cdb.db.Open(context.Background())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
conn.cnxn = c
if len(conn.initArgs) > 0 {
// C allow SetOption before Init, Go doesn't allow options to Open so set them now
opts, ok := conn.cnxn.(adbc.PostInitOptions)
if !ok {
setErr(err, "AdbcConnectionInit: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
for k, v := range conn.initArgs {
rawCode := errToAdbcErr(err, opts.SetOption(k, v))
if rawCode != adbc.StatusOK {
return C.AdbcStatusCode(rawCode)
}
}
conn.initArgs = nil
}
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionRelease
func FlightSQLConnectionRelease(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionRelease", e)
}
}()
if !checkConnAlloc(cnxn, err, "AdbcConnectionRelease") {
return C.ADBC_STATUS_INVALID_STATE
}
h := (*(*cgo.Handle)(cnxn.private_data))
conn := h.Value().(*cConn)
defer func() {
conn.cancelContext()
conn.cnxn = nil
C.free(cnxn.private_data)
cnxn.private_data = nil
h.Delete()
// manually trigger GC for two reasons:
// 1. ASAN expects the release callback to be called before
// the process ends, but GC is not deterministic. So by manually
// triggering the GC we ensure the release callback gets called.
// 2. Creates deterministic GC behavior by all Release functions
// triggering a garbage collection
runtime.GC()
}()
if conn.cnxn == nil {
return C.ADBC_STATUS_OK
}
return C.AdbcStatusCode(errToAdbcErr(err, conn.cnxn.Close()))
}
func fromCArr[T, CType any](ptr *CType, sz int) []T {
if ptr == nil || sz == 0 {
return nil
}
return unsafe.Slice((*T)(unsafe.Pointer(ptr)), sz)
}
func toCdataStream(ptr *C.struct_ArrowArrayStream) *cdata.CArrowArrayStream {
return (*cdata.CArrowArrayStream)(unsafe.Pointer(ptr))
}
func toCdataSchema(ptr *C.struct_ArrowSchema) *cdata.CArrowSchema {
return (*cdata.CArrowSchema)(unsafe.Pointer(ptr))
}
func toCdataArray(ptr *C.struct_ArrowArray) *cdata.CArrowArray {
return (*cdata.CArrowArray)(unsafe.Pointer(ptr))
}
//export FlightSQLConnectionCancel
func FlightSQLConnectionCancel(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionCancel", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionCancel")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
conn.cancelContext()
return C.ADBC_STATUS_OK
}
func toStrPtr(in *C.cchar_t) *string {
if in == nil {
return nil
}
out := C.GoString((*C.char)(in))
return &out
}
func toStrSlice(in **C.cchar_t) []string {
if in == nil {
return nil
}
sz := unsafe.Sizeof(*in)
out := make([]string, 0, 1)
for *in != nil {
out = append(out, C.GoString(*in))
in = (**C.cchar_t)(unsafe.Add(unsafe.Pointer(in), sz))
}
return out
}
//export FlightSQLConnectionGetInfo
func FlightSQLConnectionGetInfo(cnxn *C.struct_AdbcConnection, codes *C.cuint32_t, len C.size_t, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetInfo", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionGetInfo")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
infoCodes := fromCArr[adbc.InfoCode](codes, int(len))
rdr, e := conn.cnxn.GetInfo(conn.newContext(), infoCodes)
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rdr.Release()
exportRecordReader(rdr, out)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionGetObjects
func FlightSQLConnectionGetObjects(cnxn *C.struct_AdbcConnection, depth C.int, catalog, dbSchema, tableName *C.cchar_t, tableType **C.cchar_t, columnName *C.cchar_t,
out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetObjects", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionGetObjects")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
rdr, e := conn.cnxn.GetObjects(conn.newContext(), adbc.ObjectDepth(depth), toStrPtr(catalog), toStrPtr(dbSchema), toStrPtr(tableName), toStrPtr(columnName), toStrSlice(tableType))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rdr.Release()
exportRecordReader(rdr, out)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionGetStatistics
func FlightSQLConnectionGetStatistics(cnxn *C.struct_AdbcConnection, catalog, dbSchema, tableName *C.cchar_t, approximate C.char, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetStatistics", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionGetStatistics")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
gs, ok := conn.cnxn.(adbc.ConnectionGetStatistics)
if !ok {
setErr(err, "AdbcConnectionGetStatistics: not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
rdr, e := gs.GetStatistics(conn.newContext(), toStrPtr(catalog), toStrPtr(dbSchema), toStrPtr(tableName), int(approximate) != 0)
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rdr.Release()
exportRecordReader(rdr, out)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionGetStatisticNames
func FlightSQLConnectionGetStatisticNames(cnxn *C.struct_AdbcConnection, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetStatistics", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionGetStatistics")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
gs, ok := conn.cnxn.(adbc.ConnectionGetStatistics)
if !ok {
setErr(err, "AdbcConnectionGetStatistics: not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
rdr, e := gs.GetStatisticNames(conn.newContext())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rdr.Release()
exportRecordReader(rdr, out)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionGetTableSchema
func FlightSQLConnectionGetTableSchema(cnxn *C.struct_AdbcConnection, catalog, dbSchema, tableName *C.cchar_t, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetTableSchema", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionGetTableSchema")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
sc, e := conn.cnxn.GetTableSchema(conn.newContext(), toStrPtr(catalog), toStrPtr(dbSchema), C.GoString(tableName))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
cdata.ExportArrowSchema(sc, toCdataSchema(schema))
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionGetTableTypes
func FlightSQLConnectionGetTableTypes(cnxn *C.struct_AdbcConnection, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionGetTableTypes", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionGetTableTypes")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
rdr, e := conn.cnxn.GetTableTypes(conn.newContext())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rdr.Release()
exportRecordReader(rdr, out)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionReadPartition
func FlightSQLConnectionReadPartition(cnxn *C.struct_AdbcConnection, serialized *C.cuint8_t, serializedLen C.size_t, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionReadPartition", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionReadPartition")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
rdr, e := conn.cnxn.ReadPartition(conn.newContext(), fromCArr[byte](serialized, int(serializedLen)))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rdr.Release()
exportRecordReader(rdr, out)
return C.ADBC_STATUS_OK
}
//export FlightSQLConnectionCommit
func FlightSQLConnectionCommit(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionCommit", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionCommit")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
return C.AdbcStatusCode(errToAdbcErr(err, conn.cnxn.Commit(conn.newContext())))
}
//export FlightSQLConnectionRollback
func FlightSQLConnectionRollback(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcConnectionRollback", e)
}
}()
conn := checkConnInit(cnxn, err, "AdbcConnectionRollback")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
return C.AdbcStatusCode(errToAdbcErr(err, conn.cnxn.Rollback(conn.newContext())))
}
type cStmt struct {
cancellableContext
stmt adbc.Statement
}
func checkStmtAlloc(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError, fname string) bool {
if globalPoison.Load() {
setErr(err, "%s: Go panicked, driver is in unknown state", fname)
return false
}
if stmt == nil {
setErr(err, "%s: statement not allocated", fname)
return false
}
if stmt.private_data == nil {
setErr(err, "%s: statement not allocated", fname)
return false
}
return true
}
func checkStmtInit(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError, fname string) *cStmt {
if !checkStmtAlloc(stmt, err, fname) {
return nil
}
cStmt := getFromHandle[cStmt](stmt.private_data)
if cStmt.stmt == nil {
setErr(err, "%s: statement not allocated", fname)
return nil
}
return cStmt
}
//export FlightSQLStatementGetOption
func FlightSQLStatementGetOption(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.char, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementGetOption", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementGetOption")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementGetOption: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOption(C.GoString(key))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return exportStringOption(val, value, length)
}
//export FlightSQLStatementGetOptionBytes
func FlightSQLStatementGetOptionBytes(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.uint8_t, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementGetOptionBytes", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementGetOptionBytes")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementGetOptionBytes: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionBytes(C.GoString(key))
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return exportBytesOption(val, value, length)
}
//export FlightSQLStatementGetOptionDouble
func FlightSQLStatementGetOptionDouble(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementGetOptionDouble", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementGetOptionDouble")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementGetOptionDouble: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionDouble(C.GoString(key))
*value = C.double(val)
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
//export FlightSQLStatementGetOptionInt
func FlightSQLStatementGetOptionInt(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementGetOptionInt", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementGetOptionInt")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementGetOptionInt: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
val, e := opts.GetOptionInt(C.GoString(key))
*value = C.int64_t(val)
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
//export FlightSQLStatementNew
func FlightSQLStatementNew(cnxn *C.struct_AdbcConnection, stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementNew", e)
}
}()
if globalPoison.Load() {
setErr(err, "AdbcStatementNew: Go panicked, driver is in unknown state")
return C.ADBC_STATUS_INTERNAL
}
if stmt.private_data != nil {
setErr(err, "AdbcStatementNew: statement already allocated")
return C.ADBC_STATUS_INVALID_STATE
}
conn := checkConnInit(cnxn, err, "AdbcStatementNew")
if conn == nil {
return C.ADBC_STATUS_INVALID_STATE
}
st, e := conn.cnxn.NewStatement()
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
hndl := cgo.NewHandle(&cStmt{stmt: st})
stmt.private_data = createHandle(hndl)
return C.ADBC_STATUS_OK
}
//export FlightSQLStatementRelease
func FlightSQLStatementRelease(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementRelease", e)
}
}()
if globalPoison.Load() {
setErr(err, "AdbcStatementRelease: Go panicked, driver is in unknown state")
return C.ADBC_STATUS_INTERNAL
}
if !checkStmtAlloc(stmt, err, "AdbcStatementRelease") {
return C.ADBC_STATUS_INVALID_STATE
}
h := (*(*cgo.Handle)(stmt.private_data))
st := h.Value().(*cStmt)
defer func() {
st.cancelContext()
st.stmt = nil
C.free(stmt.private_data)
stmt.private_data = nil
h.Delete()
// manually trigger GC for two reasons:
// 1. ASAN expects the release callback to be called before
// the process ends, but GC is not deterministic. So by manually
// triggering the GC we ensure the release callback gets called.
// 2. Creates deterministic GC behavior by all Release functions
// triggering a garbage collection
runtime.GC()
}()
if st.stmt == nil {
return C.ADBC_STATUS_OK
}
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.Close()))
}
//export FlightSQLStatementCancel
func FlightSQLStatementCancel(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementCancel", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementCancel")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
st.cancelContext()
return C.ADBC_STATUS_OK
}
//export FlightSQLStatementPrepare
func FlightSQLStatementPrepare(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementPrepare", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementPrepare")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.Prepare(st.newContext())))
}
//export FlightSQLStatementExecuteQuery
func FlightSQLStatementExecuteQuery(stmt *C.struct_AdbcStatement, out *C.struct_ArrowArrayStream, affected *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementExecuteQuery", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementExecuteQuery")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
if out == nil {
n, e := st.stmt.ExecuteUpdate(st.newContext())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
if affected != nil {
*affected = C.int64_t(n)
}
} else {
rdr, n, e := st.stmt.ExecuteQuery(st.newContext())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
if affected != nil {
*affected = C.int64_t(n)
}
defer rdr.Release()
exportRecordReader(rdr, out)
}
return C.ADBC_STATUS_OK
}
//export FlightSQLStatementExecuteSchema
func FlightSQLStatementExecuteSchema(stmt *C.struct_AdbcStatement, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementExecuteQuery", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementExecuteQuery")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
es, ok := st.stmt.(adbc.StatementExecuteSchema)
if !ok {
setErr(err, "AdbcStatementExecuteSchema: not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
sc, e := es.ExecuteSchema(st.newContext())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
cdata.ExportArrowSchema(sc, toCdataSchema(schema))
return C.ADBC_STATUS_OK
}
//export FlightSQLStatementSetSqlQuery
func FlightSQLStatementSetSqlQuery(stmt *C.struct_AdbcStatement, query *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementSetSqlQuery", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementSetSqlQuery")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.SetSqlQuery(C.GoString(query))))
}
//export FlightSQLStatementSetSubstraitPlan
func FlightSQLStatementSetSubstraitPlan(stmt *C.struct_AdbcStatement, plan *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementSetSubstraitPlan", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementSetSubstraitPlan")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.SetSubstraitPlan(fromCArr[byte](plan, int(length)))))
}
//export FlightSQLStatementBind
func FlightSQLStatementBind(stmt *C.struct_AdbcStatement, values *C.struct_ArrowArray, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementBind", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementBind")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
rec, e := cdata.ImportCRecordBatch(toCdataArray(values), toCdataSchema(schema))
if e != nil {
// if there was an error, we need to manually release the input
cdata.ReleaseCArrowArray(toCdataArray(values))
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
defer rec.Release()
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.Bind(st.newContext(), rec)))
}
//export FlightSQLStatementBindStream
func FlightSQLStatementBindStream(stmt *C.struct_AdbcStatement, stream *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementBindStream", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementBindStream")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
rdr, e := cdata.ImportCRecordReader(toCdataStream(stream), nil)
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.BindStream(st.newContext(), rdr.(array.RecordReader))))
}
//export FlightSQLStatementGetParameterSchema
func FlightSQLStatementGetParameterSchema(stmt *C.struct_AdbcStatement, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementGetParameterSchema", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementGetParameterSchema")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
sc, e := st.stmt.GetParameterSchema()
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
cdata.ExportArrowSchema(sc, toCdataSchema(schema))
return C.ADBC_STATUS_OK
}
//export FlightSQLStatementSetOption
func FlightSQLStatementSetOption(stmt *C.struct_AdbcStatement, key, value *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementSetOption", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementSetOption")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.SetOption(C.GoString(key), C.GoString(value))))
}
//export FlightSQLStatementSetOptionBytes
func FlightSQLStatementSetOptionBytes(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementSetOptionBytes", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementSetOptionBytes")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementSetOptionBytes: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionBytes(C.GoString(key), fromCArr[byte](value, int(length)))))
}
//export FlightSQLStatementSetOptionDouble
func FlightSQLStatementSetOptionDouble(db *C.struct_AdbcStatement, key *C.cchar_t, value C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementSetOptionDouble", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementSetOptionDouble")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementSetOptionDouble: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionDouble(C.GoString(key), float64(value))))
}
//export FlightSQLStatementSetOptionInt
func FlightSQLStatementSetOptionInt(db *C.struct_AdbcStatement, key *C.cchar_t, value C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementSetOptionInt", e)
}
}()
st := checkStmtInit(db, err, "AdbcStatementSetOptionInt")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
opts, ok := st.stmt.(adbc.GetSetOptions)
if !ok {
setErr(err, "AdbcStatementSetOptionInt: options are not supported")
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionInt(C.GoString(key), int64(value))))
}
//export releasePartitions
func releasePartitions(partitions *C.struct_AdbcPartitions) {
if partitions.private_data == nil {
return
}
C.free(unsafe.Pointer(partitions.partitions))
C.free(unsafe.Pointer(partitions.partition_lengths))
C.free(partitions.private_data)
partitions.partitions = nil
partitions.partition_lengths = nil
partitions.private_data = nil
}
//export FlightSQLStatementExecutePartitions
func FlightSQLStatementExecutePartitions(stmt *C.struct_AdbcStatement, schema *C.struct_ArrowSchema, partitions *C.struct_AdbcPartitions, affected *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) {
defer func() {
if e := recover(); e != nil {
code = poison(err, "AdbcStatementExecutePartitions", e)
}
}()
st := checkStmtInit(stmt, err, "AdbcStatementExecutePartitions")
if st == nil {
return C.ADBC_STATUS_INVALID_STATE
}
sc, part, n, e := st.stmt.ExecutePartitions(st.newContext())
if e != nil {
return C.AdbcStatusCode(errToAdbcErr(err, e))
}
if partitions == nil {
setErr(err, "AdbcStatementExecutePartitions: partitions output struct is null")
return C.ADBC_STATUS_INVALID_ARGUMENT
}
if affected != nil {
*affected = C.int64_t(n)
}
if sc != nil && schema != nil {
cdata.ExportArrowSchema(sc, toCdataSchema(schema))
}
partitions.num_partitions = C.size_t(part.NumPartitions)
partitions.partitions = (**C.cuint8_t)(C.malloc(C.size_t(unsafe.Sizeof((*C.uint8_t)(nil)) * uintptr(part.NumPartitions))))
partitions.partition_lengths = (*C.size_t)(C.malloc(C.size_t(unsafe.Sizeof(C.size_t(0)) * uintptr(part.NumPartitions))))
// Copy into C-allocated memory to avoid violating CGO rules
totalLen := 0
for _, p := range part.PartitionIDs {
totalLen += len(p)
}
partitions.private_data = C.calloc(C.size_t(totalLen), C.size_t(1))
dst := fromCArr[byte]((*byte)(partitions.private_data), totalLen)
partIDs := fromCArr[*C.cuint8_t](partitions.partitions, int(partitions.num_partitions))
partLens := fromCArr[C.size_t](partitions.partition_lengths, int(partitions.num_partitions))
for i, p := range part.PartitionIDs {
partIDs[i] = (*C.cuint8_t)(&dst[0])
copy(dst, p)
dst = dst[len(p):]
partLens[i] = C.size_t(len(p))
}
partitions.release = (*[0]byte)(C.releasePartitions)
return C.ADBC_STATUS_OK
}
//export FlightSQLDriverInit
func FlightSQLDriverInit(version C.int, rawDriver *C.void, err *C.struct_AdbcError) C.AdbcStatusCode {
driver := (*C.struct_AdbcDriver)(unsafe.Pointer(rawDriver))
switch version {
case C.ADBC_VERSION_1_0_0:
sink := fromCArr[byte]((*byte)(unsafe.Pointer(driver)), C.ADBC_DRIVER_1_0_0_SIZE)
memory.Set(sink, 0)
case C.ADBC_VERSION_1_1_0:
sink := fromCArr[byte]((*byte)(unsafe.Pointer(driver)), C.ADBC_DRIVER_1_1_0_SIZE)
memory.Set(sink, 0)
default:
setErr(err, "Only version 1.0.0/1.1.0 supported, got %d", int(version))
return C.ADBC_STATUS_NOT_IMPLEMENTED
}
driver.DatabaseInit = (*[0]byte)(C.FlightSQLDatabaseInit)
driver.DatabaseNew = (*[0]byte)(C.FlightSQLDatabaseNew)
driver.DatabaseRelease = (*[0]byte)(C.FlightSQLDatabaseRelease)
driver.DatabaseSetOption = (*[0]byte)(C.FlightSQLDatabaseSetOption)
driver.ConnectionNew = (*[0]byte)(C.FlightSQLConnectionNew)
driver.ConnectionInit = (*[0]byte)(C.FlightSQLConnectionInit)
driver.ConnectionRelease = (*[0]byte)(C.FlightSQLConnectionRelease)
driver.ConnectionSetOption = (*[0]byte)(C.FlightSQLConnectionSetOption)
driver.ConnectionGetInfo = (*[0]byte)(C.FlightSQLConnectionGetInfo)
driver.ConnectionGetObjects = (*[0]byte)(C.FlightSQLConnectionGetObjects)
driver.ConnectionGetTableSchema = (*[0]byte)(C.FlightSQLConnectionGetTableSchema)
driver.ConnectionGetTableTypes = (*[0]byte)(C.FlightSQLConnectionGetTableTypes)
driver.ConnectionReadPartition = (*[0]byte)(C.FlightSQLConnectionReadPartition)
driver.ConnectionCommit = (*[0]byte)(C.FlightSQLConnectionCommit)
driver.ConnectionRollback = (*[0]byte)(C.FlightSQLConnectionRollback)
driver.StatementNew = (*[0]byte)(C.FlightSQLStatementNew)
driver.StatementRelease = (*[0]byte)(C.FlightSQLStatementRelease)
driver.StatementSetOption = (*[0]byte)(C.FlightSQLStatementSetOption)
driver.StatementSetSqlQuery = (*[0]byte)(C.FlightSQLStatementSetSqlQuery)
driver.StatementSetSubstraitPlan = (*[0]byte)(C.FlightSQLStatementSetSubstraitPlan)
driver.StatementBind = (*[0]byte)(C.FlightSQLStatementBind)
driver.StatementBindStream = (*[0]byte)(C.FlightSQLStatementBindStream)
driver.StatementExecuteQuery = (*[0]byte)(C.FlightSQLStatementExecuteQuery)
driver.StatementExecutePartitions = (*[0]byte)(C.FlightSQLStatementExecutePartitions)
driver.StatementGetParameterSchema = (*[0]byte)(C.FlightSQLStatementGetParameterSchema)
driver.StatementPrepare = (*[0]byte)(C.FlightSQLStatementPrepare)
if version == C.ADBC_VERSION_1_1_0 {
driver.ErrorGetDetailCount = (*[0]byte)(C.FlightSQLErrorGetDetailCount)
driver.ErrorGetDetail = (*[0]byte)(C.FlightSQLErrorGetDetail)
driver.ErrorFromArrayStream = (*[0]byte)(C.FlightSQLErrorFromArrayStream)
driver.DatabaseGetOption = (*[0]byte)(C.FlightSQLDatabaseGetOption)
driver.DatabaseGetOptionBytes = (*[0]byte)(C.FlightSQLDatabaseGetOptionBytes)
driver.DatabaseGetOptionDouble = (*[0]byte)(C.FlightSQLDatabaseGetOptionDouble)
driver.DatabaseGetOptionInt = (*[0]byte)(C.FlightSQLDatabaseGetOptionInt)
driver.DatabaseSetOptionBytes = (*[0]byte)(C.FlightSQLDatabaseSetOptionBytes)
driver.DatabaseSetOptionDouble = (*[0]byte)(C.FlightSQLDatabaseSetOptionDouble)
driver.DatabaseSetOptionInt = (*[0]byte)(C.FlightSQLDatabaseSetOptionInt)
driver.ConnectionCancel = (*[0]byte)(C.FlightSQLConnectionCancel)
driver.ConnectionGetOption = (*[0]byte)(C.FlightSQLConnectionGetOption)
driver.ConnectionGetOptionBytes = (*[0]byte)(C.FlightSQLConnectionGetOptionBytes)
driver.ConnectionGetOptionDouble = (*[0]byte)(C.FlightSQLConnectionGetOptionDouble)
driver.ConnectionGetOptionInt = (*[0]byte)(C.FlightSQLConnectionGetOptionInt)
driver.ConnectionGetStatistics = (*[0]byte)(C.FlightSQLConnectionGetStatistics)
driver.ConnectionGetStatisticNames = (*[0]byte)(C.FlightSQLConnectionGetStatisticNames)
driver.ConnectionSetOptionBytes = (*[0]byte)(C.FlightSQLConnectionSetOptionBytes)
driver.ConnectionSetOptionDouble = (*[0]byte)(C.FlightSQLConnectionSetOptionDouble)
driver.ConnectionSetOptionInt = (*[0]byte)(C.FlightSQLConnectionSetOptionInt)
driver.StatementCancel = (*[0]byte)(C.FlightSQLStatementCancel)
driver.StatementExecuteSchema = (*[0]byte)(C.FlightSQLStatementExecuteSchema)
driver.StatementGetOption = (*[0]byte)(C.FlightSQLStatementGetOption)
driver.StatementGetOptionBytes = (*[0]byte)(C.FlightSQLStatementGetOptionBytes)
driver.StatementGetOptionDouble = (*[0]byte)(C.FlightSQLStatementGetOptionDouble)
driver.StatementGetOptionInt = (*[0]byte)(C.FlightSQLStatementGetOptionInt)
driver.StatementSetOptionBytes = (*[0]byte)(C.FlightSQLStatementSetOptionBytes)
driver.StatementSetOptionDouble = (*[0]byte)(C.FlightSQLStatementSetOptionDouble)
driver.StatementSetOptionInt = (*[0]byte)(C.FlightSQLStatementSetOptionInt)
}
return C.ADBC_STATUS_OK
}
func main() {}