pkg/rm/tcc/tcc_resource.go (144 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package tcc
import (
"context"
"encoding/json"
"fmt"
"seata.apache.org/seata-go/pkg/rm/tcc/fence"
"sync"
"seata.apache.org/seata-go/pkg/constant"
"seata.apache.org/seata-go/pkg/protocol/branch"
"seata.apache.org/seata-go/pkg/rm"
"seata.apache.org/seata-go/pkg/rm/tcc/fence/enum"
"seata.apache.org/seata-go/pkg/tm"
"seata.apache.org/seata-go/pkg/util/log"
)
// Package-level state backing the shared TCC resource manager.
var (
	// onceTCCResourceManager is the sync.Once used by
	// GetTCCResourceManagerInstance when it builds tCCResourceManager.
	onceTCCResourceManager = new(sync.Once)

	// tCCResourceManager is the shared instance handed out by
	// GetTCCResourceManagerInstance; it is nil until that function has run.
	tCCResourceManager *TCCResourceManager
)
// TCCResource describes one TCC resource: a resource group id, an application
// name and an embedded two-phase action.
type TCCResource struct {
// ResourceGroupId is the value returned by GetResourceGroupId.
// NOTE(review): nothing in this file reads the `default` tag — confirm which
// component, if any, consumes it.
ResourceGroupId string `default:"DEFAULT"`
// AppName holds the application name; ParseTCCResource currently fills in a
// mock value.
AppName string
// The embedded action supplies the resource id (see GetResourceId) and the
// Commit/Rollback calls used by the resource manager.
*rm.TwoPhaseAction
}
// ParseTCCResource builds a TCCResource from v by delegating to
// rm.ParseTwoPhaseAction.
//
// When v cannot be parsed as a two-phase action the failure is logged and the
// error from rm.ParseTwoPhaseAction is returned with a nil resource. On
// success the returned resource wraps the parsed action; the group id and the
// application name are hard-coded placeholders until they are read from
// configuration.
func ParseTCCResource(v interface{}) (*TCCResource, error) {
	action, err := rm.ParseTwoPhaseAction(v)
	if err != nil {
		log.Errorf("%#v is not tcc two phase service, %s", v, err.Error())
		return nil, err
	}

	return &TCCResource{
		// todo read from config
		// Use the plain group name: the text `default:"DEFAULT"` is the
		// struct tag on the field, not a meaningful value for it.
		ResourceGroupId: "DEFAULT",
		AppName:         "seata-go-mock-app-name",
		TwoPhaseAction:  action,
	}, nil
}
// GetResourceGroupId reports the resource group id stored on this resource.
func (t *TCCResource) GetResourceGroupId() string {
	groupID := t.ResourceGroupId
	return groupID
}
// GetResourceId reports the resource id, which is the action name of the
// embedded two-phase action.
func (t *TCCResource) GetResourceId() string {
	action := t.TwoPhaseAction
	return action.GetActionName()
}
// GetBranchType reports the branch type of this resource, which is always
// branch.BranchTypeTCC.
func (t *TCCResource) GetBranchType() branch.BranchType {
	var kind branch.BranchType = branch.BranchTypeTCC
	return kind
}
// InitTCC applies the fence configuration cfg and then registers the shared
// TCC resource manager with the resource-manager cache.
func InitTCC(cfg fence.Config) {
	fence.InitFenceConfig(cfg)

	cache := rm.GetRmCacheInstance()
	manager := GetTCCResourceManagerInstance()
	cache.RegisterResourceManager(manager)
}
// GetTCCResourceManagerInstance returns the shared TCCResourceManager,
// creating it on the first call.
//
// Construction always goes through onceTCCResourceManager.Do. Checking
// tCCResourceManager for nil before entering Do would be an unsynchronized
// read racing with the write performed inside Do; Once.Do already provides a
// cheap path for calls made after initialization, so the outer check buys
// nothing.
func GetTCCResourceManagerInstance() *TCCResourceManager {
	onceTCCResourceManager.Do(func() {
		// resourceManagerMap is left at its zero value, which is an empty,
		// ready-to-use sync.Map.
		tCCResourceManager = &TCCResourceManager{
			rmRemoting: rm.GetRMRemotingInstance(),
		}
	})
	return tCCResourceManager
}
// TCCResourceManager caches registered TCC resources and forwards branch
// registration, branch reporting and resource registration to rmRemoting.
type TCCResourceManager struct {
// rmRemoting is the remoting instance the manager delegates to.
rmRemoting *rm.RMRemoting
// resourceID -> resource
resourceManagerMap sync.Map
}
// BranchRegister registers a transaction branch by handing param to
// rmRemoting and returning its result unchanged. ctx is not used here.
func (t *TCCResourceManager) BranchRegister(ctx context.Context, param rm.BranchRegisterParam) (int64, error) {
	remoting := t.rmRemoting
	return remoting.BranchRegister(param)
}
// BranchReport reports the status of a transaction branch by handing param to
// rmRemoting and returning its error unchanged. ctx is not used here.
func (t *TCCResourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
	remoting := t.rmRemoting
	return remoting.BranchReport(param)
}
// LockQuery is meant to query the lock status of a transaction branch.
// It is not implemented yet: every call panics with "implement me".
func (t *TCCResourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) {
	// TODO implement me
	const notImplemented = "implement me"
	panic(notImplemented)
}
// UnregisterResource is a placeholder for removing a registered resource.
// It is not implemented yet: every call panics with "implement me".
func (t *TCCResourceManager) UnregisterResource(resource rm.Resource) error {
	// TODO implement me
	const notImplemented = "implement me"
	panic(notImplemented)
}
// RegisterResource caches resource under its resource id and then registers
// it through rmRemoting.
//
// resource must be a non-nil *TCCResource. Any other value is rejected with an
// error before the cache or the remoting layer is touched, so the misuse is
// reported through the error return instead of a panic. Otherwise the result
// of rmRemoting.RegisterResource is returned.
func (t *TCCResourceManager) RegisterResource(resource rm.Resource) error {
	tccResource, ok := resource.(*TCCResource)
	if !ok || tccResource == nil {
		// A typed nil pointer would pass the assertion but dereference nil in
		// GetResourceId, so it is rejected as well.
		return fmt.Errorf("register tcc resource error, TCCResource is needed, param %v", resource)
	}

	t.resourceManagerMap.Store(resource.GetResourceId(), resource)
	return t.rmRemoting.RegisterResource(resource)
}
// GetCachedResources exposes the manager's resource cache. The returned
// pointer refers to the manager's own map, not a copy.
func (t *TCCResourceManager) GetCachedResources() *sync.Map {
	cached := &t.resourceManagerMap
	return cached
}
// BranchCommit runs the second-phase commit of the TCC branch described by
// branchResource.
//
// The resource is looked up by branchResource.ResourceId in the local cache. A
// missing entry, or an entry that is not a usable *TCCResource, yields status
// 0 and an error. Otherwise the business action context is rebuilt from the
// branch's application data, the context is prepared (XID, commit fence phase,
// business action context) and the resource's Commit action is invoked.
//
// A failing Commit is reported as BranchStatusPhasetwoCommitFailedRetryable
// together with its error; success is reported as
// BranchStatusPhasetwoCommitted with a nil error.
func (t *TCCResourceManager) BranchCommit(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
	resource, found := t.resourceManagerMap.Load(branchResource.ResourceId)
	if !found {
		return 0, fmt.Errorf("TCC resource is not exist, resourceId: %s", branchResource.ResourceId)
	}
	tccResource, ok := resource.(*TCCResource)
	if !ok || tccResource == nil {
		// Check the assertion so an unexpected cache entry surfaces as an
		// error instead of a nil pointer dereference further down.
		return 0, fmt.Errorf("cached resource is not a TCC resource, resourceId: %s", branchResource.ResourceId)
	}

	businessActionContext := t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData)

	// to set up the fence phase
	ctx = tm.InitSeataContext(ctx)
	tm.SetXID(ctx, branchResource.Xid)
	tm.SetFencePhase(ctx, enum.FencePhaseCommit)
	tm.SetBusinessActionContext(ctx, businessActionContext)

	if _, err := tccResource.TwoPhaseAction.Commit(ctx, businessActionContext); err != nil {
		return branch.BranchStatusPhasetwoCommitFailedRetryable, err
	}
	return branch.BranchStatusPhasetwoCommitted, nil
}
// getBusinessActionContext assembles the BusinessActionContext that is handed
// to a second-phase action.
//
// xid, branchID and resourceID are copied into the Xid, BranchId and
// ActionName fields. applicationData, when non-empty, is decoded as a JSON
// object; if that object holds another JSON object under
// constant.ActionContext, the nested object becomes ActionContext. In every
// other case (no data, key absent, or a value of another JSON type such as
// null) ActionContext is an empty, non-nil map.
//
// It panics when applicationData cannot be decoded, because the signature has
// no error return.
func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) *tm.BusinessActionContext {
	actionContextMap := make(map[string]interface{}, 2)
	if len(applicationData) > 0 {
		var tccContext map[string]interface{}
		if err := json.Unmarshal(applicationData, &tccContext); err != nil {
			// Keep the underlying decode error so the failure can be diagnosed.
			panic(fmt.Sprintf("application data failed to unmarshal as json: %v", err))
		}
		// Comma-ok assertion: a null or non-object value must not panic; the
		// empty default map is kept instead.
		if decoded, ok := tccContext[constant.ActionContext].(map[string]interface{}); ok {
			actionContextMap = decoded
		}
	}

	return &tm.BusinessActionContext{
		Xid:           xid,
		BranchId:      branchID,
		ActionName:    resourceID,
		ActionContext: actionContextMap,
	}
}
// BranchRollback runs the second-phase rollback of the TCC branch described
// by branchResource.
//
// The resource is looked up by branchResource.ResourceId in the local cache. A
// missing entry, or an entry that is not a usable *TCCResource, yields status
// 0 and an error. Otherwise the business action context is rebuilt from the
// branch's application data, the context is prepared (XID, rollback fence
// phase, business action context) and the resource's Rollback action is
// invoked.
//
// A failing Rollback is reported as
// BranchStatusPhasetwoRollbackFailedRetryable together with its error; success
// is reported as BranchStatusPhasetwoRollbacked with a nil error.
func (t *TCCResourceManager) BranchRollback(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
	resource, found := t.resourceManagerMap.Load(branchResource.ResourceId)
	if !found {
		// Message aligned with BranchCommit ("TCC", not "CC").
		return 0, fmt.Errorf("TCC resource is not exist, resourceId: %s", branchResource.ResourceId)
	}
	tccResource, ok := resource.(*TCCResource)
	if !ok || tccResource == nil {
		// Check the assertion so an unexpected cache entry surfaces as an
		// error instead of a nil pointer dereference further down.
		return 0, fmt.Errorf("cached resource is not a TCC resource, resourceId: %s", branchResource.ResourceId)
	}

	businessActionContext := t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData)

	// to set up the fence phase
	ctx = tm.InitSeataContext(ctx)
	tm.SetXID(ctx, branchResource.Xid)
	tm.SetFencePhase(ctx, enum.FencePhaseRollback)
	tm.SetBusinessActionContext(ctx, businessActionContext)

	if _, err := tccResource.TwoPhaseAction.Rollback(ctx, businessActionContext); err != nil {
		return branch.BranchStatusPhasetwoRollbackFailedRetryable, err
	}
	return branch.BranchStatusPhasetwoRollbacked, nil
}
// GetBranchType reports the branch type handled by this manager, which is
// always branch.BranchTypeTCC.
func (t *TCCResourceManager) GetBranchType() branch.BranchType {
	var kind branch.BranchType = branch.BranchTypeTCC
	return kind
}