func New()

in common/resource/resourceImpl.go [143:347]


func New(
	params *Params,
	serviceName string,
	serviceConfig *service.Config,
) (impl *Impl, retError error) {

	hostname := params.HostName

	logger := params.Logger
	throttledLogger := loggerimpl.NewThrottledLogger(logger, serviceConfig.ThrottledLoggerMaxRPS)

	numShards := params.PersistenceConfig.NumHistoryShards
	dispatcher := params.RPCFactory.GetDispatcher()
	membershipResolver := params.MembershipResolver

	dynamicCollection := dynamicconfig.NewCollection(
		params.DynamicConfig,
		logger,
		dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
	)
	clientBean, err := client.NewClientBean(
		client.NewRPCClientFactory(
			params.RPCFactory,
			membershipResolver,
			params.MetricsClient,
			dynamicCollection,
			numShards,
			logger,
		),
		params.RPCFactory.GetDispatcher(),
		params.ClusterMetadata,
	)
	if err != nil {
		return nil, err
	}

	persistenceBean, err := persistenceClient.NewBeanFromFactory(persistenceClient.NewFactory(
		&params.PersistenceConfig,
		func() float64 {
			return quotas.PerMember(
				serviceName,
				float64(serviceConfig.PersistenceGlobalMaxQPS()),
				float64(serviceConfig.PersistenceMaxQPS()),
				membershipResolver,
			)
		},
		params.ClusterMetadata.GetCurrentClusterName(),
		params.MetricsClient,
		logger,
		persistence.NewDynamicConfiguration(dynamicCollection),
	), &persistenceClient.Params{
		PersistenceConfig: params.PersistenceConfig,
		MetricsClient:     params.MetricsClient,
		MessagingClient:   params.MessagingClient,
		ESClient:          params.ESClient,
		ESConfig:          params.ESConfig,
		PinotConfig:       params.PinotConfig,
		PinotClient:       params.PinotClient,
	}, serviceConfig)
	if err != nil {
		return nil, err
	}

	domainCache := cache.NewDomainCache(
		persistenceBean.GetDomainManager(),
		params.ClusterMetadata,
		params.MetricsClient,
		logger,
		cache.WithTimeSource(params.TimeSource),
	)

	domainMetricsScopeCache := cache.NewDomainMetricsScopeCache()
	domainReplicationQueue := domain.NewReplicationQueue(
		persistenceBean.GetDomainReplicationQueueManager(),
		params.ClusterMetadata.GetCurrentClusterName(),
		params.MetricsClient,
		logger,
	)

	frontendRawClient := clientBean.GetFrontendClient()
	frontendClient := retryable.NewFrontendClient(
		frontendRawClient,
		common.CreateFrontendServiceRetryPolicy(),
		serviceConfig.IsErrorRetryableFunction,
	)

	matchingRawClient, err := clientBean.GetMatchingClient(domainCache.GetDomainName)
	if err != nil {
		return nil, err
	}
	matchingClient := retryable.NewMatchingClient(
		matchingRawClient,
		common.CreateMatchingServiceRetryPolicy(),
		serviceConfig.IsErrorRetryableFunction,
	)

	historyRawClient := clientBean.GetHistoryClient()
	historyClient := retryable.NewHistoryClient(
		historyRawClient,
		common.CreateHistoryServiceRetryPolicy(),
		serviceConfig.IsErrorRetryableFunction,
	)

	historyArchiverBootstrapContainer := &archiver.HistoryBootstrapContainer{
		HistoryV2Manager: persistenceBean.GetHistoryManager(),
		Logger:           logger,
		MetricsClient:    params.MetricsClient,
		ClusterMetadata:  params.ClusterMetadata,
		DomainCache:      domainCache,
	}
	visibilityArchiverBootstrapContainer := &archiver.VisibilityBootstrapContainer{
		Logger:          logger,
		MetricsClient:   params.MetricsClient,
		ClusterMetadata: params.ClusterMetadata,
		DomainCache:     domainCache,
	}
	if err := params.ArchiverProvider.RegisterBootstrapContainer(
		serviceName,
		historyArchiverBootstrapContainer,
		visibilityArchiverBootstrapContainer,
	); err != nil {
		return nil, err
	}

	isolationGroupStore := createConfigStoreOrDefault(params, dynamicCollection)

	isolationGroupState, err := ensureIsolationGroupStateHandlerOrDefault(
		params,
		dynamicCollection,
		domainCache,
		isolationGroupStore,
	)
	if err != nil {
		return nil, err
	}
	partitioner := ensurePartitionerOrDefault(params, isolationGroupState)

	impl = &Impl{
		status: common.DaemonStatusInitialized,

		// static infos

		numShards:       numShards,
		serviceName:     params.Name,
		metricsScope:    params.MetricScope,
		clusterMetadata: params.ClusterMetadata,

		// other common resources

		domainCache:             domainCache,
		domainMetricsScopeCache: domainMetricsScopeCache,
		timeSource:              clock.NewRealTimeSource(),
		payloadSerializer:       persistence.NewPayloadSerializer(),
		metricsClient:           params.MetricsClient,
		messagingClient:         params.MessagingClient,
		blobstoreClient:         params.BlobstoreClient,
		archivalMetadata:        params.ArchivalMetadata,
		archiverProvider:        params.ArchiverProvider,
		domainReplicationQueue:  domainReplicationQueue,

		// membership infos
		membershipResolver: membershipResolver,

		// internal services clients

		sdkClient:         params.PublicClient,
		frontendRawClient: frontendRawClient,
		frontendClient:    frontendClient,
		matchingRawClient: matchingRawClient,
		matchingClient:    matchingClient,
		historyRawClient:  historyRawClient,
		historyClient:     historyClient,
		clientBean:        clientBean,

		// persistence clients
		persistenceBean: persistenceBean,

		// hostname
		hostName: hostname,

		// loggers

		logger:          logger,
		throttledLogger: throttledLogger,

		// for registering handlers
		dispatcher: dispatcher,

		// internal vars
		pprofInitializer: params.PProfInitializer,
		runtimeMetricsReporter: metrics.NewRuntimeMetricsReporter(
			params.MetricScope,
			time.Minute,
			logger,
			params.InstanceID,
		),
		rpcFactory:                params.RPCFactory,
		isolationGroups:           isolationGroupState,
		isolationGroupConfigStore: isolationGroupStore, // can be nil where persistence is not available
		partitioner:               partitioner,

		asyncWorkflowQueueProvider: params.AsyncWorkflowQueueProvider,
	}
	return impl, nil
}