common/persistence/client/factory.go (450 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package client
import (
"sync"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/config"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/elasticsearch"
"github.com/uber/cadence/common/persistence/nosql"
pinotVisibility "github.com/uber/cadence/common/persistence/pinot"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/persistence/wrappers/errorinjectors"
"github.com/uber/cadence/common/persistence/wrappers/metered"
"github.com/uber/cadence/common/persistence/wrappers/ratelimited"
"github.com/uber/cadence/common/persistence/wrappers/sampled"
pnt "github.com/uber/cadence/common/pinot"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service"
)
type (
// Factory defines the interface for any implementation that can vend
// persistence layer objects backed by a datastore. The actual datastore
// is implementation detail hidden behind this interface
Factory interface {
// Close the factory
Close()
// NewTaskManager returns a new task manager
NewTaskManager() (p.TaskManager, error)
// NewShardManager returns a new shard manager
NewShardManager() (p.ShardManager, error)
// NewHistoryManager returns a new history manager
NewHistoryManager() (p.HistoryManager, error)
// NewDomainManager returns a new metadata manager
NewDomainManager() (p.DomainManager, error)
// NewExecutionManager returns a new execution manager for a given shardID
NewExecutionManager(shardID int) (p.ExecutionManager, error)
// NewVisibilityManager returns a new visibility manager
NewVisibilityManager(params *Params, serviceConfig *service.Config) (p.VisibilityManager, error)
// NewDomainReplicationQueueManager returns a new queue for domain replication
NewDomainReplicationQueueManager() (p.QueueManager, error)
// NewConfigStoreManager returns a new config store manager
NewConfigStoreManager() (p.ConfigStoreManager, error)
}
// DataStoreFactory is a low level interface to be implemented by a datastore
// Examples of datastores are cassandra, mysql etc
DataStoreFactory interface {
// Close closes the factory
Close()
// NewTaskStore returns a new task store
NewTaskStore() (p.TaskStore, error)
// NewShardStore returns a new shard store
NewShardStore() (p.ShardStore, error)
// NewHistoryStore returns a new history store
NewHistoryStore() (p.HistoryStore, error)
// NewDomainStore returns a new metadata store
NewDomainStore() (p.DomainStore, error)
// NewExecutionStore returns an execution store for given shardID
NewExecutionStore(shardID int) (p.ExecutionStore, error)
// NewVisibilityStore returns a new visibility store,
// TODO We temporarily using sortByCloseTime to determine whether or not ListClosedWorkflowExecutions should
// be ordering by CloseTime. This will be removed when implementing https://github.com/uber/cadence/issues/3621
NewVisibilityStore(sortByCloseTime bool) (p.VisibilityStore, error)
NewQueue(queueType p.QueueType) (p.Queue, error)
// NewConfigStore returns a new config store
NewConfigStore() (p.ConfigStore, error)
}
// Datastore represents a datastore
Datastore struct {
factory DataStoreFactory
ratelimit quotas.Limiter
}
factoryImpl struct {
sync.RWMutex
config *config.Persistence
metricsClient metrics.Client
logger log.Logger
datastores map[storeType]Datastore
clusterName string
dc *p.DynamicConfiguration
}
storeType int
)
const (
storeTypeHistory storeType = iota + 1
storeTypeTask
storeTypeShard
storeTypeMetadata
storeTypeExecution
storeTypeVisibility
storeTypeQueue
storeTypeConfigStore
)
var storeTypes = []storeType{
storeTypeHistory,
storeTypeTask,
storeTypeShard,
storeTypeMetadata,
storeTypeExecution,
storeTypeVisibility,
storeTypeQueue,
storeTypeConfigStore,
}
// NewFactory returns an implementation of factory that vends persistence objects based on
// specified configuration. This factory takes as input a config.Persistence object
// which specifies the datastore to be used for a given type of object. This config
// also contains config for individual datastores themselves.
//
// The objects returned by this factory enforce ratelimit and maxconns according to
// given configuration. In addition, all objects will emit metrics automatically
func NewFactory(
cfg *config.Persistence,
persistenceMaxQPS quotas.RPSFunc,
clusterName string,
metricsClient metrics.Client,
logger log.Logger,
dc *p.DynamicConfiguration,
) Factory {
factory := &factoryImpl{
config: cfg,
metricsClient: metricsClient,
logger: logger,
clusterName: clusterName,
dc: dc,
}
limiters := buildRatelimiters(cfg, persistenceMaxQPS)
factory.init(clusterName, limiters)
return factory
}
// NewTaskManager returns a new task manager
func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
ds := f.datastores[storeTypeTask]
store, err := ds.factory.NewTaskStore()
if err != nil {
return nil, err
}
result := p.NewTaskManager(store)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewTaskManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewTaskManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewTaskManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
// NewShardManager returns a new shard manager
func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
ds := f.datastores[storeTypeShard]
store, err := ds.factory.NewShardStore()
if err != nil {
return nil, err
}
result := p.NewShardManager(store)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewShardManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewShardManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewShardManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
// NewHistoryManager returns a new history manager
func (f *factoryImpl) NewHistoryManager() (p.HistoryManager, error) {
ds := f.datastores[storeTypeHistory]
store, err := ds.factory.NewHistoryStore()
if err != nil {
return nil, err
}
result := p.NewHistoryV2ManagerImpl(store, f.logger, f.config.TransactionSizeLimit)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewHistoryManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewHistoryManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewHistoryManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
// NewDomainManager returns a new metadata manager
func (f *factoryImpl) NewDomainManager() (p.DomainManager, error) {
var err error
var store p.DomainStore
ds := f.datastores[storeTypeMetadata]
store, err = ds.factory.NewDomainStore()
if err != nil {
return nil, err
}
result := p.NewDomainManagerImpl(store, f.logger)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewDomainManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewDomainManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewDomainManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
// NewExecutionManager returns a new execution manager for a given shardID
func (f *factoryImpl) NewExecutionManager(shardID int) (p.ExecutionManager, error) {
ds := f.datastores[storeTypeExecution]
store, err := ds.factory.NewExecutionStore(shardID)
if err != nil {
return nil, err
}
result := p.NewExecutionManagerImpl(store, f.logger, p.NewPayloadSerializer())
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewExecutionManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewExecutionManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewExecutionManager(result, f.metricsClient, f.logger, f.config, f.dc.PersistenceSampleLoggingRate, f.dc.EnableShardIDMetrics)
}
return result, nil
}
// NewVisibilityManager returns a new visibility manager
func (f *factoryImpl) NewVisibilityManager(
params *Params,
resourceConfig *service.Config,
) (p.VisibilityManager, error) {
if resourceConfig.EnableReadVisibilityFromES == nil && resourceConfig.AdvancedVisibilityWritingMode == nil {
// No need to create visibility manager as no read/write needed
return nil, nil
}
var visibilityFromDB, visibilityFromES, visibilityFromPinot p.VisibilityManager
var err error
if params.PersistenceConfig.VisibilityStore != "" {
visibilityFromDB, err = f.newDBVisibilityManager(resourceConfig)
if err != nil {
return nil, err
}
}
if params.PersistenceConfig.AdvancedVisibilityStore == common.PinotVisibilityStoreName {
visibilityProducer, err := params.MessagingClient.NewProducer(common.PinotVisibilityAppName)
if err != nil {
f.logger.Fatal("Creating visibility producer failed", tag.Error(err))
}
visibilityFromPinot = newPinotVisibilityManager(
params.PinotClient, resourceConfig, visibilityProducer, params.MetricsClient, f.logger)
esVisibilityProducer, err := params.MessagingClient.NewProducer(common.VisibilityAppName)
visibilityIndexName := params.ESConfig.Indices[common.VisibilityAppName]
visibilityFromES = newESVisibilityManager(
visibilityIndexName, params.ESClient, resourceConfig, esVisibilityProducer, params.MetricsClient, f.logger,
)
return p.NewPinotVisibilityTripleManager(
visibilityFromDB,
visibilityFromPinot,
visibilityFromES,
resourceConfig.EnableReadVisibilityFromPinot,
resourceConfig.EnableReadVisibilityFromES,
resourceConfig.AdvancedVisibilityWritingMode,
resourceConfig.EnableLogCustomerQueryParameter,
f.logger,
), nil
} else if params.PersistenceConfig.AdvancedVisibilityStore != "" {
visibilityIndexName := params.ESConfig.Indices[common.VisibilityAppName]
visibilityProducer, err := params.MessagingClient.NewProducer(common.VisibilityAppName)
if err != nil {
f.logger.Fatal("Creating visibility producer failed", tag.Error(err))
}
visibilityFromES = newESVisibilityManager(
visibilityIndexName, params.ESClient, resourceConfig, visibilityProducer, params.MetricsClient, f.logger,
)
}
return p.NewVisibilityDualManager(
visibilityFromDB,
visibilityFromES,
resourceConfig.EnableReadVisibilityFromES,
resourceConfig.AdvancedVisibilityWritingMode,
f.logger,
), nil
}
// NewESVisibilityManager create a visibility manager for ElasticSearch
// In history, it only needs kafka producer for writing data;
// In frontend, it only needs ES client and related config for reading data
func newPinotVisibilityManager(
pinotClient pnt.GenericClient,
visibilityConfig *service.Config,
producer messaging.Producer,
metricsClient metrics.Client,
log log.Logger,
) p.VisibilityManager {
visibilityFromPinotStore := pinotVisibility.NewPinotVisibilityStore(pinotClient, visibilityConfig, producer, log)
visibilityFromPinot := p.NewVisibilityManagerImpl(visibilityFromPinotStore, log)
// wrap with rate limiter
if visibilityConfig.PersistenceMaxQPS != nil && visibilityConfig.PersistenceMaxQPS() != 0 {
pinotRateLimiter := quotas.NewDynamicRateLimiter(visibilityConfig.PersistenceMaxQPS.AsFloat64())
visibilityFromPinot = ratelimited.NewVisibilityManager(visibilityFromPinot, pinotRateLimiter)
}
if metricsClient != nil {
// wrap with metrics
visibilityFromPinot = pinotVisibility.NewPinotVisibilityMetricsClient(visibilityFromPinot, metricsClient, log)
}
return visibilityFromPinot
}
// NewESVisibilityManager create a visibility manager for ElasticSearch
// In history, it only needs kafka producer for writing data;
// In frontend, it only needs ES client and related config for reading data
func newESVisibilityManager(
indexName string,
esClient es.GenericClient,
visibilityConfig *service.Config,
producer messaging.Producer,
metricsClient metrics.Client,
log log.Logger,
) p.VisibilityManager {
visibilityFromESStore := elasticsearch.NewElasticSearchVisibilityStore(esClient, indexName, producer, visibilityConfig, log)
visibilityFromES := p.NewVisibilityManagerImpl(visibilityFromESStore, log)
// wrap with rate limiter
if visibilityConfig.PersistenceMaxQPS != nil && visibilityConfig.PersistenceMaxQPS() != 0 {
esRateLimiter := quotas.NewDynamicRateLimiter(visibilityConfig.PersistenceMaxQPS.AsFloat64())
visibilityFromES = ratelimited.NewVisibilityManager(visibilityFromES, esRateLimiter)
}
if metricsClient != nil {
// wrap with metrics
visibilityFromES = elasticsearch.NewVisibilityMetricsClient(visibilityFromES, metricsClient, log)
}
return visibilityFromES
}
func (f *factoryImpl) newDBVisibilityManager(
visibilityConfig *service.Config,
) (p.VisibilityManager, error) {
enableReadFromClosedExecutionV2 := false
if visibilityConfig.EnableReadDBVisibilityFromClosedExecutionV2 != nil {
enableReadFromClosedExecutionV2 = visibilityConfig.EnableReadDBVisibilityFromClosedExecutionV2()
} else {
f.logger.Warn("missing visibility and EnableReadFromClosedExecutionV2 config", tag.Value(visibilityConfig))
}
ds := f.datastores[storeTypeVisibility]
store, err := ds.factory.NewVisibilityStore(enableReadFromClosedExecutionV2)
if err != nil {
return nil, err
}
result := p.NewVisibilityManagerImpl(store, f.logger)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewVisibilityManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewVisibilityManager(result, ds.ratelimit)
}
if visibilityConfig.EnableDBVisibilitySampling != nil && visibilityConfig.EnableDBVisibilitySampling() {
result = sampled.NewVisibilityManager(result, sampled.Params{
Config: &sampled.Config{
VisibilityClosedMaxQPS: visibilityConfig.WriteDBVisibilityClosedMaxQPS,
VisibilityListMaxQPS: visibilityConfig.DBVisibilityListMaxQPS,
VisibilityOpenMaxQPS: visibilityConfig.WriteDBVisibilityOpenMaxQPS,
},
MetricClient: f.metricsClient,
Logger: f.logger,
TimeSource: clock.NewRealTimeSource(),
RateLimiterFactoryFunc: sampled.NewDomainToBucketMap,
})
}
if f.metricsClient != nil {
result = metered.NewVisibilityManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
func (f *factoryImpl) NewDomainReplicationQueueManager() (p.QueueManager, error) {
ds := f.datastores[storeTypeQueue]
store, err := ds.factory.NewQueue(p.DomainReplicationQueueType)
if err != nil {
return nil, err
}
result := p.NewQueueManager(store)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewQueueManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewQueueManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewQueueManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
func (f *factoryImpl) NewConfigStoreManager() (p.ConfigStoreManager, error) {
ds := f.datastores[storeTypeConfigStore]
store, err := ds.factory.NewConfigStore()
if err != nil {
return nil, err
}
result := p.NewConfigStoreManagerImpl(store, f.logger)
if errorRate := f.config.ErrorInjectionRate(); errorRate != 0 {
result = errorinjectors.NewConfigStoreManager(result, errorRate, f.logger)
}
if ds.ratelimit != nil {
result = ratelimited.NewConfigStoreManager(result, ds.ratelimit)
}
if f.metricsClient != nil {
result = metered.NewConfigStoreManager(result, f.metricsClient, f.logger, f.config)
}
return result, nil
}
// Close closes this factory
func (f *factoryImpl) Close() {
ds := f.datastores[storeTypeExecution]
ds.factory.Close()
}
func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limiter) {
f.datastores = make(map[storeType]Datastore, len(storeTypes))
defaultCfg := f.config.DataStores[f.config.DefaultStore]
if defaultCfg.Cassandra != nil {
f.logger.Warn("Cassandra config is deprecated, please use NoSQL with pluginName of cassandra.")
}
defaultDataStore := Datastore{ratelimit: limiters[f.config.DefaultStore]}
switch {
case defaultCfg.NoSQL != nil:
shardedNoSQLConfig := defaultCfg.NoSQL.ConvertToShardedNoSQLConfig()
defaultDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.dc)
case defaultCfg.ShardedNoSQL != nil:
defaultDataStore.factory = nosql.NewFactory(*defaultCfg.ShardedNoSQL, clusterName, f.logger, f.dc)
case defaultCfg.SQL != nil:
if defaultCfg.SQL.EncodingType == "" {
defaultCfg.SQL.EncodingType = string(common.EncodingTypeThriftRW)
}
if len(defaultCfg.SQL.DecodingTypes) == 0 {
defaultCfg.SQL.DecodingTypes = []string{
string(common.EncodingTypeThriftRW),
}
}
var decodingTypes []common.EncodingType
for _, dt := range defaultCfg.SQL.DecodingTypes {
decodingTypes = append(decodingTypes, common.EncodingType(dt))
}
defaultDataStore.factory = sql.NewFactory(
*defaultCfg.SQL,
clusterName,
f.logger,
getSQLParser(f.logger, common.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...),
f.dc)
default:
f.logger.Fatal("invalid config: one of nosql or sql params must be specified for defaultDataStore")
}
for _, st := range storeTypes {
if st != storeTypeVisibility {
f.datastores[st] = defaultDataStore
}
}
visibilityCfg, ok := f.config.DataStores[f.config.VisibilityStore]
if !ok {
f.logger.Info("no visibilityStore is configured, will use advancedVisibilityStore")
// NOTE: f.datastores[storeTypeVisibility] will be nil
return
}
if visibilityCfg.Cassandra != nil {
f.logger.Warn("Cassandra config is deprecated, please use NoSQL with pluginName of cassandra.")
}
visibilityDataStore := Datastore{ratelimit: limiters[f.config.VisibilityStore]}
switch {
case visibilityCfg.NoSQL != nil:
shardedNoSQLConfig := visibilityCfg.NoSQL.ConvertToShardedNoSQLConfig()
visibilityDataStore.factory = nosql.NewFactory(*shardedNoSQLConfig, clusterName, f.logger, f.dc)
case visibilityCfg.SQL != nil:
var decodingTypes []common.EncodingType
for _, dt := range visibilityCfg.SQL.DecodingTypes {
decodingTypes = append(decodingTypes, common.EncodingType(dt))
}
visibilityDataStore.factory = sql.NewFactory(
*visibilityCfg.SQL,
clusterName,
f.logger,
getSQLParser(f.logger, common.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...),
f.dc)
default:
f.logger.Fatal("invalid config: one of nosql or sql params must be specified for visibilityStore")
}
f.datastores[storeTypeVisibility] = visibilityDataStore
}
func getSQLParser(logger log.Logger, encodingType common.EncodingType, decodingTypes ...common.EncodingType) serialization.Parser {
parser, err := serialization.NewParser(encodingType, decodingTypes...)
if err != nil {
logger.Fatal("failed to construct sql parser", tag.Error(err))
}
return parser
}
func buildRatelimiters(cfg *config.Persistence, maxQPS quotas.RPSFunc) map[string]quotas.Limiter {
result := make(map[string]quotas.Limiter, len(cfg.DataStores))
for dsName := range cfg.DataStores {
if maxQPS != nil && maxQPS() > 0 {
result[dsName] = quotas.NewDynamicRateLimiter(maxQPS)
}
}
return result
}