pkg/rm/tcc/tcc_service.go (197 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package tcc import ( "context" "encoding/json" "errors" "reflect" "sync" "time" gostnet "github.com/dubbogo/gost/net" "seata.apache.org/seata-go/pkg/constant" "seata.apache.org/seata-go/pkg/protocol/branch" "seata.apache.org/seata-go/pkg/rm" "seata.apache.org/seata-go/pkg/rm/tcc/fence/enum" "seata.apache.org/seata-go/pkg/tm" "seata.apache.org/seata-go/pkg/util/log" "seata.apache.org/seata-go/pkg/util/reflectx" ) type TCCServiceProxy struct { referenceName string registerResourceOnce sync.Once *TCCResource } func NewTCCServiceProxy(service interface{}) (*TCCServiceProxy, error) { tccResource, err := ParseTCCResource(service) if err != nil { log.Errorf("invalid tcc service, err %v", err) return nil, err } proxy := &TCCServiceProxy{ TCCResource: tccResource, } return proxy, proxy.RegisterResource() } func (t *TCCServiceProxy) RegisterResource() error { var err error t.registerResourceOnce.Do(func() { err = rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource) if err != nil { log.Errorf("NewTCCServiceProxy RegisterResource error: %#v", err.Error()) } }) return err } func (t *TCCServiceProxy) SetReferenceName(referenceName string) { t.referenceName = referenceName } func (t *TCCServiceProxy) Reference() string { if t.referenceName != "" { return t.referenceName } return reflectx.GetReference(t.TCCResource.TwoPhaseAction.GetTwoPhaseService()) } func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (interface{}, error) { if tm.IsGlobalTx(ctx) { err := t.registeBranch(ctx, params) if err != nil { return nil, err } } // to set up the fence phase tm.SetFencePhase(ctx, enum.FencePhasePrepare) return t.TCCResource.Prepare(ctx, params) } // registeBranch send register branch transaction request func (t *TCCServiceProxy) registeBranch(ctx context.Context, params interface{}) error { if !tm.IsGlobalTx(ctx) { errStr := "BranchRegister error, transaction should be opened" log.Errorf(errStr) return errors.New(errStr) } tccContext := t.initBusinessActionContext(ctx, params) actionContext := t.initActionContext(params) for k, v := range actionContext { tccContext.ActionContext[k] = v } applicationData, _ := json.Marshal(map[string]interface{}{ constant.ActionContext: actionContext, }) branchId, err := rm.GetRMRemotingInstance().BranchRegister(rm.BranchRegisterParam{ BranchType: branch.BranchTypeTCC, ResourceId: t.GetActionName(), ClientId: "", Xid: tm.GetXID(ctx), ApplicationData: string(applicationData), LockKeys: "", }) if err != nil { log.Errorf("register branch transaction error %s ", err.Error()) return err } tccContext.BranchId = branchId tm.SetBusinessActionContext(ctx, tccContext) return nil } // initActionContext init action context func (t *TCCServiceProxy) initActionContext(params interface{}) map[string]interface{} { actionContext := t.getActionContextParameters(params) actionContext[constant.ActionStartTime] = time.Now().UnixNano() / 1e6 actionContext[constant.PrepareMethod] = t.TCCResource.TwoPhaseAction.GetPrepareMethodName() actionContext[constant.CommitMethod] = t.TCCResource.TwoPhaseAction.GetCommitMethodName() actionContext[constant.RollbackMethod] = t.TCCResource.TwoPhaseAction.GetRollbackMethodName() actionContext[constant.ActionName] = t.TCCResource.TwoPhaseAction.GetActionName() actionContext[constant.HostName], _ = gostnet.GetLocalIP() return actionContext } func (t *TCCServiceProxy) getActionContextParameters(params interface{}) map[string]interface{} { var ( actionContext = make(map[string]interface{}, 0) typ reflect.Type val reflect.Value isStruct bool ) if params == nil { return actionContext } if isStruct, val, typ = obtainStructValueType(params); !isStruct { return actionContext } for i := 0; i < typ.NumField(); i++ { // skip unexported anonymous filed if typ.Field(i).PkgPath != "" { continue } structField := typ.Field(i) // skip ignored field tagVal, hasTag := structField.Tag.Lookup(constant.TccBusinessActionContextParameter) if !hasTag || tagVal == `-` || tagVal == "" { continue } actionContext[tagVal] = val.Field(i).Interface() } return actionContext } // initBusinessActionContext init tcc context func (t *TCCServiceProxy) initBusinessActionContext(ctx context.Context, params interface{}) *tm.BusinessActionContext { tccContext := t.getOrCreateBusinessActionContext(params) tccContext.Xid = tm.GetXID(ctx) tccContext.ActionName = t.GetActionName() // todo read from config file tccContext.IsDelayReport = true if tccContext.ActionContext == nil { tccContext.ActionContext = make(map[string]interface{}, 0) } return tccContext } // getOrCreateBusinessActionContext When the parameters of the prepare method are the following scenarios, obtain the context in the following ways: // 1. null: create new BusinessActionContext // 2. tm.BusinessActionContext: return it // 3. *tm.BusinessActionContext: if nil then create new BusinessActionContext, else return it // 4. Struct: if there is an attribute of businessactioncontext enum and it is not nil, return it // 5. else: create new BusinessActionContext func (t *TCCServiceProxy) getOrCreateBusinessActionContext(params interface{}) *tm.BusinessActionContext { if params == nil { return &tm.BusinessActionContext{} } switch params.(type) { case tm.BusinessActionContext: v := params.(tm.BusinessActionContext) return &v case *tm.BusinessActionContext: v := params.(*tm.BusinessActionContext) if v != nil { return v } return &tm.BusinessActionContext{} default: break } var ( typ reflect.Type val reflect.Value isStruct bool ) if isStruct, val, typ = obtainStructValueType(params); !isStruct { return &tm.BusinessActionContext{} } n := typ.NumField() for i := 0; i < n; i++ { sf := typ.Field(i) if sf.Type == rm.TypBusinessContextInterface { v := val.Field(i).Interface() if v != nil { return v.(*tm.BusinessActionContext) } } if sf.Type == reflect.TypeOf(tm.BusinessActionContext{}) && val.Field(i).CanInterface() { v := val.Field(i).Interface().(tm.BusinessActionContext) return &v } } return &tm.BusinessActionContext{} } // obtainStructValueType check o is struct or pointer enum func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) { v := reflect.ValueOf(o) t := reflect.TypeOf(o) switch v.Kind() { case reflect.Struct: return true, v, t case reflect.Ptr: return true, v.Elem(), t.Elem() default: return false, v, nil } } func (t *TCCServiceProxy) GetTransactionInfo() tm.GtxConfig { // todo replace with config return tm.GtxConfig{ Timeout: time.Second * 10, Name: t.GetActionName(), // Propagation, Propagation // LockRetryInternal, int64 // LockRetryTimes int64 } }