func()

in pkg/datasource/sql/exec/at/select_for_update_executor.go [148:213]


// doExecContext performs a single attempt at running the business query while
// checking the global lock for the rows it touches.
//
// The attempt proceeds as follows:
//  1. If the connection was in auto-commit mode, auto-commit is switched off
//     and a local transaction is begun so the local db lock is held while the
//     global lock is checked. Otherwise, if savepoints are supported, a
//     savepoint is created and its name is recorded in s.savepointName. If
//     neither applies an error is returned.
//  2. s.selectPKSQL is executed and the resulting rows are turned into a lock
//     key via s.buildLockKey. An empty lock key ends the attempt with a nil
//     result and a nil error.
//  3. The callback f is invoked with the original query and named values.
//  4. The data source manager is asked whether the lock key is lockable for the
//     current XID; if it is not, lockConflictError is returned.
//
// This method does not commit or roll back the transaction, restore the
// auto-commit flag, or release the savepoint.
// NOTE(review): presumably the caller handles that cleanup — confirm.
func (s *selectForUpdateExecutor) doExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
	var (
		now                = time.Now().Unix()
		result             types.ExecResult
		originalAutoCommit = s.execContext.IsAutoCommit
		err                error
	)

	switch {
	case originalAutoCommit:
		// In order to hold the local db lock during global lock checking,
		// set auto commit to false first if the original auto commit was true.
		s.execContext.IsAutoCommit = false
		s.tx, err = s.execContext.Conn.Begin()
		if err != nil {
			return nil, err
		}
	case s.execContext.IsSupportsSavepoints:
		// In order to release the local db lock on a global lock conflict,
		// create a savepoint when the original auto commit was false; it can be
		// used to release the db lock during global lock checking if necessary.
		//
		// The name must be a bare identifier: the statement terminator belongs
		// to the SQL text below, not to the name, otherwise the statement ends
		// in ";;" and every later use of s.savepointName carries a stray ';'.
		savepointName := fmt.Sprintf("seatago%dpoint", now)
		if _, err = s.exec(ctx, fmt.Sprintf("savepoint %s;", savepointName), nil, nil); err != nil {
			return nil, err
		}
		s.savepointName = savepointName
	default:
		return nil, fmt.Errorf("not support savepoint. please check your db version")
	}

	// Query primary key values and derive the global lock key from them.
	var lockKey string
	_, err = s.exec(ctx, s.selectPKSQL, s.execContext.NamedValues, func(rows driver.Rows) {
		lockKey = s.buildLockKey(rows, s.metaData)
	})
	if err != nil {
		return nil, err
	}

	// No lock key was built, so there is nothing to check against the global lock.
	if lockKey == "" {
		return nil, nil
	}

	// Execute the business SQL, trying to get the local lock.
	result, err = f(ctx, s.execContext.Query, s.execContext.NamedValues)
	if err != nil {
		return nil, err
	}

	// Check the global lock for the selected rows.
	lockable, err := datasource.GetDataSourceManager(branch.BranchTypeAT).LockQuery(ctx, rm.LockQueryParam{
		Xid:        s.execContext.TxCtx.XID,
		BranchType: branch.BranchTypeAT,
		ResourceId: s.execContext.TxCtx.ResourceID,
		LockKeys:   lockKey,
	})
	if err != nil {
		return nil, err
	}

	if !lockable {
		return nil, lockConflictError
	}

	return result, nil
}