pkg/rm/rm_remoting.go (137 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rm
import (
"fmt"
"sync"
"github.com/pkg/errors"
"seata.apache.org/seata-go/pkg/protocol/message"
"seata.apache.org/seata-go/pkg/remoting/getty"
"seata.apache.org/seata-go/pkg/util/log"
)
var (
rmRemoting *RMRemoting
onceGettyRemoting = &sync.Once{}
)
var ErrBranchReportResponseFault = errors.New("branch report response fault")
func GetRMRemotingInstance() *RMRemoting {
if rmRemoting == nil {
onceGettyRemoting.Do(func() {
rmRemoting = &RMRemoting{}
})
}
return rmRemoting
}
type RMRemoting struct{}
// BranchRegister Register branch of global transaction
func (r *RMRemoting) BranchRegister(param BranchRegisterParam) (int64, error) {
request := message.BranchRegisterRequest{
Xid: param.Xid,
LockKey: param.LockKeys,
ResourceId: param.ResourceId,
BranchType: param.BranchType,
ApplicationData: []byte(param.ApplicationData),
}
resp, err := getty.GetGettyRemotingClient().SendSyncRequest(request)
if err != nil || resp == nil {
log.Errorf("BranchRegister error: %v, res %v", err.Error(), resp)
return 0, err
}
branchResp := resp.(message.BranchRegisterResponse)
if branchResp.ResultCode == message.ResultCodeFailed {
return 0, fmt.Errorf("Response %s", branchResp.Msg)
}
return branchResp.BranchId, nil
}
// BranchReport Report status of transaction branch
func (r *RMRemoting) BranchReport(param BranchReportParam) error {
request := message.BranchReportRequest{
Xid: param.Xid,
BranchId: param.BranchId,
Status: param.Status,
ApplicationData: []byte(param.ApplicationData),
BranchType: param.BranchType,
}
resp, err := getty.GetGettyRemotingClient().SendSyncRequest(request)
if err != nil {
log.Errorf("branch report request error: %+v", err)
return err
}
if err = isReportSuccess(resp); err != nil {
log.Errorf("BranchReport response error: %v, res %v", err.Error(), resp)
return err
}
return nil
}
// LockQuery Query lock status of transaction branch
func (r *RMRemoting) LockQuery(param LockQueryParam) (bool, error) {
req := message.GlobalLockQueryRequest{
BranchRegisterRequest: message.BranchRegisterRequest{
Xid: param.Xid,
LockKey: param.LockKeys,
ResourceId: param.ResourceId,
BranchType: param.BranchType,
},
}
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("send lock query request error: {%#v}", err.Error())
return false, err
}
if isQueryLockSuccess(res) {
log.Infof("lock is lockable, lock %s", param.LockKeys)
return true, nil
}
log.Infof("lock is unlockable, lock %s", param.LockKeys)
return false, nil
}
func (r *RMRemoting) RegisterResource(resource Resource) error {
req := message.RegisterRMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
Version: "1.5.2",
ApplicationId: rmConfig.ApplicationID,
TransactionServiceGroup: rmConfig.TxServiceGroup,
},
ResourceIds: resource.GetResourceId(),
}
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("RegisterResourceManager error: {%#v}", err.Error())
return err
}
if isRegisterSuccess(res) {
r.onRegisterRMSuccess(res.(message.RegisterRMResponse))
} else {
r.onRegisterRMFailure(res.(message.RegisterRMResponse))
}
return nil
}
func isQueryLockSuccess(response interface{}) bool {
if res, ok := response.(message.GlobalLockQueryResponse); ok {
return res.Lockable
}
return false
}
func isRegisterSuccess(response interface{}) bool {
if res, ok := response.(message.RegisterRMResponse); ok {
return res.Identified
}
return false
}
func isReportSuccess(response interface{}) error {
if res, ok := response.(message.BranchReportResponse); ok {
if res.ResultCode == message.ResultCodeFailed {
return errors.New(res.Msg)
}
} else {
return ErrBranchReportResponseFault
}
return nil
}
func (r *RMRemoting) onRegisterRMSuccess(response message.RegisterRMResponse) {
log.Infof("register RM success. response: %#v", response)
}
func (r *RMRemoting) onRegisterRMFailure(response message.RegisterRMResponse) {
log.Infof("register RM failure. response: %#v", response)
}
func (r *RMRemoting) onRegisterTMSuccess(response message.RegisterTMResponse) {
log.Infof("register TM success. response: %#v", response)
}
func (r *RMRemoting) onRegisterTMFailure(response message.RegisterTMResponse) {
log.Infof("register TM failure. response: %#v", response)
}