pkg/datasource/sql/at_resource_manager.go (84 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package sql import ( "context" "database/sql" "fmt" "sync" "github.com/prometheus/client_golang/prometheus" "seata.apache.org/seata-go/pkg/datasource/sql/datasource" "seata.apache.org/seata-go/pkg/datasource/sql/types" "seata.apache.org/seata-go/pkg/datasource/sql/undo" "seata.apache.org/seata-go/pkg/protocol/branch" "seata.apache.org/seata-go/pkg/rm" serr "seata.apache.org/seata-go/pkg/util/errors" ) func InitAT(cfg undo.Config, asyncCfg AsyncWorkerConfig) { atSourceManager := &ATSourceManager{ resourceCache: sync.Map{}, basic: datasource.NewBasicSourceManager(), rmRemoting: rm.GetRMRemotingInstance(), } undo.InitUndoConfig(cfg) atSourceManager.worker = NewAsyncWorker(prometheus.DefaultRegisterer, asyncCfg, atSourceManager) rm.GetRmCacheInstance().RegisterResourceManager(atSourceManager) } type ATSourceManager struct { resourceCache sync.Map worker *AsyncWorker basic *datasource.BasicSourceManager rmRemoting *rm.RMRemoting } func (a *ATSourceManager) GetBranchType() branch.BranchType { return branch.BranchTypeAT } // GetCachedResources get all resources managed by this manager func (a *ATSourceManager) GetCachedResources() *sync.Map { return &a.resourceCache } // RegisterResource register a Resource to be managed by Resource Manager func (a *ATSourceManager) RegisterResource(res rm.Resource) error { a.resourceCache.Store(res.GetResourceId(), res) return a.basic.RegisterResource(res) } // UnregisterResource unregister a Resource from the Resource Manager func (a *ATSourceManager) UnregisterResource(res rm.Resource) error { return a.basic.UnregisterResource(res) } // BranchRollback rollback a branch transaction func (a *ATSourceManager) BranchRollback(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) { var dbResource *DBResource if resource, ok := a.resourceCache.Load(branchResource.ResourceId); !ok { err := fmt.Errorf("DB resource is not exist, resourceId: %s", branchResource.ResourceId) return branch.BranchStatusUnknown, err } else { dbResource, _ = resource.(*DBResource) } undoMgr, err := undo.GetUndoLogManager(dbResource.dbType) if err != nil { return branch.BranchStatusUnknown, err } if err := undoMgr.RunUndo(ctx, branchResource.Xid, branchResource.BranchId, dbResource.db, dbResource.dbName); err != nil { transErr, ok := err.(*serr.SeataError) if !ok { return branch.BranchStatusPhaseoneFailed, err } if transErr.Code == serr.TransactionErrorCodeBranchRollbackFailedUnretriable { return branch.BranchStatusPhasetwoRollbackFailedUnretryable, nil } return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil } return branch.BranchStatusPhasetwoRollbacked, nil } // BranchCommit commit the branch transaction func (a *ATSourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) { a.worker.BranchCommit(ctx, resource) return branch.BranchStatusPhasetwoCommitted, nil } func (a *ATSourceManager) LockQuery(ctx context.Context, param rm.LockQueryParam) (bool, error) { return a.rmRemoting.LockQuery(param) } // BranchRegister branch transaction register func (a *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegisterParam) (int64, error) { return a.rmRemoting.BranchRegister(req) } // BranchReport Report status of transaction branch func (a *ATSourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error { return a.rmRemoting.BranchReport(param) } func (a *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType, db *sql.DB) (datasource.TableMetaCache, error) { return a.basic.CreateTableMetaCache(ctx, resID, dbType, db) }