func()

in pkg/datasource/sql/exec/select_for_update_executor.go [56:168]


// ExecWithNamedValue runs a SELECT ... FOR UPDATE statement and, when the
// statement takes part in a global transaction (or explicitly requires the
// global lock), makes sure the selected rows are also lockable globally.
//
// When neither condition holds, the callback f is invoked directly and its
// result is returned unchanged.
//
// Otherwise up to retryTimes attempts are made. Each attempt:
//  1. opens a local transaction (if the connection was in auto-commit mode)
//     or sets a savepoint (if it was not), so that the local db lock taken by
//     the business SQL can be released again on a global lock conflict;
//  2. executes the business SQL through f;
//  3. queries the primary keys of the selected rows and builds a lock key;
//  4. asks the AT data source manager whether that lock key is lockable.
//
// An empty lock key or a positive lock query ends the loop successfully. On a
// conflict (or a lock query error) the local work of the attempt is rolled
// back and, unless it was the last attempt, the method waits retryInterval
// (or until ctx is cancelled) before trying again.
//
// Returns the result produced by f on success. Returns an error when parsing
// the table name or building the primary-key query fails, when the connection
// neither is in auto-commit mode nor supports savepoints, when any statement
// fails, when ctx is cancelled while waiting, or when the global lock could
// not be obtained within retryTimes attempts (the last lock query error, if
// any, is wrapped into that timeout error).
//
// If the connection was in auto-commit mode, execCtx.IsAutoCommit is restored
// to true and any still-open local transaction is rolled back on every exit
// path.
func (s SelectForUpdateExecutor) ExecWithNamedValue(ctx context.Context, execCtx *types.ExecContext, f CallbackWithNamedValue) (types.ExecResult, error) {
	if !tm.IsGlobalTx(ctx) && !execCtx.IsRequireGlobalLock {
		return f(ctx, execCtx.Query, execCtx.NamedValues)
	}

	var (
		tx                 driver.Tx
		txOpen             bool // true while tx has been begun but neither committed nor rolled back
		result             types.ExecResult
		lastLockErr        error
		originalAutoCommit = execCtx.IsAutoCommit
		// The savepoint identifier carries a letter prefix: a name made only of
		// digits is not a valid unquoted identifier in common SQL dialects
		// (e.g. MySQL).
		savepoint = fmt.Sprintf("seata_sp_%d", time.Now().Unix())
	)

	table, err := execCtx.ParseContext.GetTableName()
	if err != nil {
		return nil, err
	}
	// build query primary key sql
	selectPKSQL, err := s.buildSelectPKSQL(execCtx.ParseContext.SelectStmt, execCtx.MetaDataMap[table])
	if err != nil {
		return nil, err
	}

	if originalAutoCommit {
		// Whatever happens below, do not leave a dangling local transaction
		// or a stale IsAutoCommit flag behind.
		defer func() {
			if txOpen {
				// Best effort: the error that caused the early return is the
				// one reported to the caller.
				_ = tx.Rollback()
			}
			execCtx.IsAutoCommit = true
		}()
	}

	// execSQL prepares and executes a parameterless statement, closing the
	// prepared statement afterwards.
	execSQL := func(query string) error {
		stmt, err := execCtx.Conn.Prepare(query)
		if err != nil {
			return err
		}
		defer stmt.Close()
		_, err = stmt.Exec(nil)
		return err
	}

	// release gives up the local db lock taken by the current attempt.
	release := func() error {
		if originalAutoCommit {
			txOpen = false
			return tx.Rollback()
		}
		return execSQL("rollback to savepoint " + savepoint + ";")
	}

	// attempt performs one try. It reports true when the loop can stop
	// successfully (nothing to lock, or the global lock is obtainable).
	// Statement and rows are closed when it returns, i.e. before any rollback.
	attempt := func() (bool, error) {
		var err error
		switch {
		case originalAutoCommit:
			// In order to hold the local db lock during global lock checking
			// set auto commit value to false first if original auto commit was true
			if tx, err = execCtx.Conn.Begin(); err != nil {
				return false, err
			}
			txOpen = true
			execCtx.IsAutoCommit = false
		case execCtx.IsSupportsSavepoints:
			// In order to release the local db lock when global lock conflict
			// create a save point if original auto commit was false, then use the save point here to release db
			// lock during global lock checking if necessary
			if err = execSQL("savepoint " + savepoint + ";"); err != nil {
				return false, err
			}
		default:
			return false, fmt.Errorf("not support savepoint. please check your db version")
		}

		// execute business SQL, try to get local lock
		if result, err = f(ctx, execCtx.Query, execCtx.NamedValues); err != nil {
			return false, err
		}

		// query primary key values
		stmt, err := execCtx.Conn.Prepare(selectPKSQL)
		if err != nil {
			return false, err
		}
		defer stmt.Close()

		values := make([]driver.Value, 0, len(execCtx.NamedValues))
		for _, val := range execCtx.NamedValues {
			values = append(values, val.Value)
		}
		rows, err := stmt.Query(values)
		if err != nil {
			return false, err
		}
		defer rows.Close()

		lockKey := s.buildLockKey(rows, execCtx.MetaDataMap[table])
		if lockKey == "" {
			// no primary key values were produced, so there is nothing to lock
			return true, nil
		}

		// check global lock
		lockable, err := datasource.GetDataSourceManager(branch.BranchTypeAT).LockQuery(ctx, rm.LockQueryParam{
			Xid:        execCtx.TxCtx.XID,
			BranchType: branch.BranchTypeAT,
			ResourceId: execCtx.TxCtx.ResourceID,
			LockKeys:   lockKey,
		})
		if err == nil && lockable {
			return true, nil
		}
		// Remember why the check failed (nil on a plain conflict) so that a
		// final timeout can report it instead of silently dropping it.
		lastLockErr = err
		return false, nil
	}

	acquired := false
	for i := 0; i < retryTimes; i++ {
		done, attemptErr := attempt()
		if attemptErr != nil {
			return nil, attemptErr
		}
		if done {
			acquired = true
			break
		}

		if err = release(); err != nil {
			return nil, err
		}

		// No point in waiting after the final attempt.
		if i+1 < retryTimes {
			timer := time.NewTimer(retryInterval)
			select {
			case <-ctx.Done():
				timer.Stop()
				return nil, ctx.Err()
			case <-timer.C:
			}
		}
	}

	if !acquired {
		if lastLockErr != nil {
			return nil, fmt.Errorf("global lock wait timeout: %w", lastLockErr)
		}
		return nil, fmt.Errorf("global lock wait timeout")
	}

	if originalAutoCommit {
		// The transaction is finished after Commit regardless of its outcome,
		// so the deferred cleanup must not roll it back again.
		txOpen = false
		if err = tx.Commit(); err != nil {
			return nil, err
		}
	}
	return result, nil
}