in arrow/internal/flight_integration/scenario.go [2491:2693]
func (m *flightSqlExtensionScenarioTester) ValidateTransactions(client *flightsql.Client) error {
ctx := context.Background()
txn, err := client.BeginTransaction(ctx)
if err != nil {
return err
}
if err := assertEq([]byte(transactionID), []byte(txn.ID())); err != nil {
return err
}
sp, err := txn.BeginSavepoint(ctx, savepointName)
if err != nil {
return err
}
if err := assertEq([]byte(savepointID), []byte(sp)); err != nil {
return err
}
info, err := txn.Execute(ctx, selectStatement)
if err != nil {
return err
}
if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
return err
}
info, err = txn.ExecuteSubstrait(ctx, substraitPlan)
if err != nil {
return err
}
if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
return err
}
schema, err := txn.GetExecuteSchema(ctx, selectStatement)
if err != nil {
return err
}
if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
return err
}
schema, err = txn.GetExecuteSubstraitSchema(ctx, substraitPlan)
if err != nil {
return err
}
if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
return err
}
updated, err := txn.ExecuteUpdate(ctx, "UPDATE STATEMENT")
if err != nil {
return err
}
if err := assertEq(updateStatementWithTransactionExpectedRows, updated); err != nil {
return err
}
updated, err = txn.ExecuteSubstraitUpdate(ctx, substraitPlan)
if err != nil {
return err
}
if err := assertEq(updateStatementWithTransactionExpectedRows, updated); err != nil {
return err
}
arr, _, _ := array.FromJSON(memory.DefaultAllocator, arrow.PrimitiveTypes.Int64, strings.NewReader("[1]"))
defer arr.Release()
params := array.NewRecord(getQuerySchema(), []arrow.Array{arr}, 1)
defer params.Release()
prepared, err := txn.Prepare(ctx, "SELECT PREPARED STATEMENT")
if err != nil {
return err
}
prepared.SetParameters(params)
info, err = prepared.Execute(ctx)
if err != nil {
return err
}
if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
return err
}
schema, err = prepared.GetSchema(ctx)
if err != nil {
return err
}
if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
return err
}
if err := prepared.Close(ctx); err != nil {
return err
}
prepared, err = txn.PrepareSubstrait(ctx, substraitPlan)
if err != nil {
return err
}
prepared.SetParameters(params)
info, err = prepared.Execute(ctx)
if err != nil {
return err
}
if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
return err
}
schema, err = prepared.GetSchema(ctx)
if err != nil {
return err
}
if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
return err
}
if err := prepared.Close(ctx); err != nil {
return err
}
prepared, err = txn.Prepare(ctx, "UPDATE PREPARED STATEMENT")
if err != nil {
return err
}
updated, err = prepared.ExecuteUpdate(ctx)
if err != nil {
return err
}
if err := assertEq(updatePreparedStatementWithTransactionExpectedRows, updated); err != nil {
return err
}
if err := prepared.Close(ctx); err != nil {
return err
}
prepared, err = txn.PrepareSubstrait(ctx, substraitPlan)
if err != nil {
return err
}
updated, err = prepared.ExecuteUpdate(ctx)
if err != nil {
return err
}
if err := assertEq(updatePreparedStatementWithTransactionExpectedRows, updated); err != nil {
return err
}
if err := prepared.Close(ctx); err != nil {
return err
}
if err := txn.RollbackSavepoint(ctx, sp); err != nil {
return err
}
sp2, err := txn.BeginSavepoint(ctx, savepointName)
if err != nil {
return err
}
if err := assertEq([]byte(savepointID), []byte(sp2)); err != nil {
return err
}
if err := txn.ReleaseSavepoint(ctx, sp); err != nil {
return err
}
if err := txn.Commit(ctx); err != nil {
return err
}
txn, err = client.BeginTransaction(ctx)
if err != nil {
return err
}
if err := assertEq([]byte(transactionID), []byte(txn.ID())); err != nil {
return err
}
return txn.Rollback(ctx)
}