go/adbc/driver/bigquery/connection.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package bigquery
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"net/url"
"regexp"
"strconv"
"time"
"cloud.google.com/go/bigquery"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/driver/internal"
"github.com/apache/arrow-adbc/go/adbc/driver/internal/driverbase"
"github.com/apache/arrow-go/v18/arrow"
"golang.org/x/oauth2"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
type connectionImpl struct {
driverbase.ConnectionImplBase
authType string
credentials string
clientID string
clientSecret string
refreshToken string
// catalog is the same as the project id in BigQuery
catalog string
// dbSchema is the same as the dataset id in BigQuery
dbSchema string
// tableID is the default table for statement
tableID string
resultRecordBufferSize int
prefetchConcurrency int
client *bigquery.Client
}
func (c *connectionImpl) GetCatalogs(ctx context.Context, catalogFilter *string) ([]string, error) {
catalogPattern, err := internal.PatternToRegexp(catalogFilter)
if err != nil {
return nil, err
}
if catalogPattern == nil {
catalogPattern = internal.AcceptAll
}
// Connections to BQ are scoped to a particular Project, which corresponds to catalog-level namespacing.
// TODO: Consider enumerating projects with ResourceManager API, but this may not be "idiomatic" usage.
project := c.client.Project()
res := make([]string, 0)
if catalogPattern.MatchString(project) {
res = append(res, project)
}
return res, nil
}
func (c *connectionImpl) GetDBSchemasForCatalog(ctx context.Context, catalog string, schemaFilter *string) ([]string, error) {
schemaPattern, err := internal.PatternToRegexp(schemaFilter)
if err != nil {
return nil, err
}
if schemaPattern == nil {
schemaPattern = internal.AcceptAll
}
it := c.client.Datasets(ctx)
it.ProjectID = catalog
res := make([]string, 0)
for {
ds, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
if schemaPattern.MatchString(ds.DatasetID) {
res = append(res, ds.DatasetID)
}
}
return res, nil
}
func (c *connectionImpl) GetTablesForDBSchema(ctx context.Context, catalog string, schema string, tableFilter *string, columnFilter *string, includeColumns bool) ([]driverbase.TableInfo, error) {
tablePattern, err := internal.PatternToRegexp(tableFilter)
if err != nil {
return nil, err
}
if tablePattern == nil {
tablePattern = internal.AcceptAll
}
it := c.client.DatasetInProject(catalog, schema).Tables(ctx)
res := make([]driverbase.TableInfo, 0)
for {
table, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
if !tablePattern.MatchString(table.TableID) {
continue
}
md, err := table.Metadata(ctx, bigquery.WithMetadataView(bigquery.BasicMetadataView))
if err != nil {
return nil, err
}
var constraints []driverbase.ConstraintInfo
if md.TableConstraints != nil {
constraints = make([]driverbase.ConstraintInfo, 0)
if md.TableConstraints.PrimaryKey != nil {
constraints = append(constraints, driverbase.ConstraintInfo{
// BigQuery Primary Keys are unnamed
ConstraintType: internal.PrimaryKey,
ConstraintColumnNames: driverbase.RequiredList(md.TableConstraints.PrimaryKey.Columns),
})
}
for _, fk := range md.TableConstraints.ForeignKeys {
var columnUsage []driverbase.ConstraintColumnUsage
if len(fk.ColumnReferences) > 0 {
columnUsage = make([]driverbase.ConstraintColumnUsage, len(fk.ColumnReferences))
}
for i, ref := range fk.ColumnReferences {
columnUsage[i] = driverbase.ConstraintColumnUsage{
ForeignKeyCatalog: driverbase.Nullable(fk.ReferencedTable.ProjectID),
ForeignKeyDbSchema: driverbase.Nullable(fk.ReferencedTable.DatasetID),
ForeignKeyTable: fk.ReferencedTable.TableID,
ForeignKeyColumn: ref.ReferencedColumn,
}
}
constraints = append(constraints, driverbase.ConstraintInfo{
ConstraintName: driverbase.Nullable(fk.Name),
ConstraintType: internal.ForeignKey,
ConstraintColumnUsage: columnUsage,
})
}
}
var columns []driverbase.ColumnInfo
if includeColumns {
columnPattern, err := internal.PatternToRegexp(columnFilter)
if err != nil {
return nil, err
}
if columnPattern == nil {
columnPattern = internal.AcceptAll
}
columns = make([]driverbase.ColumnInfo, 0)
for pos, fieldschema := range md.Schema {
if columnPattern.MatchString(fieldschema.Name) {
xdbcIsNullable := "YES"
xdbcNullable := int16(1)
if fieldschema.Required {
xdbcIsNullable = "NO"
xdbcNullable = 0
}
xdbcColumnSize := fieldschema.MaxLength
if xdbcColumnSize == 0 {
xdbcColumnSize = fieldschema.Precision
}
var xdbcCharOctetLength int32
if fieldschema.Type == bigquery.BytesFieldType {
xdbcCharOctetLength = int32(fieldschema.MaxLength)
}
field, err := buildField(fieldschema, 0)
if err != nil {
return nil, err
}
xdbcDataType := internal.ToXdbcDataType(field.Type)
columns = append(columns, driverbase.ColumnInfo{
ColumnName: fieldschema.Name,
OrdinalPosition: driverbase.Nullable(int32(pos + 1)),
Remarks: driverbase.Nullable(fieldschema.Description),
XdbcDataType: driverbase.Nullable(int16(field.Type.ID())),
XdbcTypeName: driverbase.Nullable(string(fieldschema.Type)),
XdbcNullable: driverbase.Nullable(xdbcNullable),
XdbcSqlDataType: driverbase.Nullable(int16(xdbcDataType)),
XdbcIsNullable: driverbase.Nullable(xdbcIsNullable),
XdbcDecimalDigits: driverbase.Nullable(int16(fieldschema.Scale)),
XdbcColumnSize: driverbase.Nullable(int32(xdbcColumnSize)),
XdbcCharOctetLength: driverbase.Nullable(xdbcCharOctetLength),
XdbcScopeCatalog: driverbase.Nullable(catalog),
XdbcScopeSchema: driverbase.Nullable(schema),
XdbcScopeTable: driverbase.Nullable(table.TableID),
})
}
}
}
res = append(res, driverbase.TableInfo{
TableName: table.TableID,
TableType: string(md.Type),
TableConstraints: constraints,
TableColumns: columns,
})
}
return res, nil
}
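
// bigQueryTokenResponse mirrors the JSON body returned by Google's OAuth2
// token endpoint when exchanging a refresh token for an access token (see
// getAccessToken below).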
type bigQueryTokenResponse struct {
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
Scope string `json:"scope"`
TokenType string `json:"token_type"`
}
// GetCurrentCatalog implements driverbase.CurrentNamespacer.
func (c *connectionImpl) GetCurrentCatalog() (string, error) {
return c.catalog, nil
}
// GetCurrentDbSchema implements driverbase.CurrentNamespacer.
func (c *connectionImpl) GetCurrentDbSchema() (string, error) {
return c.dbSchema, nil
}
// SetCurrentCatalog implements driverbase.CurrentNamespacer.
func (c *connectionImpl) SetCurrentCatalog(value string) error {
c.catalog = value
return nil
}
// SetCurrentDbSchema implements driverbase.CurrentNamespacer.
func (c *connectionImpl) SetCurrentDbSchema(value string) error {
sanitized, err := sanitizeDataset(value)
if err != nil {
return err
}
c.dbSchema = sanitized
return nil
}
// ListTableTypes implements driverbase.TableTypeLister.
func (c *connectionImpl) ListTableTypes(ctx context.Context) ([]string, error) {
return []string{
string(bigquery.RegularTable),
string(bigquery.ViewTable),
string(bigquery.ExternalTable),
string(bigquery.MaterializedView),
string(bigquery.Snapshot),
}, nil
}
// SetAutocommit implements driverbase.AutocommitSetter.
func (c *connectionImpl) SetAutocommit(enabled bool) error {
if enabled {
return nil
}
return adbc.Error{
Code: adbc.StatusNotImplemented,
Msg: "SetAutocommit to `false` is not yet implemented",
}
}
// Commit commits any pending transactions on this connection. It should
// only be used if autocommit is disabled.
//
// Behavior is undefined if this is mixed with SQL transaction statements.
func (c *connectionImpl) Commit(_ context.Context) error {
return adbc.Error{
Code: adbc.StatusNotImplemented,
Msg: "Commit not yet implemented for BigQuery driver",
}
}
// Rollback rolls back any pending transactions. Only used if autocommit
// is disabled.
//
// Behavior is undefined if this is mixed with SQL transaction statements.
func (c *connectionImpl) Rollback(_ context.Context) error {
return adbc.Error{
Code: adbc.StatusNotImplemented,
Msg: "Rollback not yet implemented for BigQuery driver",
}
}
// Close closes this connection and releases any associated resources.
func (c *connectionImpl) Close() error {
return c.client.Close()
}
// Metadata methods
// Generally these methods return an array.RecordReader that
// can be consumed to retrieve metadata about the database as Arrow
// data. The returned metadata has an expected schema given in the
// doc strings of the specific methods. Schema fields are nullable
// unless otherwise marked. While no Statement is used in these
// methods, the result set may count as an active statement to the
// driver for the purposes of concurrency management (e.g. if the
// driver has a limit on concurrent active statements and it must
// execute a SQL query internally in order to implement the metadata
// method).
//
// Some methods accept "search pattern" arguments, which are strings
// that can contain the special character "%" to match zero or more
// characters, or "_" to match exactly one character. (See the
// documentation of DatabaseMetaData in JDBC or "Pattern Value Arguments"
// in the ODBC documentation.) Escaping is not currently supported.
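//
// For example (an illustrative sketch; ptr is a hypothetical helper that
// returns a pointer to its string argument):
//
//	tables, err := c.GetTablesForDBSchema(
//		ctx, "my-project", "my_dataset",
//		ptr("sales_%"), // matches sales_2023, sales_summary, ...
//		nil,            // nil: do not filter columns
//		false,          // skip per-column metadata
//	)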
// GetObjects gets a hierarchical view of all catalogs, database schemas,
// tables, and columns.
//
// The result is an Arrow Dataset with the following schema:
//
// Field Name | Field Type
// ----------------------------|----------------------------
// catalog_name | utf8
// catalog_db_schemas | list<DB_SCHEMA_SCHEMA>
//
// DB_SCHEMA_SCHEMA is a Struct with the fields:
//
// Field Name | Field Type
// ----------------------------|----------------------------
// db_schema_name | utf8
// db_schema_tables | list<TABLE_SCHEMA>
//
// TABLE_SCHEMA is a Struct with the fields:
//
// Field Name | Field Type
// ----------------------------|----------------------------
// table_name | utf8 not null
// table_type | utf8 not null
// table_columns | list<COLUMN_SCHEMA>
// table_constraints | list<CONSTRAINT_SCHEMA>
//
// COLUMN_SCHEMA is a Struct with the fields:
//
// Field Name | Field Type | Comments
// ----------------------------|---------------------|---------
// column_name | utf8 not null |
// ordinal_position | int32 | (1)
// remarks | utf8 | (2)
// xdbc_data_type | int16 | (3)
// xdbc_type_name | utf8 | (3)
// xdbc_column_size | int32 | (3)
// xdbc_decimal_digits | int16 | (3)
// xdbc_num_prec_radix | int16 | (3)
// xdbc_nullable | int16 | (3)
// xdbc_column_def | utf8 | (3)
// xdbc_sql_data_type | int16 | (3)
// xdbc_datetime_sub | int16 | (3)
// xdbc_char_octet_length | int32 | (3)
// xdbc_is_nullable | utf8 | (3)
// xdbc_scope_catalog | utf8 | (3)
// xdbc_scope_schema | utf8 | (3)
// xdbc_scope_table | utf8 | (3)
// xdbc_is_autoincrement | bool | (3)
// xdbc_is_generatedcolumn | utf8 | (3)
//
// 1. The column's ordinal position in the table (starting from 1).
// 2. Database-specific description of the column.
// 3. Optional Value. Should be null if not supported by the driver.
// xdbc_values are meant to provide JDBC/ODBC-compatible metadata
// in an agnostic manner.
//
// CONSTRAINT_SCHEMA is a Struct with the fields:
//
// Field Name | Field Type | Comments
// ----------------------------|---------------------|---------
// constraint_name | utf8 |
// constraint_type | utf8 not null | (1)
// constraint_column_names | list<utf8> not null | (2)
// constraint_column_usage | list<USAGE_SCHEMA> | (3)
//
// 1. One of 'CHECK', 'FOREIGN KEY', 'PRIMARY KEY', or 'UNIQUE'.
// 2. The columns on the current table that are constrained, in order.
// 3. For FOREIGN KEY only, the referenced table and columns.
//
// USAGE_SCHEMA is a Struct with fields:
//
// Field Name | Field Type
// ----------------------------|----------------------------
// fk_catalog | utf8
// fk_db_schema | utf8
// fk_table | utf8 not null
// fk_column_name | utf8 not null
//
// For the parameters: if nil is passed, then that parameter is not used
// for filtering at all. If an empty string is passed, then only objects
// without that property (i.e. catalog or db schema) will be returned.
//
// tableName and columnName must be either nil (do not filter by
// table name or column name) or non-empty.
//
// All non-empty, non-nil strings should be a search pattern (as described
// earlier).
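
// GetTableSchema returns the Arrow schema of the named table, resolving a nil
// catalog or dbSchema against the connection's current project and dataset.
// Unlike the filters described above, tableName is an exact table ID, not a
// search pattern.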
func (c *connectionImpl) GetTableSchema(ctx context.Context, catalog *string, dbSchema *string, tableName string) (*arrow.Schema, error) {
return c.getTableSchemaWithFilter(ctx, catalog, dbSchema, tableName, nil)
}
// NewStatement initializes a new statement object tied to this connection
func (c *connectionImpl) NewStatement() (adbc.Statement, error) {
return &statement{
alloc: c.Alloc,
cnxn: c,
parameterMode: OptionValueQueryParameterModePositional,
resultRecordBufferSize: c.resultRecordBufferSize,
prefetchConcurrency: c.prefetchConcurrency,
queryConfig: bigquery.QueryConfig{
DefaultProjectID: c.catalog,
DefaultDatasetID: c.dbSchema,
},
}, nil
}
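
// An illustrative sketch of typical statement use from application code (the
// cnxn and ctx names are assumptions, not part of this file):
//
//	stmt, err := cnxn.NewStatement()
//	if err != nil {
//		return err
//	}
//	defer stmt.Close()
//	if err := stmt.SetSqlQuery("SELECT 1"); err != nil {
//		return err
//	}
//	reader, _, err := stmt.ExecuteQuery(ctx)
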
func (c *connectionImpl) GetOption(key string) (string, error) {
switch key {
case OptionStringAuthType:
return c.authType, nil
case OptionStringAuthCredentials:
return c.credentials, nil
case OptionStringAuthClientID:
return c.clientID, nil
case OptionStringAuthClientSecret:
return c.clientSecret, nil
case OptionStringAuthRefreshToken:
return c.refreshToken, nil
case OptionStringProjectID:
return c.catalog, nil
case OptionStringDatasetID:
return c.dbSchema, nil
case OptionStringTableID:
return c.tableID, nil
default:
return c.ConnectionImplBase.GetOption(key)
}
}
func (c *connectionImpl) GetOptionInt(key string) (int64, error) {
switch key {
case OptionIntQueryResultBufferSize:
return int64(c.resultRecordBufferSize), nil
case OptionIntQueryPrefetchConcurrency:
return int64(c.prefetchConcurrency), nil
default:
return c.ConnectionImplBase.GetOptionInt(key)
}
}
func (c *connectionImpl) SetOptionInt(key string, value int64) error {
switch key {
case OptionIntQueryResultBufferSize:
c.resultRecordBufferSize = int(value)
return nil
case OptionIntQueryPrefetchConcurrency:
c.prefetchConcurrency = int(value)
return nil
default:
return c.ConnectionImplBase.SetOptionInt(key, value)
}
}
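
// For example (an illustrative sketch; cnxn is an assumed adbc.Connection
// obtained from this driver, and the assertion assumes the wrapped connection
// exposes SetOptionInt):
//
//	if opts, ok := cnxn.(interface{ SetOptionInt(string, int64) error }); ok {
//		_ = opts.SetOptionInt(OptionIntQueryResultBufferSize, 512)
//		_ = opts.SetOptionInt(OptionIntQueryPrefetchConcurrency, 8)
//	}
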
func (c *connectionImpl) newClient(ctx context.Context) error {
if c.catalog == "" {
return adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "ProjectID is empty",
}
}
switch c.authType {
case OptionValueAuthTypeJSONCredentialFile, OptionValueAuthTypeJSONCredentialString, OptionValueAuthTypeUserAuthentication:
var credentials option.ClientOption
switch c.authType {
case OptionValueAuthTypeJSONCredentialFile:
credentials = option.WithCredentialsFile(c.credentials)
case OptionValueAuthTypeJSONCredentialString:
credentials = option.WithCredentialsJSON([]byte(c.credentials))
default:
if c.clientID == "" {
return adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("The `%s` parameter is empty", OptionStringAuthClientID),
}
}
if c.clientSecret == "" {
return adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("The `%s` parameter is empty", OptionStringAuthClientSecret),
}
}
if c.refreshToken == "" {
return adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("The `%s` parameter is empty", OptionStringAuthRefreshToken),
}
}
credentials = option.WithTokenSource(c)
}
client, err := bigquery.NewClient(ctx, c.catalog, credentials)
if err != nil {
return err
}
err = client.EnableStorageReadClient(ctx, credentials)
if err != nil {
return err
}
c.client = client
default:
client, err := bigquery.NewClient(ctx, c.catalog)
if err != nil {
return err
}
err = client.EnableStorageReadClient(ctx)
if err != nil {
return err
}
c.client = client
}
return nil
}
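
// For example (an illustrative sketch; drv is an assumed adbc.Driver from this
// package), user authentication needs all three OAuth parameters up front:
//
//	db, err := drv.NewDatabase(map[string]string{
//		OptionStringProjectID:        "my-project",
//		OptionStringAuthType:         OptionValueAuthTypeUserAuthentication,
//		OptionStringAuthClientID:     "<client-id>",
//		OptionStringAuthClientSecret: "<client-secret>",
//		OptionStringAuthRefreshToken: "<refresh-token>",
//	})
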
var (
// Dataset:
//
// https://cloud.google.com/bigquery/docs/datasets#dataset-naming
//
// When you create a dataset in BigQuery, the dataset name must be unique for each project.
// The dataset name can contain the following:
// - Up to 1,024 characters.
// - Letters (uppercase or lowercase), numbers, and underscores.
// Dataset names are case-sensitive by default. mydataset and MyDataset can coexist in the same project,
// unless one of them has case-sensitivity turned off.
// Dataset names cannot contain spaces or special characters such as -, &, @, or %.
// Note the regex is anchored over the whole value and excludes `-`, matching
// the documented rules above; length is checked separately in sanitizeDataset.
datasetRegex = regexp.MustCompile("^[a-zA-Z0-9_]+$")
)
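
// sanitizeDataset validates a dataset ID against the naming rules above. For
// example (illustrative), "my_dataset_2024" passes through unchanged, while
// "my-dataset" and "data set" are rejected with StatusInvalidArgument.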
func sanitizeDataset(value string) (string, error) {
if value == "" {
return value, nil
}
if datasetRegex.MatchString(value) {
if len(value) > 1024 {
return "", adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: "Dataset name exceeds 1024 characters",
}
}
return value, nil
}
return "", adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("invalid characters in value `%s`", value),
}
}
func (c *connectionImpl) getTableSchemaWithFilter(ctx context.Context, catalog *string, dbSchema *string, tableName string, columnName *string) (*arrow.Schema, error) {
if catalog == nil {
catalog = &c.catalog
}
if dbSchema == nil {
dbSchema = &c.dbSchema
}
columnPattern, err := internal.PatternToRegexp(columnName)
if err != nil {
return nil, err
}
if columnPattern == nil {
columnPattern = internal.AcceptAll
}
md, err := c.client.DatasetInProject(*catalog, *dbSchema).Table(tableName).Metadata(ctx)
if err != nil {
return nil, err
}
metadata := make(map[string]string)
metadata["Name"] = md.Name
metadata["Location"] = md.Location
metadata["Description"] = md.Description
if md.MaterializedView != nil {
metadata["MaterializedView.EnableRefresh"] = strconv.FormatBool(md.MaterializedView.EnableRefresh)
metadata["MaterializedView.LastRefreshTime"] = md.MaterializedView.LastRefreshTime.Format(time.RFC3339Nano)
metadata["MaterializedView.Query"] = md.MaterializedView.Query
metadata["MaterializedView.RefreshInterval"] = md.MaterializedView.RefreshInterval.String()
metadata["MaterializedView.AllowNonIncrementalDefinition"] = strconv.FormatBool(md.MaterializedView.AllowNonIncrementalDefinition)
if md.MaxStaleness != nil {
metadata["MaterializedView.MaxStaleness"] = md.MaxStaleness.String()
}
}
if md.TimePartitioning != nil {
// "DAY", "HOUR", "MONTH", "YEAR"
metadata["TimePartitioning.Type"] = string(md.TimePartitioning.Type)
if md.TimePartitioning.Expiration != 0 {
metadata["TimePartitioning.Expiration"] = md.TimePartitioning.Expiration.String()
}
if md.TimePartitioning.Field != "" {
metadata["TimePartitioning.Field"] = md.TimePartitioning.Field
}
}
if md.RangePartitioning != nil {
if md.RangePartitioning.Field != "" {
metadata["RangePartitioning.Field"] = md.RangePartitioning.Field
}
if md.RangePartitioning.Range != nil {
metadata["RangePartitioning.Range.Start"] = strconv.FormatInt(md.RangePartitioning.Range.Start, 10)
metadata["RangePartitioning.Range.End"] = strconv.FormatInt(md.RangePartitioning.Range.End, 10)
metadata["RangePartitioning.Range.Interval"] = strconv.FormatInt(md.RangePartitioning.Range.Interval, 10)
}
}
if md.RequirePartitionFilter {
metadata["RequirePartitionFilter"] = strconv.FormatBool(md.RequirePartitionFilter)
}
labels := ""
if len(md.Labels) > 0 {
encodedLabel, err := json.Marshal(md.Labels)
if err == nil {
labels = string(encodedLabel)
}
}
metadata["Labels"] = labels
metadata["FullID"] = md.FullID
metadata["Type"] = string(md.Type)
metadata["CreationTime"] = md.CreationTime.Format(time.RFC3339Nano)
metadata["LastModifiedTime"] = md.LastModifiedTime.Format(time.RFC3339Nano)
metadata["NumBytes"] = strconv.FormatInt(md.NumBytes, 10)
metadata["NumLongTermBytes"] = strconv.FormatInt(md.NumLongTermBytes, 10)
metadata["NumRows"] = strconv.FormatUint(md.NumRows, 10)
if md.SnapshotDefinition != nil {
metadata["SnapshotDefinition.BaseTableReference"] = md.SnapshotDefinition.BaseTableReference.FullyQualifiedName()
metadata["SnapshotDefinition.SnapshotTime"] = md.SnapshotDefinition.SnapshotTime.Format(time.RFC3339Nano)
}
if md.CloneDefinition != nil {
metadata["CloneDefinition.BaseTableReference"] = md.CloneDefinition.BaseTableReference.FullyQualifiedName()
metadata["CloneDefinition.CloneTime"] = md.CloneDefinition.CloneTime.Format(time.RFC3339Nano)
}
metadata["ETag"] = md.ETag
metadata["DefaultCollation"] = md.DefaultCollation
tableMetadata := arrow.MetadataFrom(metadata)
fields := make([]arrow.Field, 0, len(md.Schema))
for _, schema := range md.Schema {
if !columnPattern.MatchString(schema.Name) {
continue
}
f, err := buildField(schema, 0)
if err != nil {
return nil, err
}
fields = append(fields, f)
}
schema := arrow.NewSchema(fields, &tableMetadata)
return schema, nil
}
func buildField(schema *bigquery.FieldSchema, level uint) (arrow.Field, error) {
field := arrow.Field{Name: schema.Name}
metadata := make(map[string]string)
metadata["Description"] = schema.Description
metadata["Repeated"] = strconv.FormatBool(schema.Repeated)
metadata["Required"] = strconv.FormatBool(schema.Required)
field.Nullable = !schema.Required
metadata["Type"] = string(schema.Type)
if schema.PolicyTags != nil {
policyTagList, err := json.Marshal(schema.PolicyTags)
if err != nil {
return arrow.Field{}, err
}
metadata["PolicyTags"] = string(policyTagList)
}
// https://cloud.google.com/bigquery/docs/reference/storage#arrow_schema_details
switch schema.Type {
case bigquery.StringFieldType:
metadata["MaxLength"] = strconv.FormatInt(schema.MaxLength, 10)
metadata["Collation"] = schema.Collation
field.Type = arrow.BinaryTypes.String
case bigquery.BytesFieldType:
metadata["MaxLength"] = strconv.FormatInt(schema.MaxLength, 10)
field.Type = arrow.BinaryTypes.Binary
case bigquery.IntegerFieldType:
field.Type = arrow.PrimitiveTypes.Int64
case bigquery.FloatFieldType:
field.Type = arrow.PrimitiveTypes.Float64
case bigquery.BooleanFieldType:
field.Type = arrow.FixedWidthTypes.Boolean
case bigquery.TimestampFieldType:
field.Type = arrow.FixedWidthTypes.Timestamp_ms
case bigquery.RecordFieldType:
if schema.Repeated {
if len(schema.Schema) == 1 {
arrayField, err := buildField(schema.Schema[0], level+1)
if err != nil {
return arrow.Field{}, err
}
field.Type = arrow.ListOf(arrayField.Type)
field.Metadata = arrayField.Metadata
field.Nullable = arrayField.Nullable
} else {
return arrow.Field{}, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("Cannot create array schema for filed `%s`: len(schema.Schema) != 1", schema.Name),
}
}
} else {
nestedFields := make([]arrow.Field, len(schema.Schema))
for i, nestedSchema := range schema.Schema {
f, err := buildField(nestedSchema, level+1)
if err != nil {
return arrow.Field{}, err
}
nestedFields[i] = f
}
structType := arrow.StructOf(nestedFields...)
if structType == nil {
return arrow.Field{}, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("Cannot create a struct schema for record `%s`", schema.Name),
}
}
field.Type = structType
}
case bigquery.DateFieldType:
field.Type = arrow.FixedWidthTypes.Date32
case bigquery.TimeFieldType:
field.Type = arrow.FixedWidthTypes.Time64us
case bigquery.DateTimeFieldType:
field.Type = arrow.FixedWidthTypes.Timestamp_us
case bigquery.NumericFieldType:
field.Type = &arrow.Decimal128Type{
Precision: int32(schema.Precision),
Scale: int32(schema.Scale),
}
case bigquery.GeographyFieldType:
// TODO: potentially we should consider using GeoArrow for this
field.Type = arrow.BinaryTypes.String
case bigquery.BigNumericFieldType:
field.Type = &arrow.Decimal256Type{
Precision: int32(schema.Precision),
Scale: int32(schema.Scale),
}
case bigquery.JSONFieldType:
field.Type = arrow.BinaryTypes.String
default:
// TODO: unsupported ones are:
// - bigquery.IntervalFieldType
// - bigquery.RangeFieldType
return arrow.Field{}, adbc.Error{
Code: adbc.StatusInvalidArgument,
Msg: fmt.Sprintf("Google SQL type `%s` is not supported yet", schema.Type),
}
}
if level == 0 {
metadata["DefaultValueExpression"] = schema.DefaultValueExpression
}
field.Metadata = arrow.MetadataFrom(metadata)
return field, nil
}
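
// For example (illustrative), a REPEATED RECORD column with a single INTEGER
// child maps to an Arrow list<int64>, a non-repeated RECORD maps to a struct
// of its children, and a REQUIRED STRING column becomes a non-nullable utf8
// field.

// Token implements oauth2.TokenSource. newClient registers the connection
// itself via option.WithTokenSource so the BigQuery client can refresh the
// access token from the stored refresh token on demand.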
func (c *connectionImpl) Token() (*oauth2.Token, error) {
token, err := c.getAccessToken()
if err != nil {
return nil, err
}
now := time.Now()
return &oauth2.Token{
AccessToken: token.AccessToken,
TokenType: "Bearer",
RefreshToken: c.refreshToken,
Expiry: now.Add(time.Second * time.Duration(token.ExpiresIn)),
}, nil
}
func (c *connectionImpl) getAccessToken() (_ *bigQueryTokenResponse, err error) {
params := url.Values{}
params.Add("grant_type", "refresh_token")
params.Add("client_id", c.clientID)
params.Add("client_secret", c.clientSecret)
params.Add("refresh_token", c.refreshToken)
req, err := http.NewRequest("POST", AccessTokenEndpoint, bytes.NewBufferString(params.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Accept", "application/json")
tr := &http.Transport{
TLSClientConfig: &tls.Config{ServerName: AccessTokenServerName},
}
client := &http.Client{
Transport: tr,
}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer func() {
// Surface a Close failure only if no earlier error is being returned.
if closeErr := resp.Body.Close(); closeErr != nil && err == nil {
err = closeErr
}
}()
contents, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var tokenResponse bigQueryTokenResponse
err = json.Unmarshal(contents, &tokenResponse)
if err != nil {
return nil, err
}
return &tokenResponse, nil
}