// go/adbc/validation/validation.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Package validation is a driver-agnostic test suite intended to aid in
// driver development for ADBC drivers. It provides a series of utilities
// and defined tests that can be used to validate a driver follows the
// correct and expected behavior.
package validation
import (
"context"
"strings"
"testing"
"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow-adbc/go/adbc/utils"
"github.com/apache/arrow/go/v13/arrow"
"github.com/apache/arrow/go/v13/arrow/array"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/stretchr/testify/suite"
)
// DriverQuirks is implemented once per driver under test. It supplies the
// driver-specific hooks, capability flags, and SQL dialect details that the
// shared validation suites use to adapt their expectations to a particular
// ADBC driver implementation.
type DriverQuirks interface {
	// Called in SetupTest to initialize anything needed for testing
	SetupDriver(*testing.T) adbc.Driver
	// Called in TearDownTest to clean up anything necessary in between tests
	TearDownDriver(*testing.T, adbc.Driver)
	// Return the list of key/value pairs of options to pass when
	// calling NewDatabase
	DatabaseOptions() map[string]string
	// Return the SQL to reference the bind parameter for a given index
	BindParameter(index int) string
	// Whether two statements can be used at the same time on a single connection
	SupportsConcurrentStatements() bool
	// Whether AdbcStatementExecutePartitions should work
	SupportsPartitionedData() bool
	// Whether transactions are supported (Commit/Rollback on connection)
	SupportsTransactions() bool
	// Whether retrieving the schema of prepared statement params is supported
	SupportsGetParameterSchema() bool
	// Whether it supports dynamic parameter binding in queries
	SupportsDynamicParameterBinding() bool
	// Expected Metadata responses for a given info code, used by
	// TestMetadataGetInfo to compare against GetInfo results
	GetMetadata(adbc.InfoCode) interface{}
	// Create a sample table from an arrow record
	CreateSampleTable(tableName string, r arrow.Record) error
	// Field Metadata for Sample Table for comparison
	SampleTableSchemaMetadata(tblName string, dt arrow.DataType) arrow.Metadata
	// Whether the driver supports bulk ingest
	SupportsBulkIngest() bool
	// have the driver drop a table with the correct SQL syntax
	DropTable(adbc.Connection, string) error
	// The database schema (namespace) in which sample tables are created,
	// used when matching GetObjects results
	DBSchema() string
	// The memory allocator the suite should use when building Arrow records
	Alloc() memory.Allocator
}
// DatabaseTests is a testify suite validating adbc.Database construction.
// Quirks must be populated by the driver's test harness before running.
type DatabaseTests struct {
	suite.Suite
	// Driver is (re)initialized from Quirks in SetupTest
	Driver adbc.Driver
	Quirks DriverQuirks
}
// SetupTest initializes the driver under test before each test case.
func (d *DatabaseTests) SetupTest() {
	drv := d.Quirks.SetupDriver(d.T())
	d.Driver = drv
}
// TearDownTest releases the driver after each test case.
func (d *DatabaseTests) TearDownTest() {
	drv := d.Driver
	d.Driver = nil
	d.Quirks.TearDownDriver(d.T(), drv)
}
// TestNewDatabase verifies that a database can be constructed from the
// quirk-provided options and that it satisfies the adbc.Database interface.
func (d *DatabaseTests) TestNewDatabase() {
	opts := d.Quirks.DatabaseOptions()
	db, err := d.Driver.NewDatabase(opts)
	d.NoError(err)
	d.NotNil(db)
	d.Implements((*adbc.Database)(nil), db)
}
// ConnectionTests is a testify suite validating adbc.Connection behavior:
// open/close semantics, autocommit, and metadata retrieval.
// Quirks must be populated by the driver's test harness before running.
type ConnectionTests struct {
	suite.Suite
	// Driver and DB are (re)initialized from Quirks in SetupTest
	Driver adbc.Driver
	Quirks DriverQuirks
	DB     adbc.Database
}
// SetupTest initializes the driver and opens a fresh database handle
// before each test case.
func (c *ConnectionTests) SetupTest() {
	c.Driver = c.Quirks.SetupDriver(c.T())
	db, err := c.Driver.NewDatabase(c.Quirks.DatabaseOptions())
	c.Require().NoError(err)
	c.DB = db
}
// TearDownTest releases the driver and clears the suite state after
// each test case.
func (c *ConnectionTests) TearDownTest() {
	c.Quirks.TearDownDriver(c.T(), c.Driver)
	c.DB = nil
	c.Driver = nil
}
// TestNewConn verifies a connection can be opened and cleanly closed.
func (c *ConnectionTests) TestNewConn() {
	ctx := context.Background()
	cnxn, err := c.DB.Open(ctx)
	c.NoError(err)
	c.NotNil(cnxn)
	c.NoError(cnxn.Close())
}
// TestCloseConnTwice verifies that closing an already-closed connection
// reports an ADBC error with StatusInvalidState.
func (c *ConnectionTests) TestCloseConnTwice() {
	cnxn, err := c.DB.Open(context.Background())
	c.NoError(err)
	c.NotNil(cnxn)
	c.NoError(cnxn.Close())

	// the second Close must fail with an invalid-state ADBC error
	var adbcError adbc.Error
	err = cnxn.Close()
	c.ErrorAs(err, &adbcError)
	c.Equal(adbc.StatusInvalidState, adbcError.Code)
}
// TestConcurrent verifies that two connections can be open on the same
// database simultaneously.
func (c *ConnectionTests) TestConcurrent() {
	// the original ignored the first Open's error; if it failed, the later
	// Close would panic on a nil connection and mask the real failure
	cnxn, err := c.DB.Open(context.Background())
	c.Require().NoError(err)
	cnxn2, err := c.DB.Open(context.Background())
	c.Require().NoError(err)

	c.NoError(cnxn.Close())
	c.NoError(cnxn2.Close())
}
// TestAutocommitDefault verifies that a fresh connection behaves as if
// autocommit is enabled: Commit and Rollback must fail with
// StatusInvalidState, and setting an invalid autocommit value must error.
func (c *ConnectionTests) TestAutocommitDefault() {
	ctx := context.Background()
	// even if not supported, drivers should act as if autocommit
	// is enabled, and return INVALID_STATE if the client tries to
	// commit or rollback
	cnxn, err := c.DB.Open(ctx)
	c.Require().NoError(err) // was ignored: a failed Open would nil-panic below
	defer cnxn.Close()

	expectedCode := adbc.StatusInvalidState
	var adbcError adbc.Error
	err = cnxn.Commit(ctx)
	c.ErrorAs(err, &adbcError)
	c.Equal(expectedCode, adbcError.Code)
	err = cnxn.Rollback(ctx)
	c.ErrorAs(err, &adbcError)
	c.Equal(expectedCode, adbcError.Code)

	// if the driver supports setting options after init, it should error
	// on an invalid option value for autocommit
	if cnxnopts, ok := cnxn.(adbc.PostInitOptions); ok {
		c.Error(cnxnopts.SetOption(adbc.OptionKeyAutoCommit, "invalid"))
	}
}
// TestAutocommitToggle verifies that autocommit can be toggled through
// SetOption on drivers that support transactions and post-init options,
// and that redundant toggles are accepted.
func (c *ConnectionTests) TestAutocommitToggle() {
	ctx := context.Background()
	cnxn, err := c.DB.Open(ctx)
	c.Require().NoError(err) // was ignored: a failed Open would nil-panic below
	defer cnxn.Close()

	if !c.Quirks.SupportsTransactions() {
		return
	}

	// if the connection doesn't support setting options after init
	// then there's nothing to test here
	cnxnopt, ok := cnxn.(adbc.PostInitOptions)
	if !ok {
		return
	}

	// it is ok to enable autocommit when it is already enabled
	c.NoError(cnxnopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueEnabled))
	c.NoError(cnxnopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueDisabled))
	// it is ok to disable autocommit when it is already disabled
	c.NoError(cnxnopt.SetOption(adbc.OptionKeyAutoCommit, adbc.OptionValueDisabled))
}
// TestMetadataGetInfo verifies that GetInfo returns the standard ADBC
// GetInfo schema and that each reported info value matches the value the
// quirks implementation expects for that code.
func (c *ConnectionTests) TestMetadataGetInfo() {
	ctx := context.Background()
	cnxn, err := c.DB.Open(ctx)
	c.Require().NoError(err) // was ignored: a failed Open would nil-panic below
	defer cnxn.Close()

	info := []adbc.InfoCode{
		adbc.InfoDriverName,
		adbc.InfoDriverVersion,
		adbc.InfoDriverArrowVersion,
		adbc.InfoVendorName,
		adbc.InfoVendorVersion,
		adbc.InfoVendorArrowVersion,
	}

	rdr, err := cnxn.GetInfo(ctx, info)
	c.Require().NoError(err)
	defer rdr.Release()

	c.Truef(adbc.GetInfoSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s",
		adbc.GetInfoSchema, rdr.Schema())

	for rdr.Next() {
		rec := rdr.Record()
		codeCol := rec.Column(0).(*array.Uint32)
		valUnion := rec.Column(1).(*array.DenseUnion)
		for i := 0; i < int(rec.NumRows()); i++ {
			code := codeCol.Value(i)
			child := valUnion.Field(valUnion.ChildID(i))
			if child.IsNull(i) {
				// a null value is only acceptable when the quirks expect nil
				exp := c.Quirks.GetMetadata(adbc.InfoCode(code))
				c.Nilf(exp, "got nil for info %s, expected: %s", adbc.InfoCode(code), exp)
			} else {
				// currently we only define utf8 values for metadata
				c.Equal(c.Quirks.GetMetadata(adbc.InfoCode(code)), child.(*array.String).Value(i), adbc.InfoCode(code).String())
			}
		}
	}
}
// TestMetadataGetTableSchema creates a sample table and verifies that
// GetTableSchema reports the expected Arrow schema, including the
// quirk-provided field metadata.
func (c *ConnectionTests) TestMetadataGetTableSchema() {
	rec, _, err := array.RecordFromJSON(c.Quirks.Alloc(), arrow.NewSchema(
		[]arrow.Field{
			{Name: "ints", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
			{Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true},
		}, nil), strings.NewReader(`[
			{"ints": 42, "strings": "foo"},
			{"ints": -42, "strings": null},
			{"ints": null, "strings": ""}
		]`))
	c.Require().NoError(err)
	defer rec.Release()

	ctx := context.Background()
	cnxn, err := c.DB.Open(ctx)
	c.Require().NoError(err) // was ignored: a failed Open would nil-panic below
	defer cnxn.Close()

	c.Require().NoError(c.Quirks.CreateSampleTable("sample_test", rec))

	sc, err := cnxn.GetTableSchema(ctx, nil, nil, "sample_test")
	c.Require().NoError(err)

	expectedSchema := arrow.NewSchema([]arrow.Field{
		{Name: "ints", Type: arrow.PrimitiveTypes.Int64, Nullable: true,
			Metadata: c.Quirks.SampleTableSchemaMetadata("sample_test", arrow.PrimitiveTypes.Int64)},
		{Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true,
			Metadata: c.Quirks.SampleTableSchemaMetadata("sample_test", arrow.BinaryTypes.String)},
	}, nil)

	c.Truef(expectedSchema.Equal(sc), "expected: %s\ngot: %s", expectedSchema, sc)
}
// TestMetadataGetTableTypes verifies that GetTableTypes returns the
// standard ADBC table-types schema and yields at least one batch.
func (c *ConnectionTests) TestMetadataGetTableTypes() {
	ctx := context.Background()
	cnxn, err := c.DB.Open(ctx)
	c.Require().NoError(err) // was ignored: a failed Open would nil-panic below
	defer cnxn.Close()

	rdr, err := cnxn.GetTableTypes(ctx)
	c.Require().NoError(err)
	defer rdr.Release()

	c.Truef(adbc.TableTypesSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s", adbc.TableTypesSchema, rdr.Schema())
	c.True(rdr.Next())
}
// TestMetadataGetObjectsColumns ingests a known two-column table and then
// walks the nested GetObjects result (catalog -> db schema -> table ->
// column) to verify the expected column names and ordinal positions are
// reported, both with and without a column-name filter.
func (c *ConnectionTests) TestMetadataGetObjectsColumns() {
	ctx := context.Background()
	cnxn, _ := c.DB.Open(ctx)
	defer cnxn.Close()
	// start from a clean slate so prior runs don't pollute the results
	c.Require().NoError(c.Quirks.DropTable(cnxn, "bulk_ingest"))
	rec, _, err := array.RecordFromJSON(c.Quirks.Alloc(), arrow.NewSchema(
		[]arrow.Field{
			{Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
			{Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true},
		}, nil), strings.NewReader(`[
			{"int64s": 42, "strings": "foo"},
			{"int64s": -42, "strings": null},
			{"int64s": null, "strings": ""}
		]`))
	c.Require().NoError(err)
	defer rec.Release()
	c.Require().NoError(c.Quirks.CreateSampleTable("bulk_ingest", rec))
	// "in%" should match only the "int64s" column below
	filter := "in%"
	tests := []struct {
		name      string
		filter    *string  // column-name pattern passed to GetObjects (nil = no filter)
		colnames  []string // expected (lowercased) column names
		positions []int32  // expected 1-based ordinal positions
	}{
		{"no filter", nil, []string{"int64s", "strings"}, []int32{1, 2}},
		{"filter: in%", &filter, []string{"int64s"}, []int32{1}},
	}
	for _, tt := range tests {
		c.Run(tt.name, func() {
			rdr, err := cnxn.GetObjects(ctx, adbc.ObjectDepthColumns, nil, nil, nil, tt.filter, nil)
			c.Require().NoError(err)
			defer rdr.Release()
			c.Truef(adbc.GetObjectsSchema.Equal(rdr.Schema()), "expected: %s\ngot: %s", adbc.GetObjectsSchema, rdr.Schema())
			c.True(rdr.Next())
			rec := rdr.Record()
			c.Greater(rec.NumRows(), int64(0))
			// unwrap the nested list/struct columns of the GetObjects layout
			// once up front; the loops below index into these flat child
			// arrays via per-row list value offsets
			var (
				foundExpected        = false
				catalogDbSchemasList = rec.Column(1).(*array.List)
				catalogDbSchemas     = catalogDbSchemasList.ListValues().(*array.Struct)
				dbSchemaNames        = catalogDbSchemas.Field(0).(*array.String)
				dbSchemaTablesList   = catalogDbSchemas.Field(1).(*array.List)
				dbSchemaTables       = dbSchemaTablesList.ListValues().(*array.Struct)
				tableColumnsList     = dbSchemaTables.Field(2).(*array.List)
				tableColumns         = tableColumnsList.ListValues().(*array.Struct)
				colnames             = make([]string, 0)
				positions            = make([]int32, 0)
			)
			// one outer iteration per catalog row
			for row := 0; row < int(rec.NumRows()); row++ {
				dbSchemaIdxStart, dbSchemaIdxEnd := catalogDbSchemasList.ValueOffsets(row)
				for dbSchemaIdx := dbSchemaIdxStart; dbSchemaIdx < dbSchemaIdxEnd; dbSchemaIdx++ {
					schemaName := dbSchemaNames.Value(int(dbSchemaIdx))
					tblIdxStart, tblIdxEnd := dbSchemaTablesList.ValueOffsets(int(dbSchemaIdx))
					for tblIdx := tblIdxStart; tblIdx < tblIdxEnd; tblIdx++ {
						tableName := dbSchemaTables.Field(0).(*array.String).Value(int(tblIdx))
						// only inspect the table we created, in the schema the
						// quirks say it lives in (case-insensitive match)
						if strings.EqualFold(schemaName, c.Quirks.DBSchema()) && strings.EqualFold("bulk_ingest", tableName) {
							foundExpected = true
							colIdxStart, colIdxEnd := tableColumnsList.ValueOffsets(int(tblIdx))
							for colIdx := colIdxStart; colIdx < colIdxEnd; colIdx++ {
								name := tableColumns.Field(0).(*array.String).Value(int(colIdx))
								colnames = append(colnames, strings.ToLower(name))
								positions = append(positions, tableColumns.Field(1).(*array.Int32).Value(int(colIdx)))
							}
						}
					}
				}
			}
			// a single batch is expected; verify the traversal found our table
			c.False(rdr.Next())
			c.True(foundExpected)
			c.Equal(tt.colnames, colnames)
			c.Equal(tt.positions, positions)
		})
	}
}
// StatementTests is a testify suite validating adbc.Statement behavior:
// query execution, prepared statements, parameter binding, partitions,
// and bulk ingestion. Quirks must be populated before running.
type StatementTests struct {
	suite.Suite
	// Driver, DB, and Cnxn are (re)initialized from Quirks in SetupTest
	Driver adbc.Driver
	Quirks DriverQuirks
	DB     adbc.Database
	Cnxn   adbc.Connection
	ctx    context.Context
}
// SetupTest initializes the driver, database, and an open connection
// before each test case.
func (s *StatementTests) SetupTest() {
	s.Driver = s.Quirks.SetupDriver(s.T())

	db, err := s.Driver.NewDatabase(s.Quirks.DatabaseOptions())
	s.Require().NoError(err)
	s.DB = db

	s.ctx = context.Background()
	cnxn, err := s.DB.Open(s.ctx)
	s.Require().NoError(err)
	s.Cnxn = cnxn
}
// TearDownTest closes the connection, releases the driver, and clears
// the suite state after each test case.
func (s *StatementTests) TearDownTest() {
	s.Require().NoError(s.Cnxn.Close())
	s.Quirks.TearDownDriver(s.T(), s.Driver)
	s.Cnxn = nil
	s.DB = nil
	s.Driver = nil
}
// TestNewStatement checks statement lifecycle edge cases: closing twice
// must report StatusInvalidState, as must executing a statement before
// any query has been set.
func (s *StatementTests) TestNewStatement() {
	stmt, err := s.Cnxn.NewStatement()
	s.NoError(err)
	s.NotNil(stmt)
	s.NoError(stmt.Close())

	// a second Close is an invalid-state error
	var adbcError adbc.Error
	s.ErrorAs(stmt.Close(), &adbcError)
	s.Equal(adbc.StatusInvalidState, adbcError.Code)

	// executing with no query set is also an invalid-state error
	stmt, err = s.Cnxn.NewStatement()
	s.NoError(err)
	_, _, err = stmt.ExecuteQuery(s.ctx)
	s.ErrorAs(err, &adbcError)
	s.Equal(adbc.StatusInvalidState, adbcError.Code)
}
// TestSqlPartitionedInts exercises ExecutePartitions on a trivial query.
// Drivers without partition support must return NOT_IMPLEMENTED; otherwise
// exactly one partition is expected, and it must be readable via
// ReadPartition on a separate connection.
func (s *StatementTests) TestSqlPartitionedInts() {
	stmt, err := s.Cnxn.NewStatement()
	s.Require().NoError(err)
	defer stmt.Close()
	s.NoError(stmt.SetSqlQuery("SELECT 42"))
	var adbcError adbc.Error
	if !s.Quirks.SupportsPartitionedData() {
		_, _, _, err := stmt.ExecutePartitions(s.ctx)
		s.ErrorAs(err, &adbcError)
		s.Equal(adbc.StatusNotImplemented, adbcError.Code)
		return
	}
	sc, part, rows, err := stmt.ExecutePartitions(s.ctx)
	s.Require().NoError(err)
	s.EqualValues(1, part.NumPartitions)
	s.Len(part.PartitionIDs, 1)
	// -1 means the driver doesn't report an affected-row count
	s.True(rows == 1 || rows == -1, rows)
	// the schema may legitimately be nil here; only check it when present
	if sc != nil {
		s.Len(sc.Fields(), 1)
	}
	// partitions must be consumable from a different connection
	cxn, err := s.DB.Open(s.ctx)
	s.Require().NoError(err)
	defer cxn.Close()
	rdr, err := cxn.ReadPartition(s.ctx, part.PartitionIDs[0])
	s.Require().NoError(err)
	defer rdr.Release()
	sc = rdr.Schema()
	s.Require().NotNil(sc)
	s.Len(sc.Fields(), 1)
	s.True(rdr.Next())
	rec := rdr.Record()
	s.EqualValues(1, rec.NumCols())
	s.EqualValues(1, rec.NumRows())
	// drivers may legitimately type the literal as either int32 or int64
	switch arr := rec.Column(0).(type) {
	case *array.Int32:
		s.EqualValues(42, arr.Value(0))
	case *array.Int64:
		s.EqualValues(42, arr.Value(0))
	}
	s.False(rdr.Next())
}
// TestSQLPrepareGetParameterSchema verifies GetParameterSchema on a
// prepared two-parameter query: drivers without support must return
// NOT_IMPLEMENTED; otherwise a nil or two-field schema is acceptable.
func (s *StatementTests) TestSQLPrepareGetParameterSchema() {
	stmt, err := s.Cnxn.NewStatement()
	s.NoError(err)
	defer stmt.Close()

	// prepare a query referencing two bind parameters in driver syntax
	query := "SELECT " + s.Quirks.BindParameter(0) + ", " + s.Quirks.BindParameter(1)
	s.NoError(stmt.SetSqlQuery(query))
	s.NoError(stmt.Prepare(s.ctx))

	sc, err := stmt.GetParameterSchema()
	if !s.Quirks.SupportsGetParameterSchema() {
		var adbcError adbc.Error
		s.ErrorAs(err, &adbcError)
		s.Equal(adbc.StatusNotImplemented, adbcError.Code)
		return
	}
	s.NoError(err)

	// a nil schema is allowed: some systems don't expose param schemas
	if sc != nil {
		s.Len(sc.Fields(), 2)
	}
}
// TestSQLPrepareSelectParams binds a three-row batch of (int64, string)
// parameters to a prepared two-parameter SELECT and verifies the echoed
// rows match the bound values. Skipped when the driver doesn't support
// dynamic parameter binding.
func (s *StatementTests) TestSQLPrepareSelectParams() {
	if !s.Quirks.SupportsDynamicParameterBinding() {
		s.T().SkipNow()
	}
	stmt, err := s.Cnxn.NewStatement()
	s.NoError(err)
	defer stmt.Close()
	query := "SELECT " + s.Quirks.BindParameter(0) + ", " + s.Quirks.BindParameter(1)
	s.Require().NoError(stmt.SetSqlQuery(query))
	s.Require().NoError(stmt.Prepare(s.ctx))
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
		{Name: "strings", Type: arrow.BinaryTypes.String, Nullable: true},
	}, nil)
	bldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema)
	defer bldr.Release()
	// third int64 and second string are null (validity bitmap false)
	bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
	bldr.Field(1).(*array.StringBuilder).AppendValues([]string{"", "", "bar"}, []bool{true, false, true})
	batch := bldr.NewRecord()
	defer batch.Release()
	s.Require().NoError(stmt.Bind(s.ctx, batch))
	rdr, affected, err := stmt.ExecuteQuery(s.ctx)
	s.Require().NoError(err)
	defer rdr.Release()
	// -1 means the driver doesn't report an affected-row count
	s.True(affected == 1 || affected == -1, affected)
	var nrows int64
	// results may arrive in multiple batches; track the running row offset
	// so each batch is compared against the matching slice of the input
	for rdr.Next() {
		rec := rdr.Record()
		s.Require().NotNil(rec)
		s.EqualValues(2, rec.NumCols())
		start, end := nrows, nrows+rec.NumRows()
		switch arr := rec.Column(0).(type) {
		case *array.Int32:
			// NOTE(review): int32 results are accepted without value
			// comparison — presumably because the bound column is int64 and
			// SliceEqual can't compare across types; confirm this is intended
		case *array.Int64:
			s.True(array.SliceEqual(arr, 0, int64(arr.Len()), batch.Column(0), start, end))
		}
		s.True(array.SliceEqual(rec.Column(1), 0, rec.NumRows(), batch.Column(1), start, end))
		nrows += rec.NumRows()
	}
	s.EqualValues(3, nrows)
	s.False(rdr.Next())
	s.NoError(rdr.Err())
}
// TestSQLPrepareSelectNoParams verifies a prepared parameterless query
// executes and returns a single one-column, one-row result of 1.
func (s *StatementTests) TestSQLPrepareSelectNoParams() {
	stmt, err := s.Cnxn.NewStatement()
	s.NoError(err)
	defer stmt.Close()

	s.NoError(stmt.SetSqlQuery("SELECT 1"))
	s.NoError(stmt.Prepare(s.ctx))

	rdr, n, err := stmt.ExecuteQuery(s.ctx)
	s.Require().NoError(err)
	defer rdr.Release()
	// -1 means the driver doesn't report an affected-row count
	s.True(n == 1 || n == -1)

	sc := rdr.Schema()
	s.Require().NotNil(sc)
	s.Len(sc.Fields(), 1)

	s.True(rdr.Next())
	rec := rdr.Record()
	s.EqualValues(1, rec.NumCols())
	s.EqualValues(1, rec.NumRows())

	// drivers may type the literal as either int32 or int64
	switch col := rec.Column(0).(type) {
	case *array.Int32:
		s.EqualValues(1, col.Value(0))
	case *array.Int64:
		s.EqualValues(1, col.Value(0))
	}
	s.False(rdr.Next())
}
// TestSqlPrepareErrorParamCountMismatch verifies that executing a
// two-parameter prepared query with only one bound column fails.
// Skipped when the driver doesn't support dynamic parameter binding.
func (s *StatementTests) TestSqlPrepareErrorParamCountMismatch() {
	if !s.Quirks.SupportsDynamicParameterBinding() {
		s.T().SkipNow()
	}

	stmt, err := s.Cnxn.NewStatement()
	s.NoError(err)
	defer stmt.Close()

	// the query references two parameters...
	query := "SELECT " + s.Quirks.BindParameter(0) + ", " + s.Quirks.BindParameter(1)
	s.NoError(stmt.SetSqlQuery(query))
	s.NoError(stmt.Prepare(s.ctx))

	// ...but the bound batch supplies only one column
	batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), arrow.NewSchema(
		[]arrow.Field{{Name: "int64s", Type: arrow.PrimitiveTypes.Int64}}, nil))
	defer batchbldr.Release()
	batchbldr.Field(0).(*array.Int64Builder).AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
	batch := batchbldr.NewRecord()
	defer batch.Release()

	s.NoError(stmt.Bind(s.ctx, batch))
	_, _, err = stmt.ExecuteQuery(s.ctx)
	s.Error(err)
}
// TestSqlIngestInts bulk-ingests a single int64 column (including a null)
// into a fresh table, then reads the table back and verifies the data
// round-trips exactly. Skipped when the driver doesn't support bulk ingest.
func (s *StatementTests) TestSqlIngestInts() {
	if !s.Quirks.SupportsBulkIngest() {
		s.T().SkipNow()
	}
	// start from a clean slate so prior runs don't pollute the results
	s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest"))
	schema := arrow.NewSchema([]arrow.Field{{
		Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil)
	batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema)
	defer batchbldr.Release()
	// third value is null (validity bitmap false)
	bldr := batchbldr.Field(0).(*array.Int64Builder)
	bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
	batch := batchbldr.NewRecord()
	defer batch.Release()
	stmt, err := s.Cnxn.NewStatement()
	s.Require().NoError(err)
	defer stmt.Close()
	// setting the ingest target table switches the statement to ingest mode
	s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
	s.Require().NoError(stmt.Bind(s.ctx, batch))
	affected, err := stmt.ExecuteUpdate(s.ctx)
	s.Require().NoError(err)
	// -1 means the driver doesn't report an affected-row count
	if affected != -1 && affected != 3 {
		s.FailNowf("invalid number of affected rows", "should be -1 or 3, got: %d", affected)
	}
	// use order by clause to ensure we get the same order as the input batch
	s.Require().NoError(stmt.SetSqlQuery(`SELECT * FROM bulk_ingest ORDER BY "int64s" DESC NULLS LAST`))
	rdr, rows, err := stmt.ExecuteQuery(s.ctx)
	s.Require().NoError(err)
	if rows != -1 && rows != 3 {
		s.FailNowf("invalid number of returned rows", "should be -1 or 3, got: %d", rows)
	}
	defer rdr.Release()
	// compare shapes ignoring driver-added field metadata
	s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema())
	s.Require().True(rdr.Next())
	rec := rdr.Record()
	s.EqualValues(3, rec.NumRows())
	s.EqualValues(1, rec.NumCols())
	s.Truef(array.Equal(rec.Column(0), batch.Column(0)), "expected: %s\ngot: %s", batch.Column(0), rec.Column(0))
	s.Require().False(rdr.Next())
	s.Require().NoError(rdr.Err())
}
// TestSqlIngestAppend bulk-ingests one row to create a table, then
// appends two more rows (one null) in append mode, and verifies the
// final table contains the concatenation of both batches.
// Skipped when the driver doesn't support bulk ingest.
func (s *StatementTests) TestSqlIngestAppend() {
	if !s.Quirks.SupportsBulkIngest() {
		s.T().SkipNow()
	}
	// start from a clean slate so prior runs don't pollute the results
	s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest"))
	schema := arrow.NewSchema([]arrow.Field{{
		Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil)
	batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema)
	defer batchbldr.Release()
	bldr := batchbldr.Field(0).(*array.Int64Builder)
	bldr.AppendValues([]int64{42}, []bool{true})
	batch := batchbldr.NewRecord()
	defer batch.Release()
	// ingest and create table
	stmt, err := s.Cnxn.NewStatement()
	s.Require().NoError(err)
	defer stmt.Close()
	s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
	s.Require().NoError(stmt.Bind(s.ctx, batch))
	affected, err := stmt.ExecuteUpdate(s.ctx)
	s.Require().NoError(err)
	// -1 means the driver doesn't report an affected-row count
	if affected != -1 && affected != 1 {
		s.FailNowf("invalid number of affected rows", "should be -1 or 1, got: %d", affected)
	}
	// now append two more rows (the second is null) via ingest-append mode
	bldr.AppendValues([]int64{-42, 0}, []bool{true, false})
	batch2 := batchbldr.NewRecord()
	defer batch2.Release()
	s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
	s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend))
	s.Require().NoError(stmt.Bind(s.ctx, batch2))
	affected, err = stmt.ExecuteUpdate(s.ctx)
	s.Require().NoError(err)
	if affected != -1 && affected != 2 {
		s.FailNowf("invalid number of affected rows", "should be -1 or 2, got: %d", affected)
	}
	// use order by clause to ensure we get the same order as the input batch
	s.Require().NoError(stmt.SetSqlQuery(`SELECT * FROM bulk_ingest ORDER BY "int64s" DESC NULLS LAST`))
	rdr, rows, err := stmt.ExecuteQuery(s.ctx)
	s.Require().NoError(err)
	if rows != -1 && rows != 3 {
		s.FailNowf("invalid number of returned rows", "should be -1 or 3, got: %d", rows)
	}
	defer rdr.Release()
	// compare shapes ignoring driver-added field metadata
	s.Truef(schema.Equal(utils.RemoveSchemaMetadata(rdr.Schema())), "expected: %s\n got: %s", schema, rdr.Schema())
	s.Require().True(rdr.Next())
	rec := rdr.Record()
	s.EqualValues(3, rec.NumRows())
	s.EqualValues(1, rec.NumCols())
	// expected result is both ingested batches concatenated in order
	exp, err := array.Concatenate([]arrow.Array{batch.Column(0), batch2.Column(0)}, s.Quirks.Alloc())
	s.Require().NoError(err)
	defer exp.Release()
	s.Truef(array.Equal(rec.Column(0), exp), "expected: %s\ngot: %s", exp, rec.Column(0))
	s.Require().False(rdr.Next())
	s.Require().NoError(rdr.Err())
}
// TestSqlIngestErrors exercises ingest failure modes: executing ingest
// without a bound batch, appending to a table that doesn't exist, and
// re-creating or appending with an incompatible schema. The same
// statement is deliberately reused across the sub-tests, so their order
// matters. Skipped when the driver doesn't support bulk ingest.
func (s *StatementTests) TestSqlIngestErrors() {
	if !s.Quirks.SupportsBulkIngest() {
		s.T().SkipNow()
	}
	stmt, err := s.Cnxn.NewStatement()
	s.Require().NoError(err)
	defer stmt.Close()
	s.Run("ingest without bind", func() {
		// ingest mode with no bound data is an invalid state
		var e adbc.Error
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
		_, _, err := stmt.ExecuteQuery(s.ctx)
		s.ErrorAs(err, &e)
		s.Equal(adbc.StatusInvalidState, e.Code)
	})
	s.Run("append to nonexistent table", func() {
		// drop first so the append target is guaranteed missing
		s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest"))
		schema := arrow.NewSchema([]arrow.Field{{
			Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil)
		batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema)
		defer batchbldr.Release()
		bldr := batchbldr.Field(0).(*array.Int64Builder)
		bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
		batch := batchbldr.NewRecord()
		defer batch.Release()
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend))
		s.Require().NoError(stmt.Bind(s.ctx, batch))
		var e adbc.Error
		_, _, err := stmt.ExecuteQuery(s.ctx)
		s.ErrorAs(err, &e)
		s.NotEqual(adbc.StatusOK, e.Code)
		// SQLSTATE 42S02 == table or view not found
		s.Equal([5]byte{'4', '2', 'S', '0', '2'}, e.SqlState)
	})
	s.Run("overwrite and incompatible schema", func() {
		s.Require().NoError(s.Quirks.DropTable(s.Cnxn, "bulk_ingest"))
		schema := arrow.NewSchema([]arrow.Field{{
			Name: "int64s", Type: arrow.PrimitiveTypes.Int64, Nullable: true}}, nil)
		batchbldr := array.NewRecordBuilder(s.Quirks.Alloc(), schema)
		defer batchbldr.Release()
		bldr := batchbldr.Field(0).(*array.Int64Builder)
		bldr.AppendValues([]int64{42, -42, 0}, []bool{true, true, false})
		batch := batchbldr.NewRecord()
		defer batch.Release()
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeCreate))
		s.Require().NoError(stmt.Bind(s.ctx, batch))
		// create it
		_, err := stmt.ExecuteUpdate(s.ctx)
		s.Require().NoError(err)
		// error if we try to create again
		s.Require().NoError(stmt.Bind(s.ctx, batch))
		var e adbc.Error
		_, err = stmt.ExecuteUpdate(s.ctx)
		s.ErrorAs(err, &e)
		s.Equal(adbc.StatusInternal, e.Code)
		// try to append an incompatible schema (extra column)
		schema, _ = schema.AddField(1, arrow.Field{Name: "coltwo", Type: arrow.PrimitiveTypes.Int64, Nullable: true})
		batchbldr = array.NewRecordBuilder(s.Quirks.Alloc(), schema)
		defer batchbldr.Release()
		batchbldr.Field(0).AppendNull()
		batchbldr.Field(1).AppendNull()
		batch = batchbldr.NewRecord()
		defer batch.Release()
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestTargetTable, "bulk_ingest"))
		s.Require().NoError(stmt.SetOption(adbc.OptionKeyIngestMode, adbc.OptionValueIngestModeAppend))
		s.Require().NoError(stmt.Bind(s.ctx, batch))
		_, err = stmt.ExecuteUpdate(s.ctx)
		s.ErrorAs(err, &e)
		s.NotEqual(adbc.StatusOK, e.Code)
	})
}