in pkg/datasource/sql/exec/select_for_update_executor.go [170:277]
// ExecWithValue executes the business query through f and, when the call is
// part of a global transaction (or the context requires a global lock), makes
// sure the rows it selected are not held by another global transaction.
//
// Outside a global transaction, and when no global lock is required, f is
// invoked directly and its result returned unchanged.
//
// Otherwise each attempt does the following:
//  1. Opens a local transaction (when the connection was in auto-commit mode)
//     or sets a savepoint (when savepoints are supported) so that the local
//     database lock can be released again if the global lock is unavailable.
//  2. Runs the business query through f.
//  3. Queries the primary keys of the matched rows and builds a lock key.
//  4. Asks the AT data source manager whether that lock key is lockable.
//
// If the key is not lockable (or the lock query fails), the local transaction
// is rolled back / the savepoint is rolled back to, and the attempt is
// repeated after retryInterval, up to retryTimes attempts. When all attempts
// are used up a "global lock wait timeout" error is returned, wrapping the
// last lock-query error if there was one. If ctx is cancelled while waiting
// between attempts, ctx.Err() is returned.
//
// On success the locally opened transaction (if any) is committed and the
// result produced by f is returned.
func (s SelectForUpdateExecutor) ExecWithValue(ctx context.Context, execCtx *types.ExecContext, f CallbackWithValue) (types.ExecResult, error) {
	if !tm.IsGlobalTx(ctx) && !execCtx.IsRequireGlobalLock {
		return f(ctx, execCtx.Query, execCtx.Values)
	}

	table, err := execCtx.ParseContext.GetTableName()
	if err != nil {
		return nil, err
	}
	// build query primary key sql
	selectPKSQL, err := s.buildSelectPKSQL(execCtx.ParseContext.SelectStmt, execCtx.MetaDataMap[table])
	if err != nil {
		return nil, err
	}

	var (
		tx                 driver.Tx
		result             types.ExecResult
		lastLockErr        error
		acquired           bool
		originalAutoCommit = execCtx.IsAutoCommit
		// Only the identifier is stored here; the SAVEPOINT / ROLLBACK TO
		// statements are built around it. The prefix keeps the identifier from
		// consisting solely of digits, which is not a valid unquoted identifier.
		savepointName = fmt.Sprintf("seata_savepoint_%d", time.Now().Unix())
	)

	// Never leave a transaction that this method opened dangling on an early
	// return: tx is reset to nil whenever it has been committed or rolled back.
	defer func() {
		if tx != nil {
			// Best effort: the error that caused the early return is more
			// useful to the caller than a secondary rollback failure.
			_ = tx.Rollback()
		}
	}()

	// execRaw prepares and executes a parameterless statement and always
	// closes the prepared statement afterwards.
	execRaw := func(query string) error {
		stmt, prepErr := execCtx.Conn.Prepare(query)
		if prepErr != nil {
			return prepErr
		}
		_, execErr := stmt.Exec(nil)
		// The statement has already run; a failure to close it must not mask
		// (or replace) the execution outcome.
		_ = stmt.Close()
		return execErr
	}

	// queryLockKey runs the primary-key query with the business arguments and
	// turns the returned rows into a global lock key, releasing the statement
	// and the row cursor before returning.
	queryLockKey := func() (string, error) {
		stmt, prepErr := execCtx.Conn.Prepare(selectPKSQL)
		if prepErr != nil {
			return "", prepErr
		}
		defer func() { _ = stmt.Close() }() // cleanup only; nothing to report

		rows, queryErr := stmt.Query(execCtx.Values)
		if queryErr != nil {
			return "", queryErr
		}
		defer func() { _ = rows.Close() }() // cleanup only; nothing to report

		return s.buildLockKey(rows, execCtx.MetaDataMap[table]), nil
	}

	for attempt := 0; attempt < retryTimes; attempt++ {
		switch {
		case originalAutoCommit:
			// In order to hold the local db lock during global lock checking,
			// open an explicit transaction if original auto commit was true.
			localTx, beginErr := execCtx.Conn.Begin()
			if beginErr != nil {
				return nil, beginErr
			}
			tx = localTx
		case execCtx.IsSupportsSavepoints:
			// In order to release the local db lock on a global lock conflict,
			// create a savepoint if original auto commit was false and roll
			// back to it below when the global lock cannot be obtained.
			if err = execRaw("savepoint " + savepointName); err != nil {
				return nil, err
			}
		default:
			return nil, fmt.Errorf("not support savepoint. please check your db version")
		}

		// execute business SQL, try to get local lock
		result, err = f(ctx, execCtx.Query, execCtx.Values)
		if err != nil {
			return nil, err
		}

		// query primary key values
		lockKey, keyErr := queryLockKey()
		if keyErr != nil {
			return nil, keyErr
		}
		if lockKey == "" {
			// Empty lock key: there is nothing to check against the global lock.
			acquired = true
			break
		}

		// check global lock
		lockable, lockErr := datasource.GetDataSourceManager(branch.BranchTypeAT).LockQuery(ctx, rm.LockQueryParam{
			Xid:        execCtx.TxCtx.XID,
			BranchType: branch.BranchTypeAT,
			ResourceId: execCtx.TxCtx.ResourceID,
			LockKeys:   lockKey,
		})
		// has obtained global lock
		if lockErr == nil && lockable {
			acquired = true
			break
		}
		// Remember why the check failed so a final timeout can report it.
		lastLockErr = lockErr

		// Release the local db lock before waiting for the next attempt.
		if tx != nil {
			rollbackErr := tx.Rollback()
			tx = nil
			if rollbackErr != nil {
				return nil, rollbackErr
			}
		} else if err = execRaw("rollback to savepoint " + savepointName); err != nil {
			return nil, err
		}

		// Back off, but stop waiting as soon as the caller gives up.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case <-time.After(retryInterval):
		}
	}

	if !acquired {
		if lastLockErr != nil {
			return nil, fmt.Errorf("global lock wait timeout: %w", lastLockErr)
		}
		return nil, fmt.Errorf("global lock wait timeout")
	}

	if originalAutoCommit {
		commitErr := tx.Commit()
		// The transaction is finished either way; do not roll it back again.
		tx = nil
		if commitErr != nil {
			return nil, commitErr
		}
		execCtx.IsAutoCommit = true
	}
	return result, nil
}