in pkg/datasource/sql/exec/at/select_for_update_executor.go [148:213]
// doExecContext performs a single attempt of the locking query:
//
//  1. Open a local scope that can later be undone: a new transaction when the
//     connection was in auto-commit mode, otherwise a savepoint.
//  2. Run s.selectPKSQL and build the global lock key from the returned rows.
//  3. Run the business query through f.
//  4. Ask the AT data source manager whether the lock key is lockable.
//
// It returns (nil, nil) when no lock key could be built from the primary key
// query, lockConflictError when the global lock check reports the keys are
// not lockable, and the underlying error for any failed step.
//
// NOTE(review): this method neither rolls back the transaction / savepoint nor
// restores auto-commit after a successful Begin; presumably the caller owns
// that cleanup — confirm against the calling code.
func (s *selectForUpdateExecutor) doExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
	var (
		result             types.ExecResult
		originalAutoCommit = s.execContext.IsAutoCommit
		err                error
	)

	switch {
	case originalAutoCommit:
		// In order to hold the local db lock during global lock checking,
		// switch auto commit off and open an explicit transaction.
		s.execContext.IsAutoCommit = false
		s.tx, err = s.execContext.Conn.Begin()
		if err != nil {
			// No transaction was opened, so do not leave the context claiming
			// that auto commit is disabled.
			s.execContext.IsAutoCommit = originalAutoCommit
			return nil, err
		}
	case s.execContext.IsSupportsSavepoints:
		// In order to release the local db lock on a global lock conflict,
		// create a savepoint that can be rolled back to without discarding the
		// surrounding transaction.
		//
		// The name must be a bare identifier: the statement terminator is added
		// by the SQL text below, and s.savepointName is kept for later use.
		savepointName := fmt.Sprintf("seatago%dpoint", time.Now().Unix())
		if _, err = s.exec(ctx, fmt.Sprintf("savepoint %s;", savepointName), nil, nil); err != nil {
			return nil, err
		}
		s.savepointName = savepointName
	default:
		return nil, fmt.Errorf("not support savepoint. please check your db version")
	}

	// Query primary key values and turn them into the global lock key.
	var lockKey string
	_, err = s.exec(ctx, s.selectPKSQL, s.execContext.NamedValues, func(rows driver.Rows) {
		lockKey = s.buildLockKey(rows, s.metaData)
	})
	if err != nil {
		return nil, err
	}
	if lockKey == "" {
		// Nothing to lock: the business query is not executed and callers get
		// a nil result with a nil error.
		return nil, nil
	}

	// Execute the business SQL, which tries to take the local lock.
	result, err = f(ctx, s.execContext.Query, s.execContext.NamedValues)
	if err != nil {
		return nil, err
	}

	// Check the global lock for the collected keys.
	lockable, err := datasource.GetDataSourceManager(branch.BranchTypeAT).LockQuery(ctx, rm.LockQueryParam{
		Xid:        s.execContext.TxCtx.XID,
		BranchType: branch.BranchTypeAT,
		ResourceId: s.execContext.TxCtx.ResourceID,
		LockKeys:   lockKey,
	})
	if err != nil {
		return nil, err
	}
	if !lockable {
		return nil, lockConflictError
	}
	return result, nil
}