go/adbc/validation/validation.go (992 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // Package validation is a driver-agnostic test suite intended to aid in // driver development for ADBC drivers. It provides a series of utilities // and defined tests that can be used to validate a driver follows the // correct and expected behavior. package validation import ( "context" "io" "strings" "testing" "github.com/apache/arrow-adbc/go/adbc" "github.com/apache/arrow-adbc/go/adbc/utils" "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) type DriverQuirks interface { // Called in SetupTest to initialize anything needed for testing SetupDriver(*testing.T) adbc.Driver // Called in TearDownTest to clean up anything necessary in between tests TearDownDriver(*testing.T, adbc.Driver) // Return the list of key/value pairs of options to pass when // calling NewDatabase DatabaseOptions() map[string]string // Return the SQL to reference the bind parameter for a given index BindParameter(index int) string // Whether the driver supports bulk ingest SupportsBulkIngest(mode string) bool // Whether two statements can be used at the same time on a single connection SupportsConcurrentStatements() bool // Whether current catalog/schema are supported SupportsCurrentCatalogSchema() bool // Whether GetSetOptions is supported SupportsGetSetOptions() bool // Whether AdbcStatementExecuteSchema should work SupportsExecuteSchema() bool // Whether AdbcStatementExecutePartitions should work SupportsPartitionedData() bool // Whether statistics are supported SupportsStatistics() bool // Whether transactions are supported (Commit/Rollback on connection) SupportsTransactions() bool // Whether retrieving the schema of prepared statement params is supported SupportsGetParameterSchema() bool // Whether it supports dynamic parameter binding in queries SupportsDynamicParameterBinding() bool // Whether it returns an error when attempting to ingest with an incompatible schema SupportsErrorIngestIncompatibleSchema() bool // Expected Metadata responses GetMetadata(adbc.InfoCode) interface{} // Create a sample table from an arrow record CreateSampleTable(tableName string, r arrow.Record) error // Field Metadata for Sample Table for comparison SampleTableSchemaMetadata(tblName string, dt arrow.DataType) arrow.Metadata // have the driver drop a table with the correct SQL syntax DropTable(adbc.Connection, string) error Catalog() string DBSchema() string Alloc() memory.Allocator } type DatabaseTests struct { suite.Suite Driver adbc.Driver Quirks DriverQuirks } func (d *DatabaseTests) SetupTest() { d.Driver = d.Quirks.SetupDriver(d.T()) } func (d *DatabaseTests) TearDownTest() { d.Quirks.TearDownDriver(d.T(), d.Driver) d.Driver = nil } func (d *DatabaseTests) TestNewDatabase() { db, err := d.Driver.NewDatabase(d.Quirks.DatabaseOptions()) d.NoError(err) d.NotNil(db) d.Implements((*adbc.Database)(nil), db) d.NoError(db.Close()) } type ConnectionTests struct { suite.Suite Driver adbc.Driver Quirks DriverQuirks DB adbc.Database } func (c *ConnectionTests) SetupTest() { c.Driver = c.Quirks.SetupDriver(c.T()) var err error c.DB, err = c.Driver.NewDatabase(c.Quirks.DatabaseOptions()) c.Require().NoError(err) } func (c *ConnectionTests) TearDownTest() { c.Quirks.TearDownDriver(c.T(), c.Driver) c.Driver = nil c.NoError(c.DB.Close()) c.DB = nil } func (c *ConnectionTests) TestGetSetOptions() { cnxn, err := c.DB.Open(context.Background()) c.NoError(err) c.NotNil(cnxn) stmt, err := cnxn.NewStatement() c.NoError(err) c.NotNil(stmt) expected := c.Quirks.SupportsGetSetOptions() _, ok := c.DB.(adbc.GetSetOptions) c.Equal(expected, ok) _, ok = cnxn.(adbc.GetSetOptions) c.Equal(expected, ok) _, ok = stmt.(adbc.GetSetOptions) c.Equal(expected, ok) c.NoError(stmt.Close()) c.NoError(cnxn.Close()) } func (c *ConnectionTests) TestNewConn() { cnxn, err := c.DB.Open(context.Background()) c.NoError(err) c.NotNil(cnxn) c.NoError(cnxn.Close()) } func (c *ConnectionTests) TestCloseConnTwice() { cnxn, err := c.DB.Open(context.Background()) c.NoError(err) c.NotNil(cnxn) c.NoError(cnxn.Close()) err = cnxn.Close() var adbcError adbc.Error c.ErrorAs(err, &adbcError) c.Equal(adbc.StatusInvalidState, adbcError.Code) } func (c *ConnectionTests) TestConcurrent() { cnxn, _ := c.DB.Open(context.Background()) cnxn2, err := c.DB.Open(context.Background()) c.Require().NoError(err) c.NoError(cnxn.Close()) c.NoError(cnxn2.Close()) } func (c *ConnectionTests) TestAutocommitDefault() { ctx := context.Background() // even if not supported, drivers should act as if autocommit // is enabled, and return INVALID_STATE if the client tries to // commit or rollback cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) if getset, ok := cnxn.(adbc.GetSetOptions); ok { value, err := getset.GetOption(adbc.OptionKeyAutoCommit) c.NoError(err) c.Equal(adbc.OptionValueEnabled, value) } expectedCode := adbc.StatusInvalidState var adbcError adbc.Error err := cnxn.Commit(ctx) c.ErrorAs(err, &adbcError) c.Equal(expectedCode, adbcError.Code) err = cnxn.Rollback(ctx) c.ErrorAs(err, &adbcError) c.Equal(expectedCode, adbcError.Code) // if the driver supports setting options after init, it should error // on an invalid option value for autocommit if cnxnopts, ok := cnxn.(adbc.PostInitOptions); ok { c.Error(cnxnopts.SetOption(adbc.OptionKeyAutoCommit, "invalid")) } } func (c *ConnectionTests) TestAutocommitToggle() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) if !c.Quirks.SupportsTransactions() { return } // if the connection doesn't support setting options after init // then there's nothing to test here cnxnopt, ok := cnxn.(adbc.PostInitOptions) if !ok { return } // it is ok to enable autocommit when it is already enabled c.NoError(cnxnopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueEnabled)) c.NoError(cnxnopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueDisabled)) if getset, ok := cnxn.(adbc.GetSetOptions); ok { value, err := getset.GetOption(adbc.OptionKeyAutoCommit) c.NoError(err) c.Equal(adbc.OptionValueDisabled, value) } // it is ok to disable autocommit when it isn't enabled c.NoError(cnxnopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueDisabled)) if getset, ok := cnxn.(adbc.GetSetOptions); ok { value, err := getset.GetOption(adbc.OptionKeyAutoCommit) c.NoError(err) c.Equal(adbc.OptionValueDisabled, value) } } func (c *ConnectionTests) TestMetadataCurrentCatalog() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) getset, ok := cnxn.(adbc.GetSetOptions) if !c.Quirks.SupportsGetSetOptions() { c.False(ok) return } c.True(ok) value, err := getset.GetOption(adbc.OptionKeyCurrentCatalog) if c.Quirks.SupportsCurrentCatalogSchema() { c.NoError(err) c.Equal(c.Quirks.Catalog(), value) } else { c.Error(err) } } func (c *ConnectionTests) TestMetadataCurrentDbSchema() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) getset, ok := cnxn.(adbc.GetSetOptions) if !c.Quirks.SupportsGetSetOptions() { c.False(ok) return } c.True(ok) value, err := getset.GetOption(adbc.OptionKeyCurrentDbSchema) if c.Quirks.SupportsCurrentCatalogSchema() { c.NoError(err) c.Equal(c.Quirks.DBSchema(), value) } else { c.Error(err) } } func (c *ConnectionTests) TestMetadataGetInfo() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) info := []adbc.InfoCode{ adbc.InfoDriverName, adbc.InfoDriverVersion, adbc.InfoDriverArrowVersion, adbc.InfoDriverADBCVersion, adbc.InfoVendorName, adbc.InfoVendorVersion, adbc.InfoVendorArrowVersion, } rdr, err := cnxn.GetInfo(ctx, info) c.Require().NoError(err) defer rdr.Release() c.Truef(adbc.GetInfoSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s", adbc.GetInfoSchema, rdr.Schema()) for rdr.Next() { rec := rdr.Record() codeCol := rec.Column(0).(*array.Uint32) valUnion := rec.Column(1).(*array.DenseUnion) for i := 0; i < int(rec.NumRows()); i++ { code := codeCol.Value(i) child := valUnion.Field(valUnion.ChildID(i)) offset := int(valUnion.ValueOffset(i)) valUnion.GetOneForMarshal(i) if child.IsNull(offset) { exp := c.Quirks.GetMetadata(adbc.InfoCode(code)) c.Nilf(exp, "got nil for info %s, expected: %s", adbc.InfoCode(code), exp) } else { expected := c.Quirks.GetMetadata(adbc.InfoCode(code)) var actual interface{} switch valUnion.ChildID(i) { case 0: // String actual = child.(*array.String).Value(offset) case 1: // bool actual = child.(*array.Boolean).Value(offset) case 2: // int64 actual = child.(*array.Int64).Value(offset) default: c.FailNow("Unknown union type code", valUnion.ChildID(i)) } c.Equal(expected, actual, adbc.InfoCode(code).String()) } } } } func (c *ConnectionTests) TestMetadataGetStatistics() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) if c.Quirks.SupportsStatistics() { stats, ok := cnxn.(adbc.ConnectionGetStatistics) c.True(ok) reader, err := stats.GetStatistics(ctx, nil, nil, nil, true) c.NoError(err) defer reader.Release() } else { stats, ok := cnxn.(adbc.ConnectionGetStatistics) if ok { _, err := stats.GetStatistics(ctx, nil, nil, nil, true) var adbcErr adbc.Error c.ErrorAs(err, &adbcErr) c.Equal(adbc.StatusNotImplemented, adbcErr.Code) } } } func (c *ConnectionTests) TestMetadataGetTableSchema() { rec, _, err := array.RecordFromJSON(c.Quirks.Alloc(), arrow.NewSchema( []arrow.Field{ {Name: "ints", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, {Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true}, }, nil), strings.NewReader(`[ {"ints": 42, "strings": "foo"}, {"ints": -42, "strings": null}, {"ints": null, "strings": ""} ]`)) c.Require().NoError(err) defer rec.Release() ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) c.Require().NoError(c.Quirks.CreateSampleTable("sample_test", rec)) sc, err := cnxn.GetTableSchema(ctx, nil, nil, "sample_test") c.Require().NoError(err) expectedSchema := arrow.NewSchema([]arrow.Field{ {Name: "ints", Type: arrow.PrimitiveTypes.Int64, Nullable: true, Metadata: c.Quirks.SampleTableSchemaMetadata("sample_test", arrow.PrimitiveTypes.Int64)}, {Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true, Metadata: c.Quirks.SampleTableSchemaMetadata("sample_test", arrow.BinaryTypes.String)}, }, nil) c.Truef(expectedSchema.Equal(sc), "expected: %s\ngot: %s", expectedSchema, sc) } func (c *ConnectionTests) TestMetadataGetTableTypes() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) rdr, err := cnxn.GetTableTypes(ctx) c.Require().NoError(err) defer rdr.Release() c.Truef(adbc.TableTypesSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s", adbc.TableTypesSchema, rdr.Schema()) c.True(rdr.Next()) } func (c *ConnectionTests) TestMetadataGetObjectsColumns() { ctx := context.Background() cnxn, _ := c.DB.Open(ctx) defer CheckedClose(c.T(), cnxn) ingestCatalogName := c.Quirks.Catalog() ingestSchemaName := c.Quirks.DBSchema() ingestTableName := "bulk_ingest" c.Require().NoError(c.Quirks.DropTable(cnxn, ingestTableName)) rec, _, err := array.RecordFromJSON(c.Quirks.Alloc(), arrow.NewSchema( []arrow.Field{ {Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, {Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true}, }, nil), strings.NewReader(`[ {"int64s": 42, "strings": "foo"}, {"int64s": -42, "strings": null}, {"int64s": null, "strings": ""} ]`)) c.Require().NoError(err) defer rec.Release() c.Require().NoError(c.Quirks.CreateSampleTable(ingestTableName, rec)) catalogFilterInvalid := ingestCatalogName + "_invalid" dbSchemaFilterInvalid := ingestSchemaName + "_invalid" tableFilterInvalid := ingestTableName + "_invalid" columnFilter := "in%" tests := []struct { name string depth adbc.ObjectDepth catalogFilter *string dbSchemaFilter *string tableFilter *string columnFilter *string tableTypes []string expectFindCatalog bool expectFindDbSchema bool expectFindTable bool expectedColnames []string expectedPositions []int32 }{ { name: "depth catalog no filter", depth: adbc.ObjectDepthCatalogs, expectFindCatalog: true, }, { name: "depth dbSchema no filter", depth: adbc.ObjectDepthDBSchemas, expectFindCatalog: true, expectFindDbSchema: true, }, { name: "depth table no filter", depth: adbc.ObjectDepthTables, expectFindCatalog: true, expectFindDbSchema: true, expectFindTable: true, }, { name: "depth column no filter", depth: adbc.ObjectDepthColumns, expectFindCatalog: true, expectFindDbSchema: true, expectFindTable: true, expectedColnames: []string{"int64s", "strings"}, expectedPositions: []int32{1, 2}, }, { name: "filter catalog valid", depth: adbc.ObjectDepthColumns, catalogFilter: &ingestCatalogName, expectFindCatalog: true, expectFindDbSchema: true, expectFindTable: true, expectedColnames: []string{"int64s", "strings"}, expectedPositions: []int32{1, 2}, }, { name: "filter catalog invalid", depth: adbc.ObjectDepthColumns, catalogFilter: &catalogFilterInvalid, }, { name: "filter dbSchema valid", depth: adbc.ObjectDepthColumns, dbSchemaFilter: &ingestSchemaName, expectFindCatalog: true, expectFindDbSchema: true, expectFindTable: true, expectedColnames: []string{"int64s", "strings"}, expectedPositions: []int32{1, 2}, }, { name: "filter dbSchema invalid", depth: adbc.ObjectDepthColumns, dbSchemaFilter: &dbSchemaFilterInvalid, expectFindCatalog: true, }, { name: "filter table valid", depth: adbc.ObjectDepthColumns, tableFilter: &ingestTableName, expectFindCatalog: true, expectFindDbSchema: true, expectFindTable: true, expectedColnames: []string{"int64s", "strings"}, expectedPositions: []int32{1, 2}, }, { name: "filter table invalid", depth: adbc.ObjectDepthColumns, tableFilter: &tableFilterInvalid, expectFindCatalog: true, expectFindDbSchema: true, }, { name: "filter column: in%", depth: adbc.ObjectDepthColumns, columnFilter: &columnFilter, expectFindCatalog: true, expectFindDbSchema: true, expectFindTable: true, expectedColnames: []string{"int64s"}, expectedPositions: []int32{1}, }, } for _, tt := range tests { c.Run(tt.name, func() { rdr, err := cnxn.GetObjects(ctx, tt.depth, tt.catalogFilter, tt.dbSchemaFilter, tt.tableFilter, tt.columnFilter, tt.tableTypes) c.Require().NoError(err) defer rdr.Release() c.Truef(adbc.GetObjectsSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s", adbc.GetObjectsSchema, rdr.Schema()) c.True(rdr.Next()) rec := rdr.Record() var ( foundCatalog = false foundDbSchema = false foundTable = false catalogs = rec.Column(0).(*array.String) catalogDbSchemasList = rec.Column(1).(*array.List) catalogDbSchemas = catalogDbSchemasList.ListValues().(*array.Struct) dbSchemaNames = catalogDbSchemas.Field(0).(*array.String) dbSchemaTablesList = catalogDbSchemas.Field(1).(*array.List) dbSchemaTables = dbSchemaTablesList.ListValues().(*array.Struct) tableColumnsList = dbSchemaTables.Field(2).(*array.List) tableColumns = tableColumnsList.ListValues().(*array.Struct) colnames = make([]string, 0) positions = make([]int32, 0) ) for row := 0; row < int(rec.NumRows()); row++ { catalogName := catalogs.Value(row) if strings.EqualFold(catalogName, ingestCatalogName) { foundCatalog = true dbSchemaIdxStart, dbSchemaIdxEnd := catalogDbSchemasList.ValueOffsets(row) for dbSchemaIdx := dbSchemaIdxStart; dbSchemaIdx < dbSchemaIdxEnd; dbSchemaIdx++ { schemaName := dbSchemaNames.Value(int(dbSchemaIdx)) if strings.EqualFold(schemaName, ingestSchemaName) { foundDbSchema = true tblIdxStart, tblIdxEnd := dbSchemaTablesList.ValueOffsets(int(dbSchemaIdx)) for tblIdx := tblIdxStart; tblIdx < tblIdxEnd; tblIdx++ { tableName := dbSchemaTables.Field(0).(*array.String).Value(int(tblIdx)) if strings.EqualFold(tableName, ingestTableName) { foundTable = true colIdxStart, colIdxEnd := tableColumnsList.ValueOffsets(int(tblIdx)) for colIdx := colIdxStart; colIdx < colIdxEnd; colIdx++ { name := tableColumns.Field(0).(*array.String).Value(int(colIdx)) colnames = append(colnames, strings.ToLower(name)) positions = append(positions, tableColumns.Field(1).(*array.Int32).Value(int(colIdx))) } } } } } } } c.False(rdr.Next()) c.Equal(tt.expectFindCatalog, foundCatalog) c.Equal(tt.expectFindDbSchema, foundDbSchema) c.Equal(tt.expectFindTable, foundTable) c.ElementsMatch(tt.expectedColnames, colnames) c.ElementsMatch(tt.expectedPositions, positions) }) } } type StatementTests struct { suite.Suite Driver adbc.Driver Quirks DriverQuirks DB adbc.Database Cnxn adbc.Connection ctx context.Context } func (s *StatementTests) SetupTest() { s.Driver = s.Quirks.SetupDriver(s.T()) var err error s.DB, err = s.Driver.NewDatabase(s.Quirks.DatabaseOptions()) s.Require().NoError(err) s.ctx = context.Background() s.Cnxn, err = s.DB.Open(s.ctx) s.Require().NoError(err) } func (s *StatementTests) TearDownTest() { s.Require().NoError(s.Cnxn.Close()) s.Quirks.TearDownDriver(s.T(), s.Driver) s.Cnxn = nil s.NoError(s.DB.Close()) s.DB = nil s.Driver = nil } func (s *StatementTests) TestNewStatement() { stmt, err := s.Cnxn.NewStatement() s.NoError(err) s.NotNil(stmt) s.NoError(stmt.Close()) var adbcError adbc.Error // statement already closed s.ErrorAs(stmt.Close(), &adbcError) s.Equal(adbc.StatusInvalidState, adbcError.Code) stmt, err = s.Cnxn.NewStatement() s.NoError(err) // cannot execute without a query _, _, err = stmt.ExecuteQuery(s.ctx) s.ErrorAs(err, &adbcError) s.Equal(adbc.StatusInvalidState, adbcError.Code) } func (s *StatementTests) TestSqlExecuteSchema() { if !s.Quirks.SupportsExecuteSchema() { s.T().SkipNow() } stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) es, ok := stmt.(adbc.StatementExecuteSchema) s.Require().True(ok, "%#v does not support ExecuteSchema", es) s.Run("no query", func() { var adbcErr adbc.Error schema, err := es.ExecuteSchema(s.ctx) s.ErrorAs(err, &adbcErr) s.Equal(adbc.StatusInvalidState, adbcErr.Code) s.Nil(schema) }) s.Run("query", func() { s.NoError(stmt.SetSqlQuery("SELECT 1, 'string'")) schema, err := es.ExecuteSchema(s.ctx) s.NoError(err) s.Equal(2, len(schema.Fields())) s.True(schema.Field(0).Type.ID() == arrow.INT32 || schema.Field(0).Type.ID() == arrow.INT64) s.Equal(arrow.STRING, schema.Field(1).Type.ID()) }) s.Run("prepared", func() { s.NoError(stmt.SetSqlQuery("SELECT 1, 'string'")) s.NoError(stmt.Prepare(s.ctx)) schema, err := es.ExecuteSchema(s.ctx) s.NoError(err) s.Equal(2, len(schema.Fields())) s.True(schema.Field(0).Type.ID() == arrow.INT32 || schema.Field(0).Type.ID() == arrow.INT64) s.Equal(arrow.STRING, schema.Field(1).Type.ID()) }) } func (s *StatementTests) TestSqlPartitionedInts() { stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) s.NoError(stmt.SetSqlQuery("SELECT 42")) var adbcError adbc.Error if !s.Quirks.SupportsPartitionedData() { _, _, _, err := stmt.ExecutePartitions(s.ctx) s.ErrorAs(err, &adbcError) s.Equal(adbc.StatusNotImplemented, adbcError.Code) return } sc, part, rows, err := stmt.ExecutePartitions(s.ctx) s.Require().NoError(err) s.EqualValues(1, part.NumPartitions) s.Len(part.PartitionIDs, 1) s.True(rows == 1 || rows == -1, rows) if sc != nil { s.Len(sc.Fields(), 1) } cxn, err := s.DB.Open(s.ctx) s.Require().NoError(err) defer CheckedClose(s.T(), cxn) rdr, err := cxn.ReadPartition(s.ctx, part.PartitionIDs[0]) s.Require().NoError(err) defer rdr.Release() sc = rdr.Schema() s.Require().NotNil(sc) s.Len(sc.Fields(), 1) s.True(rdr.Next()) rec := rdr.Record() s.EqualValues(1, rec.NumCols()) s.EqualValues(1, rec.NumRows()) switch arr := rec.Column(0).(type) { case *array.Int32: s.EqualValues(42, arr.Value(0)) case *array.Int64: s.EqualValues(42, arr.Value(0)) } s.False(rdr.Next()) } func (s *StatementTests) TestSQLPrepareGetParameterSchema() { stmt, err := s.Cnxn.NewStatement() s.NoError(err) defer CheckedClose(s.T(), stmt) query := "SELECT " + s.Quirks.BindParameter(0) + ", " + s.Quirks.BindParameter(1) s.NoError(stmt.SetSqlQuery(query)) s.NoError(stmt.Prepare(s.ctx)) sc, err := stmt.GetParameterSchema() if !s.Quirks.SupportsGetParameterSchema() { var adbcError adbc.Error s.ErrorAs(err, &adbcError) s.Equal(adbc.StatusNotImplemented, adbcError.Code) return } s.NoError(err) // it's allowed to be nil as some systems don't provide param schemas if sc != nil { s.Len(sc.Fields(), 2) } } func (s *StatementTests) TestSQLPrepareSelectParams() { if !s.Quirks.SupportsDynamicParameterBinding() { s.T().SkipNow() } stmt, err := s.Cnxn.NewStatement() s.NoError(err) defer CheckedClose(s.T(), stmt) query := "SELECT " + s.Quirks.BindParameter(0) + ", " + s.Quirks.BindParameter(1) s.Require().NoError(stmt.SetSqlQuery(query)) s.Require().NoError(stmt.Prepare(s.ctx)) schema := arrow.NewSchema([]arrow.Field{ {Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, {Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true}, }, nil) bldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer bldr.Release() bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{42, -42, 0}, []bool{true, true, false}) bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"", "", "bar"}, []bool{true, false, true}) batch := bldr.NewRecord() defer batch.Release() s.Require().NoError(stmt.Bind(s.ctx, batch)) rdr, affected, err := stmt.ExecuteQuery(s.ctx) s.Require().NoError(err) defer rdr.Release() s.True(affected == 1 || affected == -1, affected) var nrows int64 for rdr.Next() { rec := rdr.Record() s.Require().NotNil(rec) s.EqualValues(2, rec.NumCols()) start, end := nrows, nrows+rec.NumRows() switch arr := rec.Column(0).(type) { case *array.Int32: case *array.Int64: s.True(array.SliceEqual(arr, 0, int64(arr.Len()), batch.Column(0), start, end)) } s.True(array.SliceEqual(rec.Column(1), 0, rec.NumRows(), batch.Column(1), start, end)) nrows += rec.NumRows() } s.EqualValues(3, nrows) s.False(rdr.Next()) s.NoError(rdr.Err()) } func (s *StatementTests) TestSQLPrepareSelectNoParams() { stmt, err := s.Cnxn.NewStatement() s.NoError(err) defer CheckedClose(s.T(), stmt) s.NoError(stmt.SetSqlQuery("SELECT 1")) s.NoError(stmt.Prepare(s.ctx)) rdr, n, err := stmt.ExecuteQuery(s.ctx) s.Require().NoError(err) s.True(n == 1 || n == -1) defer rdr.Release() sc := rdr.Schema() s.Require().NotNil(sc) s.Len(sc.Fields(), 1) s.True(rdr.Next()) rec := rdr.Record() s.EqualValues(1, rec.NumCols()) s.EqualValues(1, rec.NumRows()) switch arr := rec.Column(0).(type) { case *array.Int32: s.EqualValues(1, arr.Value(0)) case *array.Int64: s.EqualValues(1, arr.Value(0)) } s.False(rdr.Next()) } func (s *StatementTests) TestSqlPrepareErrorParamCountMismatch() { if !s.Quirks.SupportsDynamicParameterBinding() { s.T().SkipNow() } query := "SELECT " + s.Quirks.BindParameter(0) + ", " + s.Quirks.BindParameter(1) stmt, err := s.Cnxn.NewStatement() s.NoError(err) defer CheckedClose(s.T(), stmt) s.NoError(stmt.SetSqlQuery(query)) s.NoError(stmt.Prepare(s.ctx)) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), arrow.NewSchema( []arrow.Field{{Name: "int64s", Type: arrow.PrimitiveTypes.Int64}}, nil)) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false}) batch := batchbldr.NewRecord() defer batch.Release() s.NoError(stmt.Bind(s.ctx, batch)) _, _, err = stmt.ExecuteQuery(s.ctx) s.Error(err) } func (s *StatementTests) TestSqlIngestInts() { if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeCreate) { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) schema := arrow.NewSchema([]arrow.Field{{ Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false}) batch := batchbldr.NewRecord() defer batch.Release() stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.Bind(s.ctx, batch)) affected, err := stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 3 { s.FailNowf("invalid number of affected rows", "should be -1 or 3, got: %d", affected) } // use order by clause to ensure we get the same order as the input batch s.Require().NoError(stmt.SetSqlQuery(`SELECT * FROM "bulk_ingest" ORDER BY "int64s" DESC NULLS LAST`)) rdr, rows, err := stmt.ExecuteQuery(s.ctx) s.Require().NoError(err) if rows != -1 && rows != 3 { s.FailNowf("invalid number of returned rows", "should be -1 or 3, got: %d", rows) } defer rdr.Release() s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema()) s.Require().True(rdr.Next()) rec := rdr.Record() s.EqualValues(3, rec.NumRows()) s.EqualValues(1, rec.NumCols()) s.Truef(array.Equal(rec.Column(0), batch.Column(0)), "expected: %s\ngot: %s", batch.Column(0), rec.Column(0)) s.Require().False(rdr.Next()) s.Require().NoError(rdr.Err()) } func (s *StatementTests) TestSqlIngestAppend() { if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeAppend) { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) schema := arrow.NewSchema([]arrow.Field{{ Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42}, []bool{true}) batch := batchbldr.NewRecord() defer batch.Release() // ingest and create table stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.Bind(s.ctx, batch)) affected, err := stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 1 { s.FailNowf("invalid number of affected rows", "should be -1 or 1, got: %d", affected) } // now append bldr.AppendValues([]int64{-42, 0}, []bool{true, false}) batch2 := batchbldr.NewRecord() defer batch2.Release() s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeAppend) { s.T().SkipNow() } s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend)) s.Require().NoError(stmt.Bind(s.ctx, batch2)) affected, err = stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 2 { s.FailNowf("invalid number of affected rows", "should be -1 or 2, got: %d", affected) } // use order by clause to ensure we get the same order as the input batch s.Require().NoError(stmt.SetSqlQuery(`SELECT * FROM "bulk_ingest" ORDER BY "int64s" DESC NULLS LAST`)) rdr, rows, err := stmt.ExecuteQuery(s.ctx) s.Require().NoError(err) if rows != -1 && rows != 3 { s.FailNowf("invalid number of returned rows", "should be -1 or 3, got: %d", rows) } defer rdr.Release() s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema()) s.Require().True(rdr.Next()) rec := rdr.Record() s.EqualValues(3, rec.NumRows()) s.EqualValues(1, rec.NumCols()) exp, err := array.Concatenate([]arrow.Array{batch.Column(0), batch2.Column(0)}, s.Quirks.Alloc()) s.Require().NoError(err) defer exp.Release() s.Truef(array.Equal(rec.Column(0), exp), "expected: %s\ngot: %s", exp, rec.Column(0)) s.Require().False(rdr.Next()) s.Require().NoError(rdr.Err()) } func (s *StatementTests) TestSqlIngestReplace() { if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeReplace) { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) schema := arrow.NewSchema([]arrow.Field{{ Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42}, []bool{true}) batch := batchbldr.NewRecord() defer batch.Release() // ingest and create table stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.Bind(s.ctx, batch)) affected, err := stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 1 { s.FailNowf("invalid number of affected rows", "should be -1 or 1, got: %d", affected) } // now replace schema = arrow.NewSchema([]arrow.Field{{ Name: "newintcol", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr2 := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr2.Release() bldr2 := batchbldr2.Field(0).(*array.Int64Builder) bldr2.AppendValues([]int64{42}, []bool{true}) batch2 := batchbldr2.NewRecord() defer batch2.Release() s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeReplace)) s.Require().NoError(stmt.Bind(s.ctx, batch2)) affected, err = stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 1 { s.FailNowf("invalid number of affected rows", "should be -1 or 1, got: %d", affected) } s.Require().NoError(stmt.SetSqlQuery(`SELECT * FROM "bulk_ingest"`)) rdr, rows, err := stmt.ExecuteQuery(s.ctx) s.Require().NoError(err) if rows != -1 && rows != 1 { s.FailNowf("invalid number of returned rows", "should be -1 or 1, got: %d", rows) } defer rdr.Release() s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema()) s.Require().True(rdr.Next()) rec := rdr.Record() s.EqualValues(1, rec.NumRows()) s.EqualValues(1, rec.NumCols()) col, ok := rec.Column(0).(*array.Int64) s.True(ok) s.Equal(int64(42), col.Value(0)) s.Require().False(rdr.Next()) s.Require().NoError(rdr.Err()) } func (s *StatementTests) TestSqlIngestCreateAppend() { if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeCreateAppend) { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) schema := arrow.NewSchema([]arrow.Field{{ Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42}, []bool{true}) batch := batchbldr.NewRecord() defer batch.Release() // ingest and create table stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreateAppend)) s.Require().NoError(stmt.Bind(s.ctx, batch)) affected, err := stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 1 { s.FailNowf("invalid number of affected rows", "should be -1 or 1, got: %d", affected) } // append s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreateAppend)) s.Require().NoError(stmt.Bind(s.ctx, batch)) affected, err = stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) if affected != -1 && affected != 1 { s.FailNowf("invalid number of affected rows", "should be -1 or 1, got: %d", affected) } // validate s.Require().NoError(stmt.SetSqlQuery(`SELECT * FROM "bulk_ingest"`)) rdr, rows, err := stmt.ExecuteQuery(s.ctx) s.Require().NoError(err) if rows != -1 && rows != 2 { s.FailNowf("invalid number of returned rows", "should be -1 or 2, got: %d", rows) } defer rdr.Release() s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema()) s.Require().True(rdr.Next()) rec := rdr.Record() s.EqualValues(2, rec.NumRows()) s.EqualValues(1, rec.NumCols()) col, ok := rec.Column(0).(*array.Int64) s.True(ok) s.Equal(int64(42), col.Value(0)) s.Equal(int64(42), col.Value(1)) s.Require().False(rdr.Next()) s.Require().NoError(rdr.Err()) } func (s *StatementTests) TestSqlIngestErrors() { if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeCreate) { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) stmt, err := s.Cnxn.NewStatement() s.Require().NoError(err) defer CheckedClose(s.T(), stmt) s.Run("ingest without bind", func() { var e adbc.Error s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) _, _, err := stmt.ExecuteQuery(s.ctx) s.ErrorAs(err, &e) s.Equal(adbc.StatusInvalidState, e.Code) }) s.Run("append to nonexistent table", func() { if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeAppend) { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) schema := arrow.NewSchema([]arrow.Field{{ Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false}) batch := batchbldr.NewRecord() defer batch.Release() s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend)) s.Require().NoError(stmt.Bind(s.ctx, batch)) var e adbc.Error _, _, err := stmt.ExecuteQuery(s.ctx) s.ErrorAs(err, &e) s.NotEqual(adbc.StatusOK, e.Code) // SQLSTATE 42S02 == table or view not found s.Equal([5]byte{'4', '2', 'S', '0', '2'}, e.SqlState) }) s.Run("overwrite and incompatible schema", func() { if !s.Quirks.SupportsErrorIngestIncompatibleSchema() { s.T().SkipNow() } s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest")) schema := arrow.NewSchema([]arrow.Field{{ Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil) batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() bldr := batchbldr.Field(0).(*array.Int64Builder) bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false}) batch := batchbldr.NewRecord() defer batch.Release() s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate)) s.Require().NoError(stmt.Bind(s.ctx, batch)) // create it _, err := stmt.ExecuteUpdate(s.ctx) s.Require().NoError(err) // error if we try to create again s.Require().NoError(stmt.Bind(s.ctx, batch)) var e adbc.Error _, err = stmt.ExecuteUpdate(s.ctx) s.ErrorAs(err, &e) s.Equal(adbc.StatusInternal, e.Code) // try to append an incompatible schema schema, _ = schema.AddField(1, arrow.Field{Name: "coltwo", Type: arrow.PrimitiveTypes.Int64, Nullable: true}) batchbldr = array.NewRecordBuilder(s.Quirks.Alloc(), schema) defer batchbldr.Release() batchbldr.Field(0).AppendNull() batchbldr.Field(1).AppendNull() batch = batchbldr.NewRecord() defer batch.Release() if !s.Quirks.SupportsBulkIngest(adbc.OptionValueIngestModeCreate) { s.T().SkipNow() } s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest")) s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend)) s.Require().NoError(stmt.Bind(s.ctx, batch)) _, err = stmt.ExecuteUpdate(s.ctx) s.ErrorAs(err, &e) s.NotEqual(adbc.StatusOK, e.Code) }) } // CheckedClose is a helper for deferring Close() with a potential error in a test. // For example: `defer CheckedClose(suite.T(), stmt)` func CheckedClose(t *testing.T, c io.Closer) { assert.NoError(t, c.Close()) } // CheckedClose is a helper for deferring a function with a potential error in a test. // For example: `defer CheckedCleanup(suite.T(), func() error { return os.Remove(path) })` func CheckedCleanup(t *testing.T, c func() error) { assert.NoError(t, c()) }