pkg/datasource/sql/conn.go (187 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package sql import ( "context" "database/sql/driver" "seata.apache.org/seata-go/pkg/datasource/sql/exec" "seata.apache.org/seata-go/pkg/datasource/sql/types" "seata.apache.org/seata-go/pkg/datasource/sql/util" ) // Conn is a connection to a database. It is not used concurrently // by multiple goroutines. // // Conn is assumed to be stateful. type Conn struct { res *DBResource txCtx *types.TransactionContext targetConn driver.Conn autoCommit bool dbName string dbType types.DBType } // ResetSession is called prior to executing a query on the connection // if the connection has been used before. If the driver returns ErrBadConn // the connection is discarded. func (c *Conn) ResetSession(ctx context.Context) error { conn, ok := c.targetConn.(driver.SessionResetter) if !ok { return driver.ErrSkip } c.autoCommit = true c.txCtx = types.NewTxCtx() return conn.ResetSession(ctx) } // Prepare returns a prepared statement, bound to this connection. func (c *Conn) Prepare(query string) (driver.Stmt, error) { s, err := c.targetConn.Prepare(query) if err != nil { return nil, err } return &Stmt{ conn: c, stmt: s, query: query, res: c.res, txCtx: c.txCtx, }, nil } // PrepareContext func (c *Conn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) { conn, ok := c.targetConn.(driver.ConnPrepareContext) if !ok { stmt, err := c.targetConn.Prepare(query) if err != nil { return nil, err } return &Stmt{stmt: stmt, query: query, res: c.res, txCtx: c.txCtx}, nil } s, err := conn.PrepareContext(ctx, query) if err != nil { return nil, err } return &Stmt{ conn: c, stmt: s, query: query, res: c.res, txCtx: c.txCtx, }, nil } // Exec warning: if you want to use global transaction, please use ExecContext function func (c *Conn) Exec(query string, args []driver.Value) (driver.Result, error) { conn, ok := c.targetConn.(driver.Execer) if !ok { return nil, driver.ErrSkip } ret, err := conn.Exec(query, args) if err != nil { return nil, err } return types.NewResult(types.WithResult(ret)).GetResult(), nil } // ExecContext func (c *Conn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) { targetConn, ok := c.targetConn.(driver.ExecerContext) if !ok { values := make([]driver.Value, 0, len(args)) for i := range args { values = append(values, args[i].Value) } return c.Exec(query, values) } ret, err := targetConn.ExecContext(ctx, query, args) if err != nil { return nil, err } return types.NewResult(types.WithResult(ret)).GetResult(), nil } // Query func (c *Conn) Query(query string, args []driver.Value) (driver.Rows, error) { conn, ok := c.targetConn.(driver.Queryer) if !ok { return nil, driver.ErrSkip } executor, err := exec.BuildExecutor(c.res.dbType, c.txCtx.TransactionMode, query) if err != nil { return nil, err } execCtx := &types.ExecContext{ TxCtx: c.txCtx, Query: query, Values: args, } ret, err := executor.ExecWithValue(context.Background(), execCtx, func(ctx context.Context, query string, args []driver.NamedValue) (types.ExecResult, error) { ret, err := conn.Query(query, util.NamedValueToValue(args)) if err != nil { return nil, err } return types.NewResult(types.WithRows(ret)), nil }) if err != nil { return nil, err } return ret.GetRows(), nil } // QueryContext func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) { conn, ok := c.targetConn.(driver.QueryerContext) if !ok { values := make([]driver.Value, 0, len(args)) for i := range args { values = append(values, args[i].Value) } return c.Query(query, values) } ret, err := conn.QueryContext(ctx, query, args) if err != nil { return nil, err } return types.NewResult(types.WithRows(ret)).GetRows(), nil } // Begin starts and returns a new transaction. // // Deprecated: Drivers should implement ConnBeginTx instead (or additionally). func (c *Conn) Begin() (driver.Tx, error) { c.autoCommit = false tx, err := c.targetConn.Begin() if err != nil { return nil, err } if c.txCtx == nil { c.txCtx = types.NewTxCtx() c.txCtx.DBType = c.res.dbType c.txCtx.TxOpt = driver.TxOptions{} } return newTx( withDriverConn(c), withTxCtx(c.txCtx), withOriginTx(tx), ) } // BeginTx Open a transaction and judge whether the current transaction needs to open a // // global transaction according to tranCtx. If so, it needs to be included in the transaction management of seata func (c *Conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) { if c.txCtx.TransactionMode == types.XAMode { c.autoCommit = false return newTx( withDriverConn(c), withTxCtx(c.txCtx), withOriginTx(nil), ) } if conn, ok := c.targetConn.(driver.ConnBeginTx); ok { tx, err := conn.BeginTx(ctx, opts) if err != nil { return nil, err } return newTx( withDriverConn(c), withTxCtx(c.txCtx), withOriginTx(tx), ) } txi, err := c.Begin() if err != nil { return nil, err } return newTx( withDriverConn(c), withTxCtx(c.txCtx), withOriginTx(txi), ) } func (c *Conn) GetAutoCommit() bool { return c.autoCommit } func (c *Conn) GetDbVersion() string { if c.res == nil { return "" } return c.res.GetDbVersion() } // Close invalidates and potentially stops any current // prepared statements and transactions, marking this // connection as no longer in use. // // Because the sql package maintains a free pool of // connections and only calls Close when there's a surplus of // idle connections, it shouldn't be necessary for drivers to // do their own connection caching. // // Drivers must ensure all network calls made by Close // do not block indefinitely (e.g. apply a timeout). func (c *Conn) Close() error { c.txCtx = types.NewTxCtx() return c.targetConn.Close() }