func()

in pkg/datasource/sql/exec/at/select_for_update_executor.go [75:146]


func (s *selectForUpdateExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
	s.beforeHooks(ctx, s.execContext)
	defer func() {
		s.afterHooks(ctx, s.execContext)
	}()

	// todo fix IsRequireGlobalLock
	if !tm.IsGlobalTx(ctx) && !s.execContext.IsRequireGlobalLock {
		return f(ctx, s.execContext.Query, s.execContext.NamedValues)
	}

	var (
		result             types.ExecResult
		originalAutoCommit = s.execContext.IsAutoCommit
		err                error
	)

	if s.tableName, err = s.parserCtx.GetTableName(); err != nil {
		return nil, err
	}

	if s.metaData, err = datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, s.execContext.DBName, s.tableName); err != nil {
		return nil, err
	}

	// build query primary key sql
	if s.selectPKSQL, err = s.buildSelectPKSQL(s.parserCtx.SelectStmt, s.metaData); err != nil {
		return nil, err
	}

	bf := backoff.New(ctx, backoff.Config{
		MaxRetries: s.cfg.RetryTimes,
		MinBackoff: s.cfg.RetryInterval,
		MaxBackoff: s.cfg.RetryInterval,
	})

	for bf.Ongoing() {
		result, err = s.doExecContext(ctx, f)
		if err == nil || errors.Is(err, lockConflictError) {
			break
		}
		bf.Wait()
	}

	if bf.Err() != nil || err != nil {
		if err == nil {
			err = bf.Err()
		}
		// if there is an err in doExecContext, we should rollback first
		if s.savepointName != "" {
			if _, rollerr := s.exec(ctx, fmt.Sprintf("rollback to %s;", s.savepointName), nil, nil); rollerr != nil {
				log.Error("rollback to %s failed, err %s", s.savepointName, rollerr.Error())
				return nil, err
			}
		} else {
			if rollerr := s.tx.Rollback(); rollerr != nil {
				log.Error("rollback failed, err %s", rollerr.Error())
				return nil, err
			}
		}
		return nil, err
	}

	if originalAutoCommit {
		if err = s.tx.Commit(); err != nil {
			return nil, err
		}
		s.execContext.IsAutoCommit = true
	}

	return result, nil
}