func()

in arrow/internal/flight_integration/scenario.go [2491:2693]


// ValidateTransactions drives the transaction-related calls of the Flight SQL
// client and checks each response against the values this scenario expects.
//
// In order, it:
//   - begins a transaction and a savepoint and compares the returned handles
//     with transactionID and savepointID;
//   - runs ad-hoc SQL and Substrait queries, schema requests and updates
//     through the transaction;
//   - prepares, executes and closes SQL and Substrait prepared statements
//     (both query and update flavours) through the transaction;
//   - rolls back the first savepoint, opens a second one and releases it;
//   - commits the transaction, then begins a fresh one and rolls it back.
//
// The first error returned by the client or by a validation helper aborts the
// scenario and is returned unchanged.
func (m *flightSqlExtensionScenarioTester) ValidateTransactions(client *flightsql.Client) error {
	ctx := context.Background()

	// --- begin transaction + savepoint ---------------------------------
	txn, err := client.BeginTransaction(ctx)
	if err != nil {
		return err
	}

	if err := assertEq([]byte(transactionID), []byte(txn.ID())); err != nil {
		return err
	}

	sp, err := txn.BeginSavepoint(ctx, savepointName)
	if err != nil {
		return err
	}

	if err := assertEq([]byte(savepointID), []byte(sp)); err != nil {
		return err
	}

	// --- ad-hoc queries inside the transaction -------------------------
	info, err := txn.Execute(ctx, selectStatement)
	if err != nil {
		return err
	}

	if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
		return err
	}

	info, err = txn.ExecuteSubstrait(ctx, substraitPlan)
	if err != nil {
		return err
	}

	if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
		return err
	}

	// --- schema-only requests inside the transaction -------------------
	schema, err := txn.GetExecuteSchema(ctx, selectStatement)
	if err != nil {
		return err
	}

	if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
		return err
	}

	schema, err = txn.GetExecuteSubstraitSchema(ctx, substraitPlan)
	if err != nil {
		return err
	}

	if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
		return err
	}

	// --- ad-hoc updates inside the transaction -------------------------
	updated, err := txn.ExecuteUpdate(ctx, "UPDATE STATEMENT")
	if err != nil {
		return err
	}

	if err := assertEq(updateStatementWithTransactionExpectedRows, updated); err != nil {
		return err
	}

	updated, err = txn.ExecuteSubstraitUpdate(ctx, substraitPlan)
	if err != nil {
		return err
	}

	if err := assertEq(updateStatementWithTransactionExpectedRows, updated); err != nil {
		return err
	}

	// --- parameter record shared by the prepared queries ---------------
	// Check the parse error before deferring Release: on failure arr may be
	// nil and releasing it would panic instead of reporting the problem.
	arr, _, err := array.FromJSON(memory.DefaultAllocator, arrow.PrimitiveTypes.Int64, strings.NewReader("[1]"))
	if err != nil {
		return err
	}
	defer arr.Release()
	params := array.NewRecord(getQuerySchema(), []arrow.Array{arr}, 1)
	defer params.Release()

	// --- prepared SQL query --------------------------------------------
	prepared, err := txn.Prepare(ctx, "SELECT PREPARED STATEMENT")
	if err != nil {
		return err
	}
	prepared.SetParameters(params)

	info, err = prepared.Execute(ctx)
	if err != nil {
		return err
	}

	if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
		return err
	}

	schema, err = prepared.GetSchema(ctx)
	if err != nil {
		return err
	}

	if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
		return err
	}

	if err := prepared.Close(ctx); err != nil {
		return err
	}

	// --- prepared Substrait query --------------------------------------
	prepared, err = txn.PrepareSubstrait(ctx, substraitPlan)
	if err != nil {
		return err
	}
	prepared.SetParameters(params)

	info, err = prepared.Execute(ctx)
	if err != nil {
		return err
	}

	if err := m.validate(getQueryWithTransactionSchema(), info, client); err != nil {
		return err
	}

	schema, err = prepared.GetSchema(ctx)
	if err != nil {
		return err
	}

	if err := m.validateSchema(getQueryWithTransactionSchema(), schema); err != nil {
		return err
	}

	if err := prepared.Close(ctx); err != nil {
		return err
	}

	// --- prepared SQL update -------------------------------------------
	prepared, err = txn.Prepare(ctx, "UPDATE PREPARED STATEMENT")
	if err != nil {
		return err
	}

	updated, err = prepared.ExecuteUpdate(ctx)
	if err != nil {
		return err
	}

	if err := assertEq(updatePreparedStatementWithTransactionExpectedRows, updated); err != nil {
		return err
	}

	if err := prepared.Close(ctx); err != nil {
		return err
	}

	// --- prepared Substrait update -------------------------------------
	prepared, err = txn.PrepareSubstrait(ctx, substraitPlan)
	if err != nil {
		return err
	}

	updated, err = prepared.ExecuteUpdate(ctx)
	if err != nil {
		return err
	}

	if err := assertEq(updatePreparedStatementWithTransactionExpectedRows, updated); err != nil {
		return err
	}

	if err := prepared.Close(ctx); err != nil {
		return err
	}

	// --- end savepoints and the transaction ----------------------------
	if err := txn.RollbackSavepoint(ctx, sp); err != nil {
		return err
	}

	sp2, err := txn.BeginSavepoint(ctx, savepointName)
	if err != nil {
		return err
	}

	if err := assertEq([]byte(savepointID), []byte(sp2)); err != nil {
		return err
	}

	// Release the savepoint opened just above; sp was already ended by the
	// rollback. Both handles were asserted equal to savepointID, so the
	// identifier sent to the server is the same either way.
	if err := txn.ReleaseSavepoint(ctx, sp2); err != nil {
		return err
	}

	if err := txn.Commit(ctx); err != nil {
		return err
	}

	// --- second transaction: exercise the rollback path ----------------
	txn, err = client.BeginTransaction(ctx)
	if err != nil {
		return err
	}

	if err := assertEq([]byte(transactionID), []byte(txn.ID())); err != nil {
		return err
	}

	return txn.Rollback(ctx)
}