common/persistence/sql/factory.go (135 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package sql
import (
"fmt"
"sync"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
)
type (
// Factory vends store objects backed by MySQL
Factory struct {
cfg config.SQL
dbConn dbConn
clusterName string
logger log.Logger
parser serialization.Parser
dc *p.DynamicConfiguration
}
// dbConn represents a logical mysql connection - its a
// wrapper around the standard sql connection pool with
// additional reference counting
dbConn struct {
sync.Mutex
sqlplugin.DB
refCnt int
cfg *config.SQL
}
)
// NewFactory returns an instance of a factory object which can be used to create
// datastores backed by any kind of SQL store
func NewFactory(
cfg config.SQL,
clusterName string,
logger log.Logger,
parser serialization.Parser,
dc *p.DynamicConfiguration,
) *Factory {
return &Factory{
cfg: cfg,
clusterName: clusterName,
logger: logger,
dbConn: newRefCountedDBConn(&cfg),
parser: parser,
dc: dc,
}
}
// NewTaskStore returns a new task store
func (f *Factory) NewTaskStore() (p.TaskStore, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return newTaskPersistence(conn, f.cfg.NumShards, f.logger, f.parser)
}
// NewShardStore returns a new shard store
func (f *Factory) NewShardStore() (p.ShardStore, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return NewShardPersistence(conn, f.clusterName, f.logger, f.parser)
}
// NewHistoryStore returns a new history store
func (f *Factory) NewHistoryStore() (p.HistoryStore, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return NewHistoryV2Persistence(conn, f.logger, f.parser)
}
// NewDomainStore returns a new metadata store
func (f *Factory) NewDomainStore() (p.DomainStore, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return newMetadataPersistenceV2(conn, f.clusterName, f.logger, f.parser)
}
// NewExecutionStore returns an ExecutionStore for a given shardID
func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return NewSQLExecutionStore(conn, f.logger, shardID, f.parser, f.dc)
}
// NewVisibilityStore returns a visibility store
// TODO sortByCloseTime will be removed and implemented for https://github.com/uber/cadence/issues/3621
func (f *Factory) NewVisibilityStore(sortByCloseTime bool) (p.VisibilityStore, error) {
return NewSQLVisibilityStore(f.cfg, f.logger)
}
// NewQueue returns a new queue backed by sql
func (f *Factory) NewQueue(queueType p.QueueType) (p.Queue, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return newQueueStore(conn, f.logger, queueType)
}
// NewConfigStore returns a new config store backed by sql. Not Yet Implemented.
func (f *Factory) NewConfigStore() (p.ConfigStore, error) {
conn, err := f.dbConn.get()
if err != nil {
return nil, err
}
return NewSQLConfigStore(conn, f.logger, f.parser)
}
// Close closes the factory
func (f *Factory) Close() {
f.dbConn.forceClose()
}
// newRefCountedDBConn returns a logical mysql connection that
// uses reference counting to decide when to close the
// underlying connection object. The reference count gets incremented
// everytime get() is called and decremented everytime Close() is called
func newRefCountedDBConn(cfg *config.SQL) dbConn {
return dbConn{cfg: cfg}
}
// get returns a mysql db connection and increments a reference count
// this method will create a new connection, if an existing connection
// does not exist
func (c *dbConn) get() (sqlplugin.DB, error) {
c.Lock()
defer c.Unlock()
if c.refCnt == 0 {
conn, err := NewSQLDB(c.cfg)
if err != nil {
return nil, err
}
c.DB = conn
}
c.refCnt++
return c, nil
}
// forceClose ignores reference counts and shutsdown the underlying connection pool
func (c *dbConn) forceClose() {
c.Lock()
defer c.Unlock()
if c.DB != nil {
err := c.DB.Close()
if err != nil {
fmt.Println("failed to close database connection, may leak some connection", err)
}
}
c.refCnt = 0
}
// Close closes the underlying connection if the reference count becomes zero
func (c *dbConn) Close() error {
c.Lock()
defer c.Unlock()
c.refCnt--
if c.refCnt == 0 {
err := c.DB.Close()
c.DB = nil
return err
}
return nil
}