in go/adbc/driver/flightsql/flightsql_database.go [74:274]
// SetOptions applies the given database options to d.
//
// Every entry is first recorded in d.options. Recognized keys (authority,
// mTLS/TLS settings, authorization header, username/password, OAuth flow,
// timeouts, max message size, cookie middleware) are then processed and
// removed from cnOptions once handled. Any key still left afterwards must
// start with OptionRPCCallHeaderPrefix and is appended to d.hdrs as a call
// header; any other leftover key yields a StatusInvalidArgument error.
//
// cnOptions is modified in place. Settings applied before a failing option
// are not rolled back.
func (d *databaseImpl) SetOptions(cnOptions map[string]string) error {
	// A fresh TLS config is assembled on every call and installed as d.creds.
	var tlsConfig tls.Config

	for k, v := range cnOptions {
		d.options[k] = v
	}

	if authority, ok := cnOptions[OptionAuthority]; ok {
		d.dialOpts.authority = authority
		delete(cnOptions, OptionAuthority)
	}

	// mTLS needs both the certificate chain and the private key; supplying
	// only one of the two is an error.
	mtlsCert := cnOptions[OptionMTLSCertChain]
	mtlsKey := cnOptions[OptionMTLSPrivateKey]
	switch {
	case mtlsCert != "" && mtlsKey != "":
		cert, err := tls.X509KeyPair([]byte(mtlsCert), []byte(mtlsKey))
		if err != nil {
			return adbc.Error{
				// %v prints the error text; %#v would print the Go-syntax
				// representation of the error value instead.
				Msg:  fmt.Sprintf("Invalid mTLS certificate: %v", err),
				Code: adbc.StatusInvalidArgument,
			}
		}
		tlsConfig.Certificates = []tls.Certificate{cert}
		delete(cnOptions, OptionMTLSCertChain)
		delete(cnOptions, OptionMTLSPrivateKey)
	case mtlsCert != "":
		return adbc.Error{
			Msg:  fmt.Sprintf("Must provide both '%s' and '%s', only provided '%s'", OptionMTLSCertChain, OptionMTLSPrivateKey, OptionMTLSCertChain),
			Code: adbc.StatusInvalidArgument,
		}
	case mtlsKey != "":
		return adbc.Error{
			Msg:  fmt.Sprintf("Must provide both '%s' and '%s', only provided '%s'", OptionMTLSCertChain, OptionMTLSPrivateKey, OptionMTLSPrivateKey),
			Code: adbc.StatusInvalidArgument,
		}
	}

	if hostname, ok := cnOptions[OptionSSLOverrideHostname]; ok {
		tlsConfig.ServerName = hostname
		delete(cnOptions, OptionSSLOverrideHostname)
	}

	if val, ok := cnOptions[OptionSSLSkipVerify]; ok {
		switch val {
		case adbc.OptionValueEnabled:
			tlsConfig.InsecureSkipVerify = true
		case adbc.OptionValueDisabled:
			tlsConfig.InsecureSkipVerify = false
		default:
			return adbc.Error{
				Msg:  fmt.Sprintf("Invalid value for database option '%s': '%s'", OptionSSLSkipVerify, val),
				Code: adbc.StatusInvalidArgument,
			}
		}
		delete(cnOptions, OptionSSLSkipVerify)
	}

	if cert, ok := cnOptions[OptionSSLRootCerts]; ok {
		cp := x509.NewCertPool()
		if !cp.AppendCertsFromPEM([]byte(cert)) {
			return adbc.Error{
				Msg:  fmt.Sprintf("Invalid value for database option '%s': failed to append certificates", OptionSSLRootCerts),
				Code: adbc.StatusInvalidArgument,
			}
		}
		tlsConfig.RootCAs = cp
		delete(cnOptions, OptionSSLRootCerts)
	}

	d.creds = credentials.NewTLS(&tlsConfig)

	if auth, ok := cnOptions[OptionAuthorizationHeader]; ok {
		d.hdrs.Set("authorization", auth)
		delete(cnOptions, OptionAuthorizationHeader)
	}

	// An explicit authorization header cannot be combined with
	// username/password or an OAuth flow. Only the "authorization" entry is
	// inspected, so unrelated call headers already stored in d.hdrs do not
	// cause a false conflict.
	const authConflictError = "Authentication conflict: Use either Authorization header OR username/password parameter"
	checkAuthConflict := func() error {
		if len(d.hdrs.Get("authorization")) == 0 {
			return nil
		}
		return adbc.Error{
			Msg:  authConflictError,
			Code: adbc.StatusInvalidArgument,
		}
	}

	if u, ok := cnOptions[adbc.OptionKeyUsername]; ok {
		if err := checkAuthConflict(); err != nil {
			return err
		}
		d.user = u
		delete(cnOptions, adbc.OptionKeyUsername)
	}

	if p, ok := cnOptions[adbc.OptionKeyPassword]; ok {
		if err := checkAuthConflict(); err != nil {
			return err
		}
		d.pass = p
		delete(cnOptions, adbc.OptionKeyPassword)
	}

	if flow, ok := cnOptions[OptionKeyOauthFlow]; ok {
		if err := checkAuthConflict(); err != nil {
			return err
		}

		// The flow constructors receive the remaining options map.
		var err error
		switch flow {
		case ClientCredentials:
			d.oauthToken, err = newClientCredentials(cnOptions)
		case TokenExchange:
			d.oauthToken, err = newTokenExchangeFlow(cnOptions)
		default:
			return adbc.Error{
				Msg:  fmt.Sprintf("oauth flow not implemented: %s", flow),
				Code: adbc.StatusNotImplemented,
			}
		}
		if err != nil {
			return err
		}
		delete(cnOptions, OptionKeyOauthFlow)
	}

	// All timeout options are parsed the same way; handle them in a fixed
	// order (fetch, query, update, connect).
	for _, key := range []string{
		OptionTimeoutFetch,
		OptionTimeoutQuery,
		OptionTimeoutUpdate,
		OptionTimeoutConnect,
	} {
		tv, ok := cnOptions[key]
		if !ok {
			continue
		}
		var err error
		if err = d.timeout.setTimeoutString(key, tv); err != nil {
			return err
		}
		delete(cnOptions, key)
	}

	// gRPC deprecated this and explicitly recommends against it
	delete(cnOptions, OptionWithBlock)

	if val, ok := cnOptions[OptionWithMaxMsgSize]; ok {
		// Non-numeric and non-positive values are reported identically.
		size, convErr := strconv.Atoi(val)
		if convErr != nil || size <= 0 {
			return adbc.Error{
				Msg:  fmt.Sprintf("Invalid value for database option '%s': '%s' is not a positive integer", OptionWithMaxMsgSize, val),
				Code: adbc.StatusInvalidArgument,
			}
		}
		d.dialOpts.maxMsgSize = size
		delete(cnOptions, OptionWithMaxMsgSize)
	}
	d.dialOpts.rebuild()

	if val, ok := cnOptions[OptionCookieMiddleware]; ok {
		switch val {
		case adbc.OptionValueEnabled:
			d.enableCookies = true
		case adbc.OptionValueDisabled:
			d.enableCookies = false
		default:
			return d.ErrorHelper.Errorf(adbc.StatusInvalidArgument, "Invalid value for database option '%s': '%s'", OptionCookieMiddleware, val)
		}
		delete(cnOptions, OptionCookieMiddleware)
	}

	// Whatever is left must be a per-call header; anything else is unknown.
	// Map iteration order is unspecified, so some headers may already have
	// been appended when an unknown key is reported.
	for key, val := range cnOptions {
		if strings.HasPrefix(key, OptionRPCCallHeaderPrefix) {
			d.hdrs.Append(strings.TrimPrefix(key, OptionRPCCallHeaderPrefix), val)
			continue
		}
		return d.ErrorHelper.Errorf(adbc.StatusInvalidArgument, "[Flight SQL] Unknown database option '%s'", key)
	}

	return nil
}