in pkg/datasource/sql/exec/at/select_for_update_executor.go [75:146]
// ExecContext executes the select-for-update statement held in s.execContext
// through the callback f.
//
// The before/after hooks are always run around the execution. When
// tm.IsGlobalTx(ctx) reports false and IsRequireGlobalLock is not set, the
// query is handed straight to f. Otherwise the table name, table metadata and
// the primary-key select SQL are resolved, and doExecContext is run under a
// backoff configured from s.cfg (RetryTimes / RetryInterval).
//
// If the loop ends with an error (or the backoff itself reports one), the
// work is rolled back -- to s.savepointName when one is set, otherwise via
// s.tx -- and the error is returned with a nil result. A rollback failure is
// only logged; the error that triggered the rollback is what the caller gets.
// On success, the transaction is committed and IsAutoCommit is set back to
// true if auto-commit was enabled on entry.
func (s *selectForUpdateExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
	s.beforeHooks(ctx, s.execContext)
	defer func() {
		s.afterHooks(ctx, s.execContext)
	}()

	// todo fix IsRequireGlobalLock
	if !tm.IsGlobalTx(ctx) && !s.execContext.IsRequireGlobalLock {
		return f(ctx, s.execContext.Query, s.execContext.NamedValues)
	}

	var (
		result             types.ExecResult
		originalAutoCommit = s.execContext.IsAutoCommit
		err                error
	)

	if s.tableName, err = s.parserCtx.GetTableName(); err != nil {
		return nil, err
	}

	if s.metaData, err = datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, s.execContext.DBName, s.tableName); err != nil {
		return nil, err
	}

	// build query primary key sql
	if s.selectPKSQL, err = s.buildSelectPKSQL(s.parserCtx.SelectStmt, s.metaData); err != nil {
		return nil, err
	}

	bf := backoff.New(ctx, backoff.Config{
		MaxRetries: s.cfg.RetryTimes,
		MinBackoff: s.cfg.RetryInterval,
		MaxBackoff: s.cfg.RetryInterval,
	})

	for bf.Ongoing() {
		result, err = s.doExecContext(ctx, f)
		// NOTE(review): as written, a lock conflict stops the loop immediately
		// and every other error is retried. If the backoff is meant to retry
		// on lock conflicts, this condition is inverted -- confirm against
		// doExecContext before changing it.
		if err == nil || errors.Is(err, lockConflictError) {
			break
		}
		bf.Wait()
	}

	if bf.Err() != nil || err != nil {
		if err == nil {
			err = bf.Err()
		}

		// if there is an err in doExecContext, we should rollback first.
		// A rollback failure is logged and the original err is still returned.
		switch {
		case s.savepointName != "":
			if _, rollErr := s.exec(ctx, fmt.Sprintf("rollback to %s;", s.savepointName), nil, nil); rollErr != nil {
				log.Errorf("rollback to %s failed, err %s", s.savepointName, rollErr.Error())
			}
		case s.tx != nil:
			// s.tx can still be unset here, e.g. when the backoff loop never
			// ran; guard so the error path cannot dereference nil.
			if rollErr := s.tx.Rollback(); rollErr != nil {
				log.Errorf("rollback failed, err %s", rollErr.Error())
			}
		}
		return nil, err
	}

	if originalAutoCommit {
		if err = s.tx.Commit(); err != nil {
			return nil, err
		}
		s.execContext.IsAutoCommit = true
	}

	return result, nil
}