pkg/rm/tcc/fence/fence_driver_conn.go (110 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fence
import (
"context"
"database/sql"
"database/sql/driver"
"github.com/pkg/errors"
"seata.apache.org/seata-go/pkg/tm"
"seata.apache.org/seata-go/pkg/util/log"
)
type FenceConn struct {
TargetConn driver.Conn
TargetDB *sql.DB
}
func (c *FenceConn) ResetSession(ctx context.Context) error {
resetter, ok := c.TargetConn.(driver.SessionResetter)
if !ok {
return driver.ErrSkip
}
return resetter.ResetSession(ctx)
}
func (c *FenceConn) Prepare(query string) (driver.Stmt, error) {
return c.TargetConn.Prepare(query)
}
func (c *FenceConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
return c.TargetConn.Prepare(query)
}
func (c *FenceConn) Exec(query string, args []driver.Value) (driver.Result, error) {
execer, ok := c.TargetConn.(driver.Execer)
if !ok {
return nil, driver.ErrSkip
}
return execer.Exec(query, args)
}
func (c *FenceConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
execerContext, ok := c.TargetConn.(driver.ExecerContext)
if !ok {
values := make([]driver.Value, 0, len(args))
for i := range args {
values = append(values, args[i].Value)
}
return c.Exec(query, values)
}
return execerContext.ExecContext(ctx, query, args)
}
func (c *FenceConn) Query(query string, args []driver.Value) (driver.Rows, error) {
queryer, ok := c.TargetConn.(driver.Queryer)
if !ok {
return nil, driver.ErrSkip
}
return queryer.Query(query, args)
}
func (c *FenceConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
QueryerContext, ok := c.TargetConn.(driver.QueryerContext)
if !ok {
values := make([]driver.Value, 0, len(args))
for i := range args {
values = append(values, args[i].Value)
}
return c.Query(query, values)
}
return QueryerContext.QueryContext(ctx, query, args)
}
func (c *FenceConn) Begin() (driver.Tx, error) {
return nil, errors.New("operation unsupport")
}
func (c *FenceConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
beginer, ok := c.TargetConn.(driver.ConnBeginTx)
if !ok {
return nil, errors.New("operation unsupported")
}
tx, err := beginer.BeginTx(ctx, opts)
if err != nil {
return nil, err
}
if !tm.IsSeataContext(ctx) {
return nil, errors.New("there is not seata context")
}
// check if have been begin fence tx
if tm.IsFenceTxBegin(ctx) {
return tx, nil
}
tm.SetFenceTxBeginedFlag(ctx, true)
fenceTx, err := c.TargetDB.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if err := fenceTx.Rollback(); err != nil {
log.Error(err)
}
// although it have not any db operations yet, is still rollback to avoid leak tx.
if err := tx.Rollback(); err != nil {
log.Error(err)
}
}
}()
// do fence operations
emptyCallback := func() error {
return nil
}
if err := WithFence(ctx, fenceTx, emptyCallback); err != nil {
return nil, err
}
return &FenceTx{
Ctx: ctx,
TargetTx: tx,
TargetFenceTx: fenceTx,
}, nil
}
func (c *FenceConn) Close() error {
return c.TargetConn.Close()
}