in common/resource/resourceImpl.go [143:347]
func New(
params *Params,
serviceName string,
serviceConfig *service.Config,
) (impl *Impl, retError error) {
hostname := params.HostName
logger := params.Logger
throttledLogger := loggerimpl.NewThrottledLogger(logger, serviceConfig.ThrottledLoggerMaxRPS)
numShards := params.PersistenceConfig.NumHistoryShards
dispatcher := params.RPCFactory.GetDispatcher()
membershipResolver := params.MembershipResolver
dynamicCollection := dynamicconfig.NewCollection(
params.DynamicConfig,
logger,
dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
)
clientBean, err := client.NewClientBean(
client.NewRPCClientFactory(
params.RPCFactory,
membershipResolver,
params.MetricsClient,
dynamicCollection,
numShards,
logger,
),
params.RPCFactory.GetDispatcher(),
params.ClusterMetadata,
)
if err != nil {
return nil, err
}
persistenceBean, err := persistenceClient.NewBeanFromFactory(persistenceClient.NewFactory(
¶ms.PersistenceConfig,
func() float64 {
return quotas.PerMember(
serviceName,
float64(serviceConfig.PersistenceGlobalMaxQPS()),
float64(serviceConfig.PersistenceMaxQPS()),
membershipResolver,
)
},
params.ClusterMetadata.GetCurrentClusterName(),
params.MetricsClient,
logger,
persistence.NewDynamicConfiguration(dynamicCollection),
), &persistenceClient.Params{
PersistenceConfig: params.PersistenceConfig,
MetricsClient: params.MetricsClient,
MessagingClient: params.MessagingClient,
ESClient: params.ESClient,
ESConfig: params.ESConfig,
PinotConfig: params.PinotConfig,
PinotClient: params.PinotClient,
}, serviceConfig)
if err != nil {
return nil, err
}
domainCache := cache.NewDomainCache(
persistenceBean.GetDomainManager(),
params.ClusterMetadata,
params.MetricsClient,
logger,
cache.WithTimeSource(params.TimeSource),
)
domainMetricsScopeCache := cache.NewDomainMetricsScopeCache()
domainReplicationQueue := domain.NewReplicationQueue(
persistenceBean.GetDomainReplicationQueueManager(),
params.ClusterMetadata.GetCurrentClusterName(),
params.MetricsClient,
logger,
)
frontendRawClient := clientBean.GetFrontendClient()
frontendClient := retryable.NewFrontendClient(
frontendRawClient,
common.CreateFrontendServiceRetryPolicy(),
serviceConfig.IsErrorRetryableFunction,
)
matchingRawClient, err := clientBean.GetMatchingClient(domainCache.GetDomainName)
if err != nil {
return nil, err
}
matchingClient := retryable.NewMatchingClient(
matchingRawClient,
common.CreateMatchingServiceRetryPolicy(),
serviceConfig.IsErrorRetryableFunction,
)
historyRawClient := clientBean.GetHistoryClient()
historyClient := retryable.NewHistoryClient(
historyRawClient,
common.CreateHistoryServiceRetryPolicy(),
serviceConfig.IsErrorRetryableFunction,
)
historyArchiverBootstrapContainer := &archiver.HistoryBootstrapContainer{
HistoryV2Manager: persistenceBean.GetHistoryManager(),
Logger: logger,
MetricsClient: params.MetricsClient,
ClusterMetadata: params.ClusterMetadata,
DomainCache: domainCache,
}
visibilityArchiverBootstrapContainer := &archiver.VisibilityBootstrapContainer{
Logger: logger,
MetricsClient: params.MetricsClient,
ClusterMetadata: params.ClusterMetadata,
DomainCache: domainCache,
}
if err := params.ArchiverProvider.RegisterBootstrapContainer(
serviceName,
historyArchiverBootstrapContainer,
visibilityArchiverBootstrapContainer,
); err != nil {
return nil, err
}
isolationGroupStore := createConfigStoreOrDefault(params, dynamicCollection)
isolationGroupState, err := ensureIsolationGroupStateHandlerOrDefault(
params,
dynamicCollection,
domainCache,
isolationGroupStore,
)
if err != nil {
return nil, err
}
partitioner := ensurePartitionerOrDefault(params, isolationGroupState)
impl = &Impl{
status: common.DaemonStatusInitialized,
// static infos
numShards: numShards,
serviceName: params.Name,
metricsScope: params.MetricScope,
clusterMetadata: params.ClusterMetadata,
// other common resources
domainCache: domainCache,
domainMetricsScopeCache: domainMetricsScopeCache,
timeSource: clock.NewRealTimeSource(),
payloadSerializer: persistence.NewPayloadSerializer(),
metricsClient: params.MetricsClient,
messagingClient: params.MessagingClient,
blobstoreClient: params.BlobstoreClient,
archivalMetadata: params.ArchivalMetadata,
archiverProvider: params.ArchiverProvider,
domainReplicationQueue: domainReplicationQueue,
// membership infos
membershipResolver: membershipResolver,
// internal services clients
sdkClient: params.PublicClient,
frontendRawClient: frontendRawClient,
frontendClient: frontendClient,
matchingRawClient: matchingRawClient,
matchingClient: matchingClient,
historyRawClient: historyRawClient,
historyClient: historyClient,
clientBean: clientBean,
// persistence clients
persistenceBean: persistenceBean,
// hostname
hostName: hostname,
// loggers
logger: logger,
throttledLogger: throttledLogger,
// for registering handlers
dispatcher: dispatcher,
// internal vars
pprofInitializer: params.PProfInitializer,
runtimeMetricsReporter: metrics.NewRuntimeMetricsReporter(
params.MetricScope,
time.Minute,
logger,
params.InstanceID,
),
rpcFactory: params.RPCFactory,
isolationGroups: isolationGroupState,
isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available
partitioner: partitioner,
asyncWorkflowQueueProvider: params.AsyncWorkflowQueueProvider,
}
return impl, nil
}