pkg/datasource/sql/db.go (191 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package sql import ( "context" "database/sql" "database/sql/driver" "fmt" "sync" "seata.apache.org/seata-go/pkg/datasource/sql/datasource" "seata.apache.org/seata-go/pkg/datasource/sql/types" "seata.apache.org/seata-go/pkg/datasource/sql/undo" "seata.apache.org/seata-go/pkg/datasource/sql/util" "seata.apache.org/seata-go/pkg/datasource/sql/xa" "seata.apache.org/seata-go/pkg/protocol/branch" "seata.apache.org/seata-go/pkg/util/log" ) type dbOption func(db *DBResource) func withDsn(dsn string) dbOption { return func(db *DBResource) { db.dsn = dsn } } func withResourceID(id string) dbOption { return func(db *DBResource) { db.resourceID = id } } func withTableMetaCache(c datasource.TableMetaCache) dbOption { return func(db *DBResource) { db.metaCache = c } } func withDBType(dt types.DBType) dbOption { return func(db *DBResource) { db.dbType = dt } } func withBranchType(dt branch.BranchType) dbOption { return func(db *DBResource) { db.branchType = dt } } func withTarget(source *sql.DB) dbOption { return func(db *DBResource) { db.db = source } } func withConnector(ci driver.Connector) dbOption { return func(db *DBResource) { db.connector = ci } } func withDBName(dbName string) dbOption { return func(db *DBResource) { db.dbName = dbName } } func withConf(conf *XAConnConf) dbOption { return func(db *DBResource) { db.xaConnConf = conf } } func newResource(opts ...dbOption) (*DBResource, error) { db := new(DBResource) for i := range opts { opts[i](db) } db.init() return db, nil } // DBResource proxy sql.DB, enchance database/sql.DB to add distribute transaction ability type DBResource struct { xaConnConf *XAConnConf // only use by mysql dbVersion string dsn string resourceID string db *sql.DB connector driver.Connector dbName string dbType types.DBType undoLogMgr undo.UndoLogManager branchType branch.BranchType // for xa metaCache datasource.TableMetaCache shouldBeHeld bool keeper sync.Map } func (db *DBResource) GetResourceGroupId() string { panic("implement me") } func (db *DBResource) init() { ctx := context.Background() conn, err := db.connector.Connect(ctx) if err != nil { log.Errorf("connect: %w", err) } version, err := selectDBVersion(ctx, conn) if err != nil { log.Errorf("select db version: %w", err) } db.SetDbVersion(version) db.checkDbVersion() } func (db *DBResource) GetResourceId() string { return db.resourceID } func (db *DBResource) GetBranchType() branch.BranchType { return db.branchType } func (db *DBResource) GetDB() *sql.DB { return db.db } func (db *DBResource) GetDBName() string { return db.dbName } func (db *DBResource) GetDbType() types.DBType { return db.dbType } func (db *DBResource) SetDbType(dbType types.DBType) { db.dbType = dbType } func (db *DBResource) SetDbVersion(v string) { db.dbVersion = v } func (db *DBResource) GetDbVersion() string { return db.dbVersion } func (db *DBResource) IsShouldBeHeld() bool { return db.shouldBeHeld } // Hold the xa connection. func (db *DBResource) Hold(xaBranchID string, v interface{}) error { _, exist := db.keeper.Load(xaBranchID) if !exist { db.keeper.Store(xaBranchID, v) return nil } return nil } func (db *DBResource) Release(xaBranchID string) { db.keeper.Delete(xaBranchID) } func (db *DBResource) Lookup(xaBranchID string) (interface{}, bool) { return db.keeper.Load(xaBranchID) } func (db *DBResource) GetKeeper() *sync.Map { return &db.keeper } func (db *DBResource) ConnectionForXA(ctx context.Context, xaXid XAXid) (*XAConn, error) { xaBranchXid := xaXid.String() tmpConn, ok := db.Lookup(xaBranchXid) if ok && tmpConn != nil { connectionProxyXa, isConnectionProxyXa := tmpConn.(*XAConn) if !isConnectionProxyXa { return nil, fmt.Errorf("get connection proxy xa from cache error, xid:%s", xaXid.String()) } return connectionProxyXa, nil } // why here need a new connection? // 1. because there maybe a rm cluster // 2. the first phase select a rm1, and store the connection is the keeper // 3. tc request the second phase. but the rm1 is shutdown, so the tc select another rm (like rm2) // 4. so when the second phase request coming to rm2, rm2 must not store the connection. // 5. the rm2 get the second phase do the two thing. // 1. in mysql version >= 8.0.29, mysql support the xa transaction commit by another connection. so just commit // 2. when the version < 8.0.29. so just make the transaction rollback newDriverConn, err := db.connector.Connect(ctx) if err != nil { return nil, fmt.Errorf("get xa new connection failure, xid:%s, err:%v", xaXid.String(), err) } xaResource, err := xa.CreateXAResource(newDriverConn, types.DBTypeMySQL) if err != nil { return nil, fmt.Errorf("create xa resoruce err:%w", err) } xaConn := &XAConn{ Conn: &Conn{ targetConn: newDriverConn, res: db, }, xaBranchXid: XaIdBuild(xaXid.GetGlobalXid(), xaXid.GetBranchId()), xaResource: xaResource, } return xaConn, nil } func (db *DBResource) checkDbVersion() error { switch db.dbType { case types.DBTypeMySQL: currentVersion, err := util.ConvertDbVersion(db.dbVersion) if err != nil { return fmt.Errorf("new connection xa proxy convert db version:%s err:%v", db.GetDbVersion(), err) } shouldKeptVersion, err := util.ConvertDbVersion("8.0.29") if err != nil { return fmt.Errorf("new connection xa proxy convert db version 8.0.29 err:%v", err) } if currentVersion < shouldKeptVersion { db.shouldBeHeld = true } case types.DBTypeMARIADB: db.shouldBeHeld = true } return nil }