go/adbc/pkg/flightsql/driver.go (1,529 lines of code) (raw):

// Code generated by _tmpl/driver.go.tmpl. DO NOT EDIT. // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build driverlib package main // ADBC_EXPORTING is required on Windows, or else the symbols // won't be accessible to the driver manager // #cgo CFLAGS: -DADBC_EXPORTING // #cgo CXXFLAGS: -std=c++11 -DADBC_EXPORTING // #include "../../drivermgr/arrow-adbc/adbc.h" // #include "utils.h" // #include <errno.h> // #include <stdint.h> // #include <string.h> // // typedef const char cchar_t; // typedef const uint8_t cuint8_t; // typedef const uint32_t cuint32_t; // typedef const struct AdbcError ConstAdbcError; // // int FlightSQLArrayStreamGetSchema(struct ArrowArrayStream*, struct ArrowSchema*); // int FlightSQLArrayStreamGetNext(struct ArrowArrayStream*, struct ArrowArray*); // const char* FlightSQLArrayStreamGetLastError(struct ArrowArrayStream*); // void FlightSQLArrayStreamRelease(struct ArrowArrayStream*); // // int FlightSQLArrayStreamGetSchemaTrampoline(struct ArrowArrayStream*, struct ArrowSchema*); // int FlightSQLArrayStreamGetNextTrampoline(struct ArrowArrayStream*, struct ArrowArray*); // // void releasePartitions(struct AdbcPartitions* partitions); // import "C" import ( "context" "errors" "fmt" "log/slog" "os" "runtime" "runtime/cgo" "strings" "sync/atomic" "unsafe" "github.com/apache/arrow-adbc/go/adbc" "github.com/apache/arrow-adbc/go/adbc/driver/flightsql" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/cdata" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/arrow/memory/mallocator" ) // Must use malloc() to respect CGO rules var drv = flightsql.NewDriver(mallocator.NewMallocator()) // Flag set if any method panic()ed - afterwards all calls to driver will fail // since internal state of driver is unknown var globalPoison atomic.Bool const errPrefix = "[FlightSQL] " const logLevelEnvVar = "ADBC_DRIVER_FLIGHTSQL_LOG_LEVEL" func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) { if err == nil { return } if err.release != nil { C.FlightSQLerrRelease(err) } msg := errPrefix + fmt.Sprintf(format, vals...) err.message = C.CString(msg) err.release = (*[0]byte)(C.FlightSQL_release_error) } func setErrWithDetails(err *C.struct_AdbcError, adbcError adbc.Error) { if err == nil { return } if err.vendor_code != C.ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA { setErr(err, adbcError.Msg) return } cErrPtr := C.calloc(C.sizeof_struct_FlightSQLError, C.size_t(1)) cErr := (*C.struct_FlightSQLError)(cErrPtr) cErr.message = C.CString(adbcError.Msg) err.message = cErr.message err.release = (*[0]byte)(C.FlightSQLReleaseErrWithDetails) err.private_data = cErrPtr numDetails := len(adbcError.Details) if numDetails > 0 { cErr.keys = (**C.cchar_t)(C.calloc(C.size_t(numDetails), C.size_t(unsafe.Sizeof((*C.cchar_t)(nil))))) cErr.values = (**C.cuint8_t)(C.calloc(C.size_t(numDetails), C.size_t(unsafe.Sizeof((*C.cuint8_t)(nil))))) cErr.lengths = (*C.size_t)(C.calloc(C.size_t(numDetails), C.sizeof_size_t)) keys := fromCArr[*C.cchar_t](cErr.keys, numDetails) values := fromCArr[*C.cuint8_t](cErr.values, numDetails) lengths := fromCArr[C.size_t](cErr.lengths, numDetails) for i, detail := range adbcError.Details { keys[i] = C.CString(detail.Key()) bytes, err := detail.Serialize() if err != nil { msg := err.Error() values[i] = (*C.cuint8_t)(unsafe.Pointer(C.CString(msg))) lengths[i] = C.size_t(len(msg)) } else { values[i] = (*C.cuint8_t)(C.malloc(C.size_t(len(bytes)))) sink := fromCArr[byte]((*byte)(values[i]), len(bytes)) copy(sink, bytes) lengths[i] = C.size_t(len(bytes)) } } } else { cErr.keys = nil cErr.values = nil cErr.lengths = nil } cErr.count = C.int(numDetails) } func errToAdbcErr(adbcerr *C.struct_AdbcError, err error) adbc.Status { if err == nil { return adbc.StatusOK } var adbcError adbc.Error if errors.As(err, &adbcError) { setErrWithDetails(adbcerr, adbcError) return adbcError.Code } setErr(adbcerr, err.Error()) return adbc.StatusUnknown } // We panicked; make all API functions error and dump stack traces func poison(err *C.struct_AdbcError, fname string, e interface{}) C.AdbcStatusCode { if !globalPoison.Swap(true) { // Only print stack traces on the first occurrence buf := make([]byte, 1<<20) length := runtime.Stack(buf, true) fmt.Fprintf(os.Stderr, "FlightSQL driver panicked, stack traces:\n%s", buf[:length]) } setErr(err, "%s: Go panic in FlightSQL driver (see stderr): %#v", fname, e) return C.ADBC_STATUS_INTERNAL } // Check environment variables and enable logging if possible. func initLoggingFromEnv(db adbc.Database) { logLevel := slog.LevelError switch strings.ToLower(os.Getenv(logLevelEnvVar)) { case "debug": logLevel = slog.LevelDebug case "info": logLevel = slog.LevelInfo case "warn": case "warning": logLevel = slog.LevelWarn case "error": logLevel = slog.LevelError case "": return default: printLoggingHelp() return } h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ AddSource: false, Level: logLevel, }) logger := slog.New(h) ext, ok := db.(adbc.DatabaseLogging) if !ok { logger.Error("FlightSQL does not support logging") return } ext.SetLogger(logger) } func printLoggingHelp() { fmt.Fprintf(os.Stderr, "FlightSQL: to enable logging, set %s to 'debug', 'info', 'warn', or 'error'", logLevelEnvVar) } // Allocate a new cgo.Handle and store its address in a heap-allocated // uintptr_t. Experimentally, this was found to be necessary, else // something (the Go runtime?) would corrupt (garbage-collect?) the // handle. func createHandle(hndl cgo.Handle) unsafe.Pointer { // uintptr_t* hptr = malloc(sizeof(uintptr_t)); hptr := (*C.uintptr_t)(C.calloc(C.sizeof_uintptr_t, C.size_t(1))) // *hptr = (uintptr)hndl; *hptr = C.uintptr_t(uintptr(hndl)) return unsafe.Pointer(hptr) } func getFromHandle[T any](ptr unsafe.Pointer) *T { // uintptr_t* hptr = (uintptr_t*)ptr; hptr := (*C.uintptr_t)(ptr) return cgo.Handle((uintptr)(*hptr)).Value().(*T) } func exportStringOption(val string, out *C.char, length *C.size_t) C.AdbcStatusCode { lenWithTerminator := C.size_t(len(val) + 1) if lenWithTerminator <= *length { sink := fromCArr[byte]((*byte)(unsafe.Pointer(out)), int(*length)) copy(sink, val) sink[lenWithTerminator] = 0 } *length = lenWithTerminator return C.ADBC_STATUS_OK } func exportBytesOption(val []byte, out *C.uint8_t, length *C.size_t) C.AdbcStatusCode { if C.size_t(len(val)) <= *length { sink := fromCArr[byte]((*byte)(out), int(*length)) copy(sink, val) } *length = C.size_t(len(val)) return C.ADBC_STATUS_OK } type cancellableContext struct { ctx context.Context cancel context.CancelFunc } func (c *cancellableContext) newContext() context.Context { c.cancelContext() c.ctx, c.cancel = context.WithCancel(context.Background()) return c.ctx } func (c *cancellableContext) cancelContext() { if c.cancel != nil { c.cancel() } c.ctx = nil c.cancel = nil } func checkDBAlloc(db *C.struct_AdbcDatabase, err *C.struct_AdbcError, fname string) bool { if globalPoison.Load() { setErr(err, "%s: Go panicked, driver is in unknown state", fname) return false } if db == nil { setErr(err, "%s: database not allocated", fname) return false } if db.private_data == nil { setErr(err, "%s: database not allocated", fname) return false } return true } func checkDBInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError, fname string) *cDatabase { if !checkDBAlloc(db, err, fname) { return nil } cdb := getFromHandle[cDatabase](db.private_data) if cdb.db == nil { setErr(err, "%s: database not initialized", fname) return nil } return cdb } // Custom ArrowArrayStream export to support ADBC error data in ArrowArrayStream type cArrayStream struct { rdr array.RecordReader // Must be C-allocated adbcErr *C.struct_AdbcError status C.AdbcStatusCode } func (cStream *cArrayStream) maybeError() C.int { err := cStream.rdr.Err() if err != nil { if cStream.adbcErr != nil { C.FlightSQLerrRelease(cStream.adbcErr) } else { cStream.adbcErr = (*C.struct_AdbcError)(C.calloc(1, C.ADBC_ERROR_1_1_0_SIZE)) } cStream.adbcErr.vendor_code = C.ADBC_ERROR_VENDOR_CODE_PRIVATE_DATA cStream.status = C.AdbcStatusCode(errToAdbcErr(cStream.adbcErr, err)) switch adbc.Status(cStream.status) { case adbc.StatusUnknown: return C.EIO case adbc.StatusNotImplemented: return C.ENOTSUP case adbc.StatusNotFound: return C.ENOENT case adbc.StatusAlreadyExists: return C.EEXIST case adbc.StatusInvalidArgument: return C.EINVAL case adbc.StatusInvalidState: return C.EINVAL case adbc.StatusInvalidData: return C.EIO case adbc.StatusIntegrity: return C.EIO case adbc.StatusInternal: return C.EIO case adbc.StatusIO: return C.EIO case adbc.StatusCancelled: return C.ECANCELED case adbc.StatusTimeout: return C.ETIMEDOUT case adbc.StatusUnauthenticated: return C.EACCES case adbc.StatusUnauthorized: return C.EACCES default: return C.EIO } } return 0 } //export FlightSQLArrayStreamGetLastError func FlightSQLArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cchar_t { if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) if cStream.adbcErr != nil { return cStream.adbcErr.message } return nil } //export FlightSQLArrayStreamGetNext func FlightSQLArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.struct_ArrowArray) C.int { if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) if cStream.rdr.Next() { cdata.ExportArrowRecordBatch(cStream.rdr.Record(), toCdataArray(array), nil) return 0 } array.release = nil array.private_data = nil return cStream.maybeError() } //export FlightSQLArrayStreamGetSchema func FlightSQLArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C.struct_ArrowSchema) C.int { if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) s := cStream.rdr.Schema() if s == nil { return cStream.maybeError() } cdata.ExportArrowSchema(s, toCdataSchema(schema)) return 0 } //export FlightSQLArrayStreamRelease func FlightSQLArrayStreamRelease(stream *C.struct_ArrowArrayStream) { if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return } h := (*(*cgo.Handle)(stream.private_data)) cStream := h.Value().(*cArrayStream) cStream.rdr.Release() if cStream.adbcErr != nil { C.FlightSQLerrRelease(cStream.adbcErr) C.free(unsafe.Pointer(cStream.adbcErr)) } C.free(unsafe.Pointer(stream.private_data)) stream.private_data = nil h.Delete() runtime.GC() } //export FlightSQLErrorFromArrayStream func FlightSQLErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status *C.AdbcStatusCode) *C.struct_AdbcError { if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) if status != nil { *status = cStream.status } return cStream.adbcErr } func exportRecordReader(rdr array.RecordReader, stream *C.struct_ArrowArrayStream) { cStream := &cArrayStream{rdr: rdr, status: C.ADBC_STATUS_OK} stream.get_last_error = (*[0]byte)(C.FlightSQLArrayStreamGetLastError) stream.get_next = (*[0]byte)(C.FlightSQLArrayStreamGetNextTrampoline) stream.get_schema = (*[0]byte)(C.FlightSQLArrayStreamGetSchemaTrampoline) stream.release = (*[0]byte)(C.FlightSQLArrayStreamRelease) hndl := cgo.NewHandle(cStream) stream.private_data = createHandle(hndl) rdr.Retain() } type cDatabase struct { opts map[string]string db adbc.Database } //export FlightSQLDatabaseGetOption func FlightSQLDatabaseGetOption(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.char, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseGetOption", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseGetOption") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseGetOption: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOption(C.GoString(key)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return exportStringOption(val, value, length) } //export FlightSQLDatabaseGetOptionBytes func FlightSQLDatabaseGetOptionBytes(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.uint8_t, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseGetOptionBytes", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseGetOptionBytes") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseGetOptionBytes: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionBytes(C.GoString(key)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return exportBytesOption(val, value, length) } //export FlightSQLDatabaseGetOptionDouble func FlightSQLDatabaseGetOptionDouble(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseGetOptionDouble", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseGetOptionDouble") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseGetOptionDouble: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionDouble(C.GoString(key)) *value = C.double(val) return C.AdbcStatusCode(errToAdbcErr(err, e)) } //export FlightSQLDatabaseGetOptionInt func FlightSQLDatabaseGetOptionInt(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseGetOptionInt", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseGetOptionInt") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseGetOptionInt: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionInt(C.GoString(key)) *value = C.int64_t(val) return C.AdbcStatusCode(errToAdbcErr(err, e)) } //export FlightSQLDatabaseInit func FlightSQLDatabaseInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseInit", e) } }() if !checkDBAlloc(db, err, "AdbcDatabaseInit") { return C.ADBC_STATUS_INVALID_STATE } cdb := getFromHandle[cDatabase](db.private_data) if cdb.db != nil { setErr(err, "AdbcDatabaseInit: database already initialized") return C.ADBC_STATUS_INVALID_STATE } adb, aerr := drv.NewDatabase(cdb.opts) if aerr != nil { return C.AdbcStatusCode(errToAdbcErr(err, aerr)) } initLoggingFromEnv(adb) cdb.db = adb return C.ADBC_STATUS_OK } //export FlightSQLDatabaseNew func FlightSQLDatabaseNew(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseNew", e) } }() if globalPoison.Load() { setErr(err, "AdbcDatabaseNew: Go panicked, driver is in unknown state") return C.ADBC_STATUS_INTERNAL } if db.private_data != nil { setErr(err, "AdbcDatabaseNew: database already allocated") return C.ADBC_STATUS_INVALID_STATE } dbobj := &cDatabase{opts: make(map[string]string)} hndl := cgo.NewHandle(dbobj) db.private_data = createHandle(hndl) return C.ADBC_STATUS_OK } //export FlightSQLDatabaseRelease func FlightSQLDatabaseRelease(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseInit", e) } }() if !checkDBAlloc(db, err, "AdbcDatabaseRelease") { return C.ADBC_STATUS_INVALID_STATE } h := (*(*cgo.Handle)(db.private_data)) cdb := h.Value().(*cDatabase) if cdb.db != nil { cdb.db.Close() cdb.db = nil } cdb.opts = nil if db.private_data != nil { C.free(unsafe.Pointer(db.private_data)) db.private_data = nil } h.Delete() // manually trigger GC for two reasons: // 1. ASAN expects the release callback to be called before // the process ends, but GC is not deterministic. So by manually // triggering the GC we ensure the release callback gets called. // 2. Creates deterministic GC behavior by all Release functions // triggering a garbage collection runtime.GC() return C.ADBC_STATUS_OK } //export FlightSQLDatabaseSetOption func FlightSQLDatabaseSetOption(db *C.struct_AdbcDatabase, key, value *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseSetOption", e) } }() if !checkDBAlloc(db, err, "AdbcDatabaseSetOption") { return C.ADBC_STATUS_INVALID_STATE } cdb := getFromHandle[cDatabase](db.private_data) k, v := C.GoString(key), C.GoString(value) if cdb.db != nil { opts, ok := cdb.db.(adbc.PostInitOptions) if !ok { setErr(err, "AdbcDatabaseSetOption: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOption(k, v))) } else { cdb.opts[k] = v } return C.ADBC_STATUS_OK } //export FlightSQLDatabaseSetOptionBytes func FlightSQLDatabaseSetOptionBytes(db *C.struct_AdbcDatabase, key *C.cchar_t, value *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseSetOptionBytes", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseSetOptionBytes") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseSetOptionBytes: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionBytes(C.GoString(key), fromCArr[byte](value, int(length))))) } //export FlightSQLDatabaseSetOptionDouble func FlightSQLDatabaseSetOptionDouble(db *C.struct_AdbcDatabase, key *C.cchar_t, value C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseSetOptionDouble", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseSetOptionDouble") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseSetOptionDouble: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionDouble(C.GoString(key), float64(value)))) } //export FlightSQLDatabaseSetOptionInt func FlightSQLDatabaseSetOptionInt(db *C.struct_AdbcDatabase, key *C.cchar_t, value C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcDatabaseSetOptionInt", e) } }() cdb := checkDBInit(db, err, "AdbcDatabaseSetOptionInt") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := cdb.db.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcDatabaseSetOptionInt: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionInt(C.GoString(key), int64(value)))) } type cConn struct { cancellableContext cnxn adbc.Connection initArgs map[string]string } func checkConnAlloc(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError, fname string) bool { if globalPoison.Load() { setErr(err, "%s: Go panicked, driver is in unknown state", fname) return false } if cnxn == nil { setErr(err, "%s: connection not allocated", fname) return false } if cnxn.private_data == nil { setErr(err, "%s: connection not allocated", fname) return false } return true } func checkConnInit(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError, fname string) *cConn { if !checkConnAlloc(cnxn, err, fname) { return nil } conn := getFromHandle[cConn](cnxn.private_data) if conn.cnxn == nil { setErr(err, "%s: connection not initialized", fname) return nil } return conn } //export FlightSQLConnectionGetOption func FlightSQLConnectionGetOption(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.char, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetOption", e) } }() conn := checkConnInit(db, err, "AdbcConnectionGetOption") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionGetOption: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOption(C.GoString(key)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return exportStringOption(val, value, length) } //export FlightSQLConnectionGetOptionBytes func FlightSQLConnectionGetOptionBytes(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.uint8_t, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetOptionBytes", e) } }() conn := checkConnInit(db, err, "AdbcConnectionGetOptionBytes") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionGetOptionBytes: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionBytes(C.GoString(key)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return exportBytesOption(val, value, length) } //export FlightSQLConnectionGetOptionDouble func FlightSQLConnectionGetOptionDouble(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetOptionDouble", e) } }() conn := checkConnInit(db, err, "AdbcConnectionGetOptionDouble") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionGetOptionDouble: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionDouble(C.GoString(key)) *value = C.double(val) return C.AdbcStatusCode(errToAdbcErr(err, e)) } //export FlightSQLConnectionGetOptionInt func FlightSQLConnectionGetOptionInt(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetOptionInt", e) } }() conn := checkConnInit(db, err, "AdbcConnectionGetOptionInt") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionGetOptionInt: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionInt(C.GoString(key)) *value = C.int64_t(val) return C.AdbcStatusCode(errToAdbcErr(err, e)) } //export FlightSQLConnectionNew func FlightSQLConnectionNew(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionNew", e) } }() if globalPoison.Load() { setErr(err, "AdbcConnectionNew: Go panicked, driver is in unknown state") return C.ADBC_STATUS_INTERNAL } if cnxn.private_data != nil { setErr(err, "AdbcConnectionNew: connection already allocated") return C.ADBC_STATUS_INVALID_STATE } hndl := cgo.NewHandle(&cConn{}) cnxn.private_data = createHandle(hndl) return C.ADBC_STATUS_OK } //export FlightSQLConnectionSetOption func FlightSQLConnectionSetOption(cnxn *C.struct_AdbcConnection, key, val *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionSetOption", e) } }() if !checkConnAlloc(cnxn, err, "AdbcConnectionSetOption") { return C.ADBC_STATUS_INVALID_STATE } conn := getFromHandle[cConn](cnxn.private_data) if conn.cnxn == nil { // not yet initialized k, v := C.GoString(key), C.GoString(val) if conn.initArgs == nil { conn.initArgs = map[string]string{} } conn.initArgs[k] = v return C.ADBC_STATUS_OK } opts, ok := conn.cnxn.(adbc.PostInitOptions) if !ok { setErr(err, "AdbcConnectionSetOption: not supported post-init") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOption(C.GoString(key), C.GoString(val)))) } //export FlightSQLConnectionSetOptionBytes func FlightSQLConnectionSetOptionBytes(db *C.struct_AdbcConnection, key *C.cchar_t, value *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionSetOptionBytes", e) } }() conn := checkConnInit(db, err, "AdbcConnectionSetOptionBytes") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionSetOptionBytes: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionBytes(C.GoString(key), fromCArr[byte](value, int(length))))) } //export FlightSQLConnectionSetOptionDouble func FlightSQLConnectionSetOptionDouble(db *C.struct_AdbcConnection, key *C.cchar_t, value C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionSetOptionDouble", e) } }() conn := checkConnInit(db, err, "AdbcConnectionSetOptionDouble") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionSetOptionDouble: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionDouble(C.GoString(key), float64(value)))) } //export FlightSQLConnectionSetOptionInt func FlightSQLConnectionSetOptionInt(db *C.struct_AdbcConnection, key *C.cchar_t, value C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionSetOptionInt", e) } }() conn := checkConnInit(db, err, "AdbcConnectionSetOptionInt") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := conn.cnxn.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcConnectionSetOptionInt: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionInt(C.GoString(key), int64(value)))) } //export FlightSQLConnectionInit func FlightSQLConnectionInit(cnxn *C.struct_AdbcConnection, db *C.struct_AdbcDatabase, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionInit", e) } }() if !checkConnAlloc(cnxn, err, "AdbcConnectionInit") { return C.ADBC_STATUS_INVALID_STATE } conn := getFromHandle[cConn](cnxn.private_data) if conn.cnxn != nil { setErr(err, "AdbcConnectionInit: connection already initialized") return C.ADBC_STATUS_INVALID_STATE } cdb := checkDBInit(db, err, "AdbcConnectionInit") if cdb == nil { return C.ADBC_STATUS_INVALID_STATE } c, e := cdb.db.Open(context.Background()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } conn.cnxn = c if len(conn.initArgs) > 0 { // C allow SetOption before Init, Go doesn't allow options to Open so set them now opts, ok := conn.cnxn.(adbc.PostInitOptions) if !ok { setErr(err, "AdbcConnectionInit: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } for k, v := range conn.initArgs { rawCode := errToAdbcErr(err, opts.SetOption(k, v)) if rawCode != adbc.StatusOK { return C.AdbcStatusCode(rawCode) } } conn.initArgs = nil } return C.ADBC_STATUS_OK } //export FlightSQLConnectionRelease func FlightSQLConnectionRelease(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionRelease", e) } }() if !checkConnAlloc(cnxn, err, "AdbcConnectionRelease") { return C.ADBC_STATUS_INVALID_STATE } h := (*(*cgo.Handle)(cnxn.private_data)) conn := h.Value().(*cConn) defer func() { conn.cancelContext() conn.cnxn = nil C.free(cnxn.private_data) cnxn.private_data = nil h.Delete() // manually trigger GC for two reasons: // 1. ASAN expects the release callback to be called before // the process ends, but GC is not deterministic. So by manually // triggering the GC we ensure the release callback gets called. // 2. Creates deterministic GC behavior by all Release functions // triggering a garbage collection runtime.GC() }() if conn.cnxn == nil { return C.ADBC_STATUS_OK } return C.AdbcStatusCode(errToAdbcErr(err, conn.cnxn.Close())) } func fromCArr[T, CType any](ptr *CType, sz int) []T { if ptr == nil || sz == 0 { return nil } return unsafe.Slice((*T)(unsafe.Pointer(ptr)), sz) } func toCdataStream(ptr *C.struct_ArrowArrayStream) *cdata.CArrowArrayStream { return (*cdata.CArrowArrayStream)(unsafe.Pointer(ptr)) } func toCdataSchema(ptr *C.struct_ArrowSchema) *cdata.CArrowSchema { return (*cdata.CArrowSchema)(unsafe.Pointer(ptr)) } func toCdataArray(ptr *C.struct_ArrowArray) *cdata.CArrowArray { return (*cdata.CArrowArray)(unsafe.Pointer(ptr)) } //export FlightSQLConnectionCancel func FlightSQLConnectionCancel(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionCancel", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionCancel") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } conn.cancelContext() return C.ADBC_STATUS_OK } func toStrPtr(in *C.cchar_t) *string { if in == nil { return nil } out := C.GoString((*C.char)(in)) return &out } func toStrSlice(in **C.cchar_t) []string { if in == nil { return nil } sz := unsafe.Sizeof(*in) out := make([]string, 0, 1) for *in != nil { out = append(out, C.GoString(*in)) in = (**C.cchar_t)(unsafe.Add(unsafe.Pointer(in), sz)) } return out } //export FlightSQLConnectionGetInfo func FlightSQLConnectionGetInfo(cnxn *C.struct_AdbcConnection, codes *C.cuint32_t, len C.size_t, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetInfo", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionGetInfo") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } infoCodes := fromCArr[adbc.InfoCode](codes, int(len)) rdr, e := conn.cnxn.GetInfo(conn.newContext(), infoCodes) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rdr.Release() exportRecordReader(rdr, out) return C.ADBC_STATUS_OK } //export FlightSQLConnectionGetObjects func FlightSQLConnectionGetObjects(cnxn *C.struct_AdbcConnection, depth C.int, catalog, dbSchema, tableName *C.cchar_t, tableType **C.cchar_t, columnName *C.cchar_t, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetObjects", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionGetObjects") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } rdr, e := conn.cnxn.GetObjects(conn.newContext(), adbc.ObjectDepth(depth), toStrPtr(catalog), toStrPtr(dbSchema), toStrPtr(tableName), toStrPtr(columnName), toStrSlice(tableType)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rdr.Release() exportRecordReader(rdr, out) return C.ADBC_STATUS_OK } //export FlightSQLConnectionGetStatistics func FlightSQLConnectionGetStatistics(cnxn *C.struct_AdbcConnection, catalog, dbSchema, tableName *C.cchar_t, approximate C.char, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetStatistics", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionGetStatistics") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } gs, ok := conn.cnxn.(adbc.ConnectionGetStatistics) if !ok { setErr(err, "AdbcConnectionGetStatistics: not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } rdr, e := gs.GetStatistics(conn.newContext(), toStrPtr(catalog), toStrPtr(dbSchema), toStrPtr(tableName), int(approximate) != 0) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rdr.Release() exportRecordReader(rdr, out) return C.ADBC_STATUS_OK } //export FlightSQLConnectionGetStatisticNames func FlightSQLConnectionGetStatisticNames(cnxn *C.struct_AdbcConnection, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetStatistics", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionGetStatistics") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } gs, ok := conn.cnxn.(adbc.ConnectionGetStatistics) if !ok { setErr(err, "AdbcConnectionGetStatistics: not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } rdr, e := gs.GetStatisticNames(conn.newContext()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rdr.Release() exportRecordReader(rdr, out) return C.ADBC_STATUS_OK } //export FlightSQLConnectionGetTableSchema func FlightSQLConnectionGetTableSchema(cnxn *C.struct_AdbcConnection, catalog, dbSchema, tableName *C.cchar_t, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetTableSchema", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionGetTableSchema") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } sc, e := conn.cnxn.GetTableSchema(conn.newContext(), toStrPtr(catalog), toStrPtr(dbSchema), C.GoString(tableName)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } cdata.ExportArrowSchema(sc, toCdataSchema(schema)) return C.ADBC_STATUS_OK } //export FlightSQLConnectionGetTableTypes func FlightSQLConnectionGetTableTypes(cnxn *C.struct_AdbcConnection, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionGetTableTypes", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionGetTableTypes") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } rdr, e := conn.cnxn.GetTableTypes(conn.newContext()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rdr.Release() exportRecordReader(rdr, out) return C.ADBC_STATUS_OK } //export FlightSQLConnectionReadPartition func FlightSQLConnectionReadPartition(cnxn *C.struct_AdbcConnection, serialized *C.cuint8_t, serializedLen C.size_t, out *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionReadPartition", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionReadPartition") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } rdr, e := conn.cnxn.ReadPartition(conn.newContext(), fromCArr[byte](serialized, int(serializedLen))) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rdr.Release() exportRecordReader(rdr, out) return C.ADBC_STATUS_OK } //export FlightSQLConnectionCommit func FlightSQLConnectionCommit(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionCommit", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionCommit") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } return C.AdbcStatusCode(errToAdbcErr(err, conn.cnxn.Commit(conn.newContext()))) } //export FlightSQLConnectionRollback func FlightSQLConnectionRollback(cnxn *C.struct_AdbcConnection, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcConnectionRollback", e) } }() conn := checkConnInit(cnxn, err, "AdbcConnectionRollback") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } return C.AdbcStatusCode(errToAdbcErr(err, conn.cnxn.Rollback(conn.newContext()))) } type cStmt struct { cancellableContext stmt adbc.Statement } func checkStmtAlloc(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError, fname string) bool { if globalPoison.Load() { setErr(err, "%s: Go panicked, driver is in unknown state", fname) return false } if stmt == nil { setErr(err, "%s: statement not allocated", fname) return false } if stmt.private_data == nil { setErr(err, "%s: statement not allocated", fname) return false } return true } func checkStmtInit(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError, fname string) *cStmt { if !checkStmtAlloc(stmt, err, fname) { return nil } cStmt := getFromHandle[cStmt](stmt.private_data) if cStmt.stmt == nil { setErr(err, "%s: statement not allocated", fname) return nil } return cStmt } //export FlightSQLStatementGetOption func FlightSQLStatementGetOption(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.char, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementGetOption", e) } }() st := checkStmtInit(db, err, "AdbcStatementGetOption") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementGetOption: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOption(C.GoString(key)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return exportStringOption(val, value, length) } //export FlightSQLStatementGetOptionBytes func FlightSQLStatementGetOptionBytes(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.uint8_t, length *C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementGetOptionBytes", e) } }() st := checkStmtInit(db, err, "AdbcStatementGetOptionBytes") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementGetOptionBytes: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionBytes(C.GoString(key)) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return exportBytesOption(val, value, length) } //export FlightSQLStatementGetOptionDouble func FlightSQLStatementGetOptionDouble(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementGetOptionDouble", e) } }() st := checkStmtInit(db, err, "AdbcStatementGetOptionDouble") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementGetOptionDouble: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionDouble(C.GoString(key)) *value = C.double(val) return C.AdbcStatusCode(errToAdbcErr(err, e)) } //export FlightSQLStatementGetOptionInt func FlightSQLStatementGetOptionInt(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementGetOptionInt", e) } }() st := checkStmtInit(db, err, "AdbcStatementGetOptionInt") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementGetOptionInt: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } val, e := opts.GetOptionInt(C.GoString(key)) *value = C.int64_t(val) return C.AdbcStatusCode(errToAdbcErr(err, e)) } //export FlightSQLStatementNew func FlightSQLStatementNew(cnxn *C.struct_AdbcConnection, stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementNew", e) } }() if globalPoison.Load() { setErr(err, "AdbcStatementNew: Go panicked, driver is in unknown state") return C.ADBC_STATUS_INTERNAL } if stmt.private_data != nil { setErr(err, "AdbcStatementNew: statement already allocated") return C.ADBC_STATUS_INVALID_STATE } conn := checkConnInit(cnxn, err, "AdbcStatementNew") if conn == nil { return C.ADBC_STATUS_INVALID_STATE } st, e := conn.cnxn.NewStatement() if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } hndl := cgo.NewHandle(&cStmt{stmt: st}) stmt.private_data = createHandle(hndl) return C.ADBC_STATUS_OK } //export FlightSQLStatementRelease func FlightSQLStatementRelease(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementRelease", e) } }() if globalPoison.Load() { setErr(err, "AdbcStatementRelease: Go panicked, driver is in unknown state") return C.ADBC_STATUS_INTERNAL } if !checkStmtAlloc(stmt, err, "AdbcStatementRelease") { return C.ADBC_STATUS_INVALID_STATE } h := (*(*cgo.Handle)(stmt.private_data)) st := h.Value().(*cStmt) defer func() { st.cancelContext() st.stmt = nil C.free(stmt.private_data) stmt.private_data = nil h.Delete() // manually trigger GC for two reasons: // 1. ASAN expects the release callback to be called before // the process ends, but GC is not deterministic. So by manually // triggering the GC we ensure the release callback gets called. // 2. Creates deterministic GC behavior by all Release functions // triggering a garbage collection runtime.GC() }() if st.stmt == nil { return C.ADBC_STATUS_OK } return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.Close())) } //export FlightSQLStatementCancel func FlightSQLStatementCancel(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementCancel", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementCancel") if st == nil { return C.ADBC_STATUS_INVALID_STATE } st.cancelContext() return C.ADBC_STATUS_OK } //export FlightSQLStatementPrepare func FlightSQLStatementPrepare(stmt *C.struct_AdbcStatement, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementPrepare", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementPrepare") if st == nil { return C.ADBC_STATUS_INVALID_STATE } return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.Prepare(st.newContext()))) } //export FlightSQLStatementExecuteQuery func FlightSQLStatementExecuteQuery(stmt *C.struct_AdbcStatement, out *C.struct_ArrowArrayStream, affected *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementExecuteQuery", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementExecuteQuery") if st == nil { return C.ADBC_STATUS_INVALID_STATE } if out == nil { n, e := st.stmt.ExecuteUpdate(st.newContext()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } if affected != nil { *affected = C.int64_t(n) } } else { rdr, n, e := st.stmt.ExecuteQuery(st.newContext()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } if affected != nil { *affected = C.int64_t(n) } defer rdr.Release() exportRecordReader(rdr, out) } return C.ADBC_STATUS_OK } //export FlightSQLStatementExecuteSchema func FlightSQLStatementExecuteSchema(stmt *C.struct_AdbcStatement, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementExecuteQuery", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementExecuteQuery") if st == nil { return C.ADBC_STATUS_INVALID_STATE } es, ok := st.stmt.(adbc.StatementExecuteSchema) if !ok { setErr(err, "AdbcStatementExecuteSchema: not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } sc, e := es.ExecuteSchema(st.newContext()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } cdata.ExportArrowSchema(sc, toCdataSchema(schema)) return C.ADBC_STATUS_OK } //export FlightSQLStatementSetSqlQuery func FlightSQLStatementSetSqlQuery(stmt *C.struct_AdbcStatement, query *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementSetSqlQuery", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementSetSqlQuery") if st == nil { return C.ADBC_STATUS_INVALID_STATE } return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.SetSqlQuery(C.GoString(query)))) } //export FlightSQLStatementSetSubstraitPlan func FlightSQLStatementSetSubstraitPlan(stmt *C.struct_AdbcStatement, plan *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementSetSubstraitPlan", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementSetSubstraitPlan") if st == nil { return C.ADBC_STATUS_INVALID_STATE } return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.SetSubstraitPlan(fromCArr[byte](plan, int(length))))) } //export FlightSQLStatementBind func FlightSQLStatementBind(stmt *C.struct_AdbcStatement, values *C.struct_ArrowArray, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementBind", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementBind") if st == nil { return C.ADBC_STATUS_INVALID_STATE } rec, e := cdata.ImportCRecordBatch(toCdataArray(values), toCdataSchema(schema)) if e != nil { // if there was an error, we need to manually release the input cdata.ReleaseCArrowArray(toCdataArray(values)) return C.AdbcStatusCode(errToAdbcErr(err, e)) } defer rec.Release() return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.Bind(st.newContext(), rec))) } //export FlightSQLStatementBindStream func FlightSQLStatementBindStream(stmt *C.struct_AdbcStatement, stream *C.struct_ArrowArrayStream, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementBindStream", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementBindStream") if st == nil { return C.ADBC_STATUS_INVALID_STATE } rdr, e := cdata.ImportCRecordReader(toCdataStream(stream), nil) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.BindStream(st.newContext(), rdr.(array.RecordReader)))) } //export FlightSQLStatementGetParameterSchema func FlightSQLStatementGetParameterSchema(stmt *C.struct_AdbcStatement, schema *C.struct_ArrowSchema, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementGetParameterSchema", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementGetParameterSchema") if st == nil { return C.ADBC_STATUS_INVALID_STATE } sc, e := st.stmt.GetParameterSchema() if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } cdata.ExportArrowSchema(sc, toCdataSchema(schema)) return C.ADBC_STATUS_OK } //export FlightSQLStatementSetOption func FlightSQLStatementSetOption(stmt *C.struct_AdbcStatement, key, value *C.cchar_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementSetOption", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementSetOption") if st == nil { return C.ADBC_STATUS_INVALID_STATE } return C.AdbcStatusCode(errToAdbcErr(err, st.stmt.SetOption(C.GoString(key), C.GoString(value)))) } //export FlightSQLStatementSetOptionBytes func FlightSQLStatementSetOptionBytes(db *C.struct_AdbcStatement, key *C.cchar_t, value *C.cuint8_t, length C.size_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementSetOptionBytes", e) } }() st := checkStmtInit(db, err, "AdbcStatementSetOptionBytes") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementSetOptionBytes: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionBytes(C.GoString(key), fromCArr[byte](value, int(length))))) } //export FlightSQLStatementSetOptionDouble func FlightSQLStatementSetOptionDouble(db *C.struct_AdbcStatement, key *C.cchar_t, value C.double, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementSetOptionDouble", e) } }() st := checkStmtInit(db, err, "AdbcStatementSetOptionDouble") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementSetOptionDouble: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionDouble(C.GoString(key), float64(value)))) } //export FlightSQLStatementSetOptionInt func FlightSQLStatementSetOptionInt(db *C.struct_AdbcStatement, key *C.cchar_t, value C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementSetOptionInt", e) } }() st := checkStmtInit(db, err, "AdbcStatementSetOptionInt") if st == nil { return C.ADBC_STATUS_INVALID_STATE } opts, ok := st.stmt.(adbc.GetSetOptions) if !ok { setErr(err, "AdbcStatementSetOptionInt: options are not supported") return C.ADBC_STATUS_NOT_IMPLEMENTED } return C.AdbcStatusCode(errToAdbcErr(err, opts.SetOptionInt(C.GoString(key), int64(value)))) } //export releasePartitions func releasePartitions(partitions *C.struct_AdbcPartitions) { if partitions.private_data == nil { return } C.free(unsafe.Pointer(partitions.partitions)) C.free(unsafe.Pointer(partitions.partition_lengths)) C.free(partitions.private_data) partitions.partitions = nil partitions.partition_lengths = nil partitions.private_data = nil } //export FlightSQLStatementExecutePartitions func FlightSQLStatementExecutePartitions(stmt *C.struct_AdbcStatement, schema *C.struct_ArrowSchema, partitions *C.struct_AdbcPartitions, affected *C.int64_t, err *C.struct_AdbcError) (code C.AdbcStatusCode) { defer func() { if e := recover(); e != nil { code = poison(err, "AdbcStatementExecutePartitions", e) } }() st := checkStmtInit(stmt, err, "AdbcStatementExecutePartitions") if st == nil { return C.ADBC_STATUS_INVALID_STATE } sc, part, n, e := st.stmt.ExecutePartitions(st.newContext()) if e != nil { return C.AdbcStatusCode(errToAdbcErr(err, e)) } if partitions == nil { setErr(err, "AdbcStatementExecutePartitions: partitions output struct is null") return C.ADBC_STATUS_INVALID_ARGUMENT } if affected != nil { *affected = C.int64_t(n) } if sc != nil && schema != nil { cdata.ExportArrowSchema(sc, toCdataSchema(schema)) } partitions.num_partitions = C.size_t(part.NumPartitions) partitions.partitions = (**C.cuint8_t)(C.malloc(C.size_t(unsafe.Sizeof((*C.uint8_t)(nil)) * uintptr(part.NumPartitions)))) partitions.partition_lengths = (*C.size_t)(C.malloc(C.size_t(unsafe.Sizeof(C.size_t(0)) * uintptr(part.NumPartitions)))) // Copy into C-allocated memory to avoid violating CGO rules totalLen := 0 for _, p := range part.PartitionIDs { totalLen += len(p) } partitions.private_data = C.calloc(C.size_t(totalLen), C.size_t(1)) dst := fromCArr[byte]((*byte)(partitions.private_data), totalLen) partIDs := fromCArr[*C.cuint8_t](partitions.partitions, int(partitions.num_partitions)) partLens := fromCArr[C.size_t](partitions.partition_lengths, int(partitions.num_partitions)) for i, p := range part.PartitionIDs { partIDs[i] = (*C.cuint8_t)(&dst[0]) copy(dst, p) dst = dst[len(p):] partLens[i] = C.size_t(len(p)) } partitions.release = (*[0]byte)(C.releasePartitions) return C.ADBC_STATUS_OK } //export FlightSQLDriverInit func FlightSQLDriverInit(version C.int, rawDriver *C.void, err *C.struct_AdbcError) C.AdbcStatusCode { driver := (*C.struct_AdbcDriver)(unsafe.Pointer(rawDriver)) switch version { case C.ADBC_VERSION_1_0_0: sink := fromCArr[byte]((*byte)(unsafe.Pointer(driver)), C.ADBC_DRIVER_1_0_0_SIZE) memory.Set(sink, 0) case C.ADBC_VERSION_1_1_0: sink := fromCArr[byte]((*byte)(unsafe.Pointer(driver)), C.ADBC_DRIVER_1_1_0_SIZE) memory.Set(sink, 0) default: setErr(err, "Only version 1.0.0/1.1.0 supported, got %d", int(version)) return C.ADBC_STATUS_NOT_IMPLEMENTED } driver.DatabaseInit = (*[0]byte)(C.FlightSQLDatabaseInit) driver.DatabaseNew = (*[0]byte)(C.FlightSQLDatabaseNew) driver.DatabaseRelease = (*[0]byte)(C.FlightSQLDatabaseRelease) driver.DatabaseSetOption = (*[0]byte)(C.FlightSQLDatabaseSetOption) driver.ConnectionNew = (*[0]byte)(C.FlightSQLConnectionNew) driver.ConnectionInit = (*[0]byte)(C.FlightSQLConnectionInit) driver.ConnectionRelease = (*[0]byte)(C.FlightSQLConnectionRelease) driver.ConnectionSetOption = (*[0]byte)(C.FlightSQLConnectionSetOption) driver.ConnectionGetInfo = (*[0]byte)(C.FlightSQLConnectionGetInfo) driver.ConnectionGetObjects = (*[0]byte)(C.FlightSQLConnectionGetObjects) driver.ConnectionGetTableSchema = (*[0]byte)(C.FlightSQLConnectionGetTableSchema) driver.ConnectionGetTableTypes = (*[0]byte)(C.FlightSQLConnectionGetTableTypes) driver.ConnectionReadPartition = (*[0]byte)(C.FlightSQLConnectionReadPartition) driver.ConnectionCommit = (*[0]byte)(C.FlightSQLConnectionCommit) driver.ConnectionRollback = (*[0]byte)(C.FlightSQLConnectionRollback) driver.StatementNew = (*[0]byte)(C.FlightSQLStatementNew) driver.StatementRelease = (*[0]byte)(C.FlightSQLStatementRelease) driver.StatementSetOption = (*[0]byte)(C.FlightSQLStatementSetOption) driver.StatementSetSqlQuery = (*[0]byte)(C.FlightSQLStatementSetSqlQuery) driver.StatementSetSubstraitPlan = (*[0]byte)(C.FlightSQLStatementSetSubstraitPlan) driver.StatementBind = (*[0]byte)(C.FlightSQLStatementBind) driver.StatementBindStream = (*[0]byte)(C.FlightSQLStatementBindStream) driver.StatementExecuteQuery = (*[0]byte)(C.FlightSQLStatementExecuteQuery) driver.StatementExecutePartitions = (*[0]byte)(C.FlightSQLStatementExecutePartitions) driver.StatementGetParameterSchema = (*[0]byte)(C.FlightSQLStatementGetParameterSchema) driver.StatementPrepare = (*[0]byte)(C.FlightSQLStatementPrepare) if version == C.ADBC_VERSION_1_1_0 { driver.ErrorGetDetailCount = (*[0]byte)(C.FlightSQLErrorGetDetailCount) driver.ErrorGetDetail = (*[0]byte)(C.FlightSQLErrorGetDetail) driver.ErrorFromArrayStream = (*[0]byte)(C.FlightSQLErrorFromArrayStream) driver.DatabaseGetOption = (*[0]byte)(C.FlightSQLDatabaseGetOption) driver.DatabaseGetOptionBytes = (*[0]byte)(C.FlightSQLDatabaseGetOptionBytes) driver.DatabaseGetOptionDouble = (*[0]byte)(C.FlightSQLDatabaseGetOptionDouble) driver.DatabaseGetOptionInt = (*[0]byte)(C.FlightSQLDatabaseGetOptionInt) driver.DatabaseSetOptionBytes = (*[0]byte)(C.FlightSQLDatabaseSetOptionBytes) driver.DatabaseSetOptionDouble = (*[0]byte)(C.FlightSQLDatabaseSetOptionDouble) driver.DatabaseSetOptionInt = (*[0]byte)(C.FlightSQLDatabaseSetOptionInt) driver.ConnectionCancel = (*[0]byte)(C.FlightSQLConnectionCancel) driver.ConnectionGetOption = (*[0]byte)(C.FlightSQLConnectionGetOption) driver.ConnectionGetOptionBytes = (*[0]byte)(C.FlightSQLConnectionGetOptionBytes) driver.ConnectionGetOptionDouble = (*[0]byte)(C.FlightSQLConnectionGetOptionDouble) driver.ConnectionGetOptionInt = (*[0]byte)(C.FlightSQLConnectionGetOptionInt) driver.ConnectionGetStatistics = (*[0]byte)(C.FlightSQLConnectionGetStatistics) driver.ConnectionGetStatisticNames = (*[0]byte)(C.FlightSQLConnectionGetStatisticNames) driver.ConnectionSetOptionBytes = (*[0]byte)(C.FlightSQLConnectionSetOptionBytes) driver.ConnectionSetOptionDouble = (*[0]byte)(C.FlightSQLConnectionSetOptionDouble) driver.ConnectionSetOptionInt = (*[0]byte)(C.FlightSQLConnectionSetOptionInt) driver.StatementCancel = (*[0]byte)(C.FlightSQLStatementCancel) driver.StatementExecuteSchema = (*[0]byte)(C.FlightSQLStatementExecuteSchema) driver.StatementGetOption = (*[0]byte)(C.FlightSQLStatementGetOption) driver.StatementGetOptionBytes = (*[0]byte)(C.FlightSQLStatementGetOptionBytes) driver.StatementGetOptionDouble = (*[0]byte)(C.FlightSQLStatementGetOptionDouble) driver.StatementGetOptionInt = (*[0]byte)(C.FlightSQLStatementGetOptionInt) driver.StatementSetOptionBytes = (*[0]byte)(C.FlightSQLStatementSetOptionBytes) driver.StatementSetOptionDouble = (*[0]byte)(C.FlightSQLStatementSetOptionDouble) driver.StatementSetOptionInt = (*[0]byte)(C.FlightSQLStatementSetOptionInt) } return C.ADBC_STATUS_OK } func main() {}