pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go (223 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package handler import ( "container/list" "context" "database/sql" "errors" "fmt" "github.com/go-sql-driver/mysql" "seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/dao" "seata.apache.org/seata-go/pkg/rm/tcc/fence/store/db/model" "sync" "time" "seata.apache.org/seata-go/pkg/rm/tcc/fence/enum" "seata.apache.org/seata-go/pkg/tm" "seata.apache.org/seata-go/pkg/util/log" ) type tccFenceWrapperHandler struct { tccFenceDao dao.TCCFenceStore logQueue chan *model.FenceLogIdentity logCache list.List logQueueOnce sync.Once logQueueCloseOnce sync.Once logTaskOnce sync.Once db *sql.DB dbMutex sync.RWMutex } const ( maxQueueSize = 500 channelDelete = 5 cleanExpired = 24 * time.Hour ) var ( fenceHandler *tccFenceWrapperHandler fenceOnce sync.Once cleanInterval = 5 * time.Minute ) func GetFenceHandler() *tccFenceWrapperHandler { if fenceHandler == nil { fenceOnce.Do(func() { fenceHandler = &tccFenceWrapperHandler{ tccFenceDao: dao.GetTccFenceStoreDatabaseMapper(), } }) } return fenceHandler } func (handler *tccFenceWrapperHandler) InitCleanPeriod(time time.Duration) { cleanInterval = time } func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx) error { xid := tm.GetBusinessActionContext(ctx).Xid branchId := tm.GetBusinessActionContext(ctx).BranchId actionName := tm.GetBusinessActionContext(ctx).ActionName err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried) if err != nil { if mysqlError, ok := errors.Unwrap(err).(*mysql.MySQLError); ok && mysqlError.Number == 1062 { log.Warnf("tcc fence record already exists, idempotency rejected. xid: %s, branchId: %d", xid, branchId) handler.pushCleanChannel(xid, branchId) } return fmt.Errorf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } return nil } func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.Tx) error { xid := tm.GetBusinessActionContext(ctx).Xid branchId := tm.GetBusinessActionContext(ctx).BranchId fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) if err != nil { return fmt.Errorf(" commit fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } if fenceDo == nil { return fmt.Errorf("tcc fence record not exists, commit fence method failed. xid= %s, branchId= %d", xid, branchId) } if fenceDo.Status == enum.StatusCommitted { log.Infof("branch transaction has already committed before. idempotency rejected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) return nil } if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { // enable warn level log.Warnf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) } return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted) } func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) error { xid := tm.GetBusinessActionContext(ctx).Xid branchId := tm.GetBusinessActionContext(ctx).BranchId actionName := tm.GetBusinessActionContext(ctx).ActionName fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) if err != nil { return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } // record is null, mean the need suspend if fenceDo == nil { err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended) if err != nil { return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId) return nil } // have rollbacked or suspended if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { // enable warn level log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) return nil } if fenceDo.Status == enum.StatusCommitted { log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) } return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked) } func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error { tccFenceDo := model.TCCFenceDO{ Xid: xid, BranchId: branchId, ActionName: actionName, Status: status, } return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo) } func (handler *tccFenceWrapperHandler) updateFenceStatus(tx *sql.Tx, xid string, branchId int64, status enum.FenceStatus) error { return handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status) } func (handler *tccFenceWrapperHandler) InitLogCleanChannel(dsn string) { db, err := sql.Open("mysql", dsn) if err != nil { log.Warnf("failed to open database: %v", err) return } handler.dbMutex.Lock() handler.db = db handler.dbMutex.Unlock() handler.logQueueOnce.Do(func() { go handler.traversalCleanChannel(db) }) handler.logTaskOnce.Do(func() { go handler.initLogCleanTask(db) }) } func (handler *tccFenceWrapperHandler) initLogCleanTask(db *sql.DB) { ticker := time.NewTicker(cleanInterval) defer ticker.Stop() for range ticker.C { tx, err := db.Begin() if err != nil { log.Warnf("failed to begin transaction: %v", err) continue } expiredTime := time.Now().Add(-cleanExpired) identityList, err := handler.tccFenceDao.QueryTCCFenceLogIdentityByMdDate(tx, expiredTime) if err != nil { log.Warnf("failed to delete expired logs: %v", err) tx.Rollback() continue } err = tx.Commit() if err != nil { log.Errorf("failed to commit transaction: %v", err) } // push to clean channel for _, identity := range identityList { handler.logQueue <- &identity } } } func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() { handler.logQueueCloseOnce.Do(func() { close(handler.logQueue) handler.dbMutex.Lock() if handler.db != nil { handler.db.Close() handler.db = nil } handler.dbMutex.Unlock() }) } func (handler *tccFenceWrapperHandler) deleteBatchFence(tx *sql.Tx, batch []model.FenceLogIdentity) error { err := handler.tccFenceDao.DeleteMultipleTCCFenceLogIdentity(tx, batch) if err != nil { return fmt.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err) } return nil } func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) { // todo implement fli := &model.FenceLogIdentity{ Xid: xid, BranchId: branchId, } select { case handler.logQueue <- fli: // todo add batch delete from log cache. default: handler.logCache.PushBack(fli) } log.Infof("add one log to clean queue: %v ", fli) } func (handler *tccFenceWrapperHandler) traversalCleanChannel(db *sql.DB) { if handler.logQueue == nil { handler.logQueue = make(chan *model.FenceLogIdentity, maxQueueSize) } counter := 0 batch := []model.FenceLogIdentity{} for li := range handler.logQueue { counter++ batch = append(batch, *li) if counter%channelDelete == 0 { tx, _ := db.Begin() err := handler.deleteBatchFence(tx, batch) if err != nil { log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err) } else { tx.Commit() } counter = 0 batch = []model.FenceLogIdentity{} } } if len(batch) > 0 { tx, _ := db.Begin() err := handler.deleteBatchFence(tx, batch) if err != nil { log.Errorf("delete batch fence log failed, batch: %v, err: %v", batch, err) } else { tx.Commit() } } }