func NewEngineWithShardContext()

in service/history/engine/engineimpl/historyEngine.go [147:370]


func NewEngineWithShardContext(
	shard shard.Context,
	visibilityMgr persistence.VisibilityManager,
	matching matching.Client,
	publicClient workflowserviceclient.Interface,
	historyEventNotifier events.Notifier,
	config *config.Config,
	crossClusterTaskFetchers task.Fetchers,
	replicationTaskFetchers replication.TaskFetchers,
	rawMatchingClient matching.Client,
	queueTaskProcessor task.Processor,
	failoverCoordinator failover.Coordinator,
	wfIDCache workflowcache.WFCache,
	ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) engine.Engine {
	currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()

	logger := shard.GetLogger()
	executionManager := shard.GetExecutionManager()
	historyV2Manager := shard.GetHistoryManager()
	executionCache := execution.NewCache(shard)
	failoverMarkerNotifier := failover.NewMarkerNotifier(shard, config, failoverCoordinator)
	replicationHydrator := replication.NewDeferredTaskHydrator(shard.GetShardID(), historyV2Manager, executionCache, shard.GetDomainCache())
	replicationTaskStore := replication.NewTaskStore(
		shard.GetConfig(),
		shard.GetClusterMetadata(),
		shard.GetDomainCache(),
		shard.GetMetricsClient(),
		shard.GetLogger(),
		replicationHydrator,
	)
	replicationReader := replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config)

	historyEngImpl := &historyEngineImpl{
		currentClusterName:   currentClusterName,
		shard:                shard,
		clusterMetadata:      shard.GetClusterMetadata(),
		timeSource:           shard.GetTimeSource(),
		historyV2Mgr:         historyV2Manager,
		executionManager:     executionManager,
		visibilityMgr:        visibilityMgr,
		tokenSerializer:      common.NewJSONTaskTokenSerializer(),
		executionCache:       executionCache,
		logger:               logger.WithTags(tag.ComponentHistoryEngine),
		throttledLogger:      shard.GetThrottledLogger().WithTags(tag.ComponentHistoryEngine),
		metricsClient:        shard.GetMetricsClient(),
		historyEventNotifier: historyEventNotifier,
		config:               config,
		archivalClient: warchiver.NewClient(
			shard.GetMetricsClient(),
			logger,
			publicClient,
			shard.GetConfig().NumArchiveSystemWorkflows,
			quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()),
			quotas.NewDynamicRateLimiter(func() float64 {
				return quotas.PerMember(
					service.History,
					float64(config.ArchiveInlineHistoryGlobalRPS()),
					float64(config.ArchiveInlineHistoryRPS()),
					shard.GetService().GetMembershipResolver(),
				)
			}),
			quotas.NewDynamicRateLimiter(func() float64 {
				return quotas.PerMember(
					service.History,
					float64(config.ArchiveInlineVisibilityGlobalRPS()),
					float64(config.ArchiveInlineVisibilityRPS()),
					shard.GetService().GetMembershipResolver(),
				)
			}),
			shard.GetService().GetArchiverProvider(),
			config.AllowArchivingIncompleteHistory,
		),
		workflowResetter: reset.NewWorkflowResetter(
			shard,
			executionCache,
			logger,
		),
		publicClient:           publicClient,
		matchingClient:         matching,
		rawMatchingClient:      rawMatchingClient,
		queueTaskProcessor:     queueTaskProcessor,
		clientChecker:          client.NewVersionChecker(),
		failoverMarkerNotifier: failoverMarkerNotifier,
		replicationHydrator:    replicationHydrator,
		replicationAckManager: replication.NewTaskAckManager(
			shard.GetShardID(),
			shard,
			shard.GetMetricsClient(),
			shard.GetLogger(),
			replicationReader,
			replicationTaskStore,
		),
		replicationTaskStore: replicationTaskStore,
		replicationMetricsEmitter: replication.NewMetricsEmitter(
			shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
		wfIDCache:                      wfIDCache,
		ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID,
	}
	historyEngImpl.decisionHandler = decision.NewHandler(
		shard,
		historyEngImpl.executionCache,
		historyEngImpl.tokenSerializer,
	)
	pRetry := persistence.NewPersistenceRetryer(
		shard.GetExecutionManager(),
		shard.GetHistoryManager(),
		common.CreatePersistenceRetryPolicy(),
	)
	openExecutionCheck := invariant.NewConcreteExecutionExists(pRetry, shard.GetDomainCache())

	historyEngImpl.txProcessor = queue.NewTransferQueueProcessor(
		shard,
		historyEngImpl,
		queueTaskProcessor,
		executionCache,
		historyEngImpl.workflowResetter,
		historyEngImpl.archivalClient,
		openExecutionCheck,
		historyEngImpl.wfIDCache,
		historyEngImpl.ratelimitInternalPerWorkflowID,
	)

	historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor(
		shard,
		historyEngImpl,
		queueTaskProcessor,
		executionCache,
		historyEngImpl.archivalClient,
		openExecutionCheck,
	)

	historyEngImpl.crossClusterProcessor = queue.NewCrossClusterQueueProcessor(
		shard,
		historyEngImpl,
		executionCache,
		queueTaskProcessor,
	)

	historyEngImpl.eventsReapplier = ndc.NewEventsReapplier(shard.GetMetricsClient(), logger)

	historyEngImpl.nDCReplicator = ndc.NewHistoryReplicator(
		shard,
		executionCache,
		historyEngImpl.eventsReapplier,
		logger,
	)
	historyEngImpl.nDCActivityReplicator = ndc.NewActivityReplicator(
		shard,
		executionCache,
		logger,
	)

	historyEngImpl.crossClusterTaskProcessors = task.NewCrossClusterTaskProcessors(
		shard,
		queueTaskProcessor,
		crossClusterTaskFetchers,
		&task.CrossClusterTaskProcessorOptions{
			Enabled:                    config.EnableCrossClusterEngine,
			MaxPendingTasks:            config.CrossClusterTargetProcessorMaxPendingTasks,
			TaskMaxRetryCount:          config.CrossClusterTargetProcessorMaxRetryCount,
			TaskRedispatchInterval:     config.ActiveTaskRedispatchInterval,
			TaskWaitInterval:           config.CrossClusterTargetProcessorTaskWaitInterval,
			ServiceBusyBackoffInterval: config.CrossClusterTargetProcessorServiceBusyBackoffInterval,
			TimerJitterCoefficient:     config.CrossClusterTargetProcessorJitterCoefficient,
		},
	)

	var replicationTaskProcessors []replication.TaskProcessor
	replicationTaskExecutors := make(map[string]replication.TaskExecutor)
	// Intentionally use the raw client to create its own retry policy
	historyRawClient := shard.GetService().GetClientBean().GetHistoryClient()
	historyRetryableClient := retryable.NewHistoryClient(
		historyRawClient,
		common.CreateReplicationServiceBusyRetryPolicy(),
		common.IsServiceBusyError,
	)
	resendFunc := func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
		return historyRetryableClient.ReplicateEventsV2(ctx, request)
	}
	for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() {
		sourceCluster := replicationTaskFetcher.GetSourceCluster()
		// Intentionally use the raw client to create its own retry policy
		adminClient := shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster)
		adminRetryableClient := retryable.NewAdminClient(
			adminClient,
			common.CreateReplicationServiceBusyRetryPolicy(),
			common.IsServiceBusyError,
		)
		historyResender := cndc.NewHistoryResender(
			shard.GetDomainCache(),
			adminRetryableClient,
			resendFunc,
			nil,
			openExecutionCheck,
			shard.GetLogger(),
		)
		replicationTaskExecutor := replication.NewTaskExecutor(
			shard,
			shard.GetDomainCache(),
			historyResender,
			historyEngImpl,
			shard.GetMetricsClient(),
			shard.GetLogger(),
		)
		replicationTaskExecutors[sourceCluster] = replicationTaskExecutor

		replicationTaskProcessor := replication.NewTaskProcessor(
			shard,
			historyEngImpl,
			config,
			shard.GetMetricsClient(),
			replicationTaskFetcher,
			replicationTaskExecutor,
		)
		replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor)
	}
	historyEngImpl.replicationTaskProcessors = replicationTaskProcessors
	replicationMessageHandler := replication.NewDLQHandler(shard, replicationTaskExecutors)
	historyEngImpl.replicationDLQHandler = replicationMessageHandler

	shard.SetEngine(historyEngImpl)
	return historyEngImpl
}