go/adbc/driver/flightsql/flightsql_driver.go (97 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package flightsql is an ADBC Driver Implementation for Flight SQL // natively in go. // // It can be used to register a driver for database/sql by importing // github.com/apache/arrow-adbc/go/adbc/sqldriver and running: // // sql.Register("flightsql", sqldriver.Driver{flightsql.Driver{}}) // // You can then open a flightsql connection with the database/sql // standard package by using: // // db, err := sql.Open("flightsql", "uri=<flight sql db url>") // // The URI passed *must* contain a scheme, most likely "grpc+tcp://" package flightsql import ( "net/url" "time" "github.com/apache/arrow-adbc/go/adbc" "github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase" "github.com/apache/arrow-go/v18/arrow/memory" "golang.org/x/exp/maps" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) const ( OptionAuthority = "adbc.flight.sql.client_option.authority" OptionMTLSCertChain = "adbc.flight.sql.client_option.mtls_cert_chain" OptionMTLSPrivateKey = "adbc.flight.sql.client_option.mtls_private_key" OptionSSLOverrideHostname = "adbc.flight.sql.client_option.tls_override_hostname" OptionSSLSkipVerify = "adbc.flight.sql.client_option.tls_skip_verify" OptionSSLRootCerts = "adbc.flight.sql.client_option.tls_root_certs" OptionWithBlock = "adbc.flight.sql.client_option.with_block" OptionWithMaxMsgSize = "adbc.flight.sql.client_option.with_max_msg_size" OptionAuthorizationHeader = "adbc.flight.sql.authorization_header" OptionTimeoutConnect = "adbc.flight.sql.rpc.timeout_seconds.connect" OptionTimeoutFetch = "adbc.flight.sql.rpc.timeout_seconds.fetch" OptionTimeoutQuery = "adbc.flight.sql.rpc.timeout_seconds.query" OptionTimeoutUpdate = "adbc.flight.sql.rpc.timeout_seconds.update" OptionRPCCallHeaderPrefix = "adbc.flight.sql.rpc.call_header." OptionCookieMiddleware = "adbc.flight.sql.rpc.with_cookie_middleware" OptionSessionOptions = "adbc.flight.sql.session.options" OptionSessionOptionPrefix = "adbc.flight.sql.session.option." OptionEraseSessionOptionPrefix = "adbc.flight.sql.session.optionerase." OptionBoolSessionOptionPrefix = "adbc.flight.sql.session.optionbool." OptionStringListSessionOptionPrefix = "adbc.flight.sql.session.optionstringlist." OptionLastFlightInfo = "adbc.flight.sql.statement.exec.last_flight_info" infoDriverName = "ADBC Flight SQL Driver - Go" // Oauth2 options OptionKeyOauthFlow = "adbc.flight.sql.oauth.flow" OptionKeyAuthURI = "adbc.flight.sql.oauth.auth_uri" OptionKeyTokenURI = "adbc.flight.sql.oauth.token_uri" OptionKeyRedirectURI = "adbc.flight.sql.oauth.redirect_uri" OptionKeyScope = "adbc.flight.sql.oauth.scope" OptionKeyClientId = "adbc.flight.sql.oauth.client_id" OptionKeyClientSecret = "adbc.flight.sql.oauth.client_secret" OptionKeySubjectToken = "adbc.flight.sql.oauth.exchange.subject_token" OptionKeySubjectTokenType = "adbc.flight.sql.oauth.exchange.subject_token_type" OptionKeyActorToken = "adbc.flight.sql.oauth.exchange.actor_token" OptionKeyActorTokenType = "adbc.flight.sql.oauth.exchange.actor_token_type" OptionKeyReqTokenType = "adbc.flight.sql.oauth.exchange.requested_token_type" OptionKeyExchangeScope = "adbc.flight.sql.oauth.exchange.scope" OptionKeyExchangeAud = "adbc.flight.sql.oauth.exchange.aud" OptionKeyExchangeResource = "adbc.flight.sql.oauth.exchange.resource" ) var errNoTransactionSupport = adbc.Error{ Msg: "[Flight SQL] server does not report transaction support", Code: adbc.StatusNotImplemented, } type driverImpl struct { driverbase.DriverImplBase } // Driver is the extended [adbc.Driver] interface for Flight SQL. // // It adds an additional method to create a database with grpc specific options that cannot be // passed through the options map. type Driver interface { adbc.Driver NewDatabaseWithOptions(map[string]string, ...grpc.DialOption) (adbc.Database, error) } // NewDriver creates a new Flight SQL driver using the given Arrow allocator. func NewDriver(alloc memory.Allocator) Driver { info := driverbase.DefaultDriverInfo("Flight SQL") return &driverImpl{DriverImplBase: driverbase.NewDriverImplBase(info, alloc)} } // NewDatabase creates a new Flight SQL database using the given options. // // Additional grpc client options can can be passed as grpc.DialOption. // This enables the use of additional grpc client options not directly exposed by the options map. // such as grpc.WithStatsHandler() for enabling various telemetry handlers. func (d *driverImpl) NewDatabaseWithOptions(opts map[string]string, userDialOpts ...grpc.DialOption) (adbc.Database, error) { opts = maps.Clone(opts) uri, ok := opts[adbc.OptionKeyURI] if !ok { return nil, adbc.Error{ Msg: "URI required for a FlightSQL DB", Code: adbc.StatusInvalidArgument, } } delete(opts, adbc.OptionKeyURI) db := &databaseImpl{ DatabaseImplBase: driverbase.NewDatabaseImplBase(&d.DriverImplBase), timeout: timeoutOption{ // Match gRPC default connectTimeout: time.Second * 20, }, hdrs: make(metadata.MD), userDialOpts: userDialOpts, } var err error if db.uri, err = url.Parse(uri); err != nil { return nil, adbc.Error{Msg: err.Error(), Code: adbc.StatusInvalidArgument} } // Use WithMaxMsgSize(16 MiB) since Flight services tend to send large messages db.dialOpts.maxMsgSize = 16 * 1024 * 1024 db.options = make(map[string]string) if err := db.SetOptions(opts); err != nil { return nil, err } return driverbase.NewDatabase(db), nil } // NewDatabase creates a new Flight SQL database using the given options. func (d *driverImpl) NewDatabase(opts map[string]string) (adbc.Database, error) { return d.NewDatabaseWithOptions(opts) }