pkg/tm/transaction_executor.go (130 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package tm import ( "context" "fmt" "time" "github.com/pkg/errors" "seata.apache.org/seata-go/pkg/protocol/message" "seata.apache.org/seata-go/pkg/util/log" ) type GtxConfig struct { Timeout time.Duration Name string Propagation Propagation LockRetryInternal time.Duration LockRetryTimes int16 } // CallbackWithCtx business callback definition type CallbackWithCtx func(ctx context.Context) error // WithGlobalTx begin a global transaction and make it step into committed or rollbacked status. func WithGlobalTx(ctx context.Context, gc *GtxConfig, business CallbackWithCtx) (re error) { if gc == nil { return fmt.Errorf("global transaction config info is required.") } if gc.Name == "" { return fmt.Errorf("global transaction name is required.") } // open global transaction for the first time if !IsSeataContext(ctx) { ctx = InitSeataContext(ctx) } if IsGlobalTx(ctx) { clearTxConf(ctx) } if re = begin(ctx, gc); re != nil { return } defer func() { var err error deferErr := recover() // no need to do second phase if propagation is some type e.g. NotSupported. if IsGlobalTx(ctx) { // business maybe to throw panic, so need to recover it here. if err = commitOrRollback(ctx, deferErr == nil && re == nil); err != nil { log.Errorf("global transaction xid %s, name %s second phase error", GetXID(ctx), GetTxName(ctx), err) } } if re != nil || err != nil { re = fmt.Errorf("first phase error: %v, second phase error: %v", re, err) } }() re = business(ctx) return } // begin a global transaction, it will obtain a xid and put into ctx from tc by tcp rpc. // it will to call two function beginNewGtx and useExistGtx // they do these operations on the transaction: // beginNewGtx: // start a new transaction, the previous transaction may be suspended or not exist // useExistGtx: // use the previous transaction, but the transaction obtained by propagation, // but will modify the current transaction role to participant. // in local transaction mode, the transaction role may be overwritten due to sharing a ctx, // so it is also necessary to provide suspend and resume operations like xid, // but here I do not use this method, instead, simulate the call in rpc mode, // construct a new context object and set the xid. // the advantage of this is that the suspend and resume operations of xid need not to be considered. func begin(ctx context.Context, gc *GtxConfig) error { switch pg := gc.Propagation; pg { case NotSupported: // If transaction is existing, suspend it // return then to execute without transaction if IsGlobalTx(ctx) { // because each global transaction operation will use a new context, // there is no need to implement a suspend operation, just unbind the xid here. // the same is true for the following case that needs to be suspended. UnbindXid(ctx) } return nil case Supports: // if transaction is not existing, return then to execute without transaction // else beginNewGtx transaction then return if IsGlobalTx(ctx) { useExistGtx(ctx, gc) } return nil case RequiresNew: // if transaction is existing, suspend it, and then Begin beginNewGtx transaction. if IsGlobalTx(ctx) { UnbindXid(ctx) } case Required: // default case, If current transaction is existing, execute with current transaction, // else continue and execute with beginNewGtx transaction. if IsGlobalTx(ctx) { useExistGtx(ctx, gc) return nil } case Never: // if transaction is existing, throw exception. if IsGlobalTx(ctx) { return fmt.Errorf("existing transaction found for transaction marked with pg 'never', xid = %s", GetXID(ctx)) } // return then to execute without transaction. return nil case Mandatory: // if transaction is not existing, throw exception. // else execute with current transaction. if IsGlobalTx(ctx) { useExistGtx(ctx, gc) return nil } return fmt.Errorf("no existing transaction found for transaction marked with pg 'mandatory'") default: return fmt.Errorf("not supported propagation:%d", pg) } // the follow will to construct a new transaction with xid. return beginNewGtx(ctx, gc) } // commitOrRollback commit or rollback the global transaction func commitOrRollback(ctx context.Context, isSuccess bool) (re error) { switch *GetTxRole(ctx) { case Launcher: if tx := GetTx(ctx); isSuccess { if re = GetGlobalTransactionManager().Commit(ctx, tx); re != nil { log.Errorf("transactionTemplate: commit transaction failed, error %v", re) } } else { if re = GetGlobalTransactionManager().Rollback(ctx, tx); re != nil { log.Errorf("transactionTemplate: Rollback transaction failed, error %v", re) } } case Participant: // participant has no responsibility of rollback log.Infof("ignore second phase(commit or rollback): just involved in global transaction [%s/%s]", GetTxName(ctx), GetXID(ctx)) case UnKnow: re = errors.New("global transaction role is UnKnow.") } return } // beginNewGtx to construct a default global transaction func beginNewGtx(ctx context.Context, gc *GtxConfig) error { timeout := gc.Timeout if timeout == 0 { timeout = config.DefaultGlobalTransactionTimeout } SetTxRole(ctx, Launcher) SetTxName(ctx, gc.Name) SetTxStatus(ctx, message.GlobalStatusBegin) if err := GetGlobalTransactionManager().Begin(ctx, timeout); err != nil { return fmt.Errorf("transactionTemplate: Begin transaction failed, error %v", err) } return nil } // useExistGtx if xid is not empty, then construct a global transaction func useExistGtx(ctx context.Context, gc *GtxConfig) { if xid := GetXID(ctx); xid != "" { SetTx(ctx, &GlobalTransaction{ Xid: GetXID(ctx), TxStatus: message.GlobalStatusBegin, TxRole: Participant, TxName: gc.Name, }) } } // clearTxConf When using global transactions in local mode, you need to clear tx config to use the propagation of global transactions. func clearTxConf(ctx context.Context) { SetTx(ctx, &GlobalTransaction{Xid: GetXID(ctx)}) }