in service/history/engine/engineimpl/historyEngine.go [147:370]
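// NewEngineWithShardContext builds the history engine for a single shard: it
// assembles the replication read path, the archival client, the decision
// handler, the transfer/timer/cross-cluster queue processors, and one
// replication task processor per remote cluster, then registers the engine on
// the shard before returning it.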
func NewEngineWithShardContext(
shard shard.Context,
visibilityMgr persistence.VisibilityManager,
matching matching.Client,
publicClient workflowserviceclient.Interface,
historyEventNotifier events.Notifier,
config *config.Config,
crossClusterTaskFetchers task.Fetchers,
replicationTaskFetchers replication.TaskFetchers,
rawMatchingClient matching.Client,
queueTaskProcessor task.Processor,
failoverCoordinator failover.Coordinator,
wfIDCache workflowcache.WFCache,
ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) engine.Engine {
currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()
logger := shard.GetLogger()
executionManager := shard.GetExecutionManager()
historyV2Manager := shard.GetHistoryManager()
executionCache := execution.NewCache(shard)
failoverMarkerNotifier := failover.NewMarkerNotifier(shard, config, failoverCoordinator)
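// Replication read path: the dynamic reader pulls replication tasks from the
// execution manager at a dynamically tuned rate, the hydrator fills in event
// blobs on demand, and the task store caches hydrated tasks so they can be
// served to polling remote clusters without rereading history.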
replicationHydrator := replication.NewDeferredTaskHydrator(shard.GetShardID(), historyV2Manager, executionCache, shard.GetDomainCache())
replicationTaskStore := replication.NewTaskStore(
shard.GetConfig(),
shard.GetClusterMetadata(),
shard.GetDomainCache(),
shard.GetMetricsClient(),
shard.GetLogger(),
replicationHydrator,
)
replicationReader := replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config)
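// Assemble the engine with its core dependencies; the queue processors, n-DC
// replicators, and replication plumbing are attached further below.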
historyEngImpl := &historyEngineImpl{
currentClusterName: currentClusterName,
shard: shard,
clusterMetadata: shard.GetClusterMetadata(),
timeSource: shard.GetTimeSource(),
historyV2Mgr: historyV2Manager,
executionManager: executionManager,
visibilityMgr: visibilityMgr,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
executionCache: executionCache,
logger: logger.WithTags(tag.ComponentHistoryEngine),
throttledLogger: shard.GetThrottledLogger().WithTags(tag.ComponentHistoryEngine),
metricsClient: shard.GetMetricsClient(),
historyEventNotifier: historyEventNotifier,
config: config,
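// Inline archival rate limits are computed per history host: PerMember splits
// the configured global RPS across the service's members, falling back to the
// per-instance RPS when membership information is unavailable.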
archivalClient: warchiver.NewClient(
shard.GetMetricsClient(),
logger,
publicClient,
shard.GetConfig().NumArchiveSystemWorkflows,
quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()),
quotas.NewDynamicRateLimiter(func() float64 {
return quotas.PerMember(
service.History,
float64(config.ArchiveInlineHistoryGlobalRPS()),
float64(config.ArchiveInlineHistoryRPS()),
shard.GetService().GetMembershipResolver(),
)
}),
quotas.NewDynamicRateLimiter(func() float64 {
return quotas.PerMember(
service.History,
float64(config.ArchiveInlineVisibilityGlobalRPS()),
float64(config.ArchiveInlineVisibilityRPS()),
shard.GetService().GetMembershipResolver(),
)
}),
shard.GetService().GetArchiverProvider(),
config.AllowArchivingIncompleteHistory,
),
workflowResetter: reset.NewWorkflowResetter(
shard,
executionCache,
logger,
),
publicClient: publicClient,
matchingClient: matching,
rawMatchingClient: rawMatchingClient,
queueTaskProcessor: queueTaskProcessor,
clientChecker: client.NewVersionChecker(),
failoverMarkerNotifier: failoverMarkerNotifier,
replicationHydrator: replicationHydrator,
replicationAckManager: replication.NewTaskAckManager(
shard.GetShardID(),
shard,
shard.GetMetricsClient(),
shard.GetLogger(),
replicationReader,
replicationTaskStore,
),
replicationTaskStore: replicationTaskStore,
replicationMetricsEmitter: replication.NewMetricsEmitter(
shard.GetShardID(),
shard,
replicationReader,
shard.GetMetricsClient(),
),
wfIDCache: wfIDCache,
ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID,
}
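// Decision task handling shares the engine's execution cache and task token
// serializer.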
historyEngImpl.decisionHandler = decision.NewHandler(
shard,
historyEngImpl.executionCache,
historyEngImpl.tokenSerializer,
)
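// The open-execution invariant check below lets queue processors verify that a
// concrete execution record still exists before acting on its tasks; the
// retryer shields that check from transient persistence errors.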
pRetry := persistence.NewPersistenceRetryer(
shard.GetExecutionManager(),
shard.GetHistoryManager(),
common.CreatePersistenceRetryPolicy(),
)
openExecutionCheck := invariant.NewConcreteExecutionExists(pRetry, shard.GetDomainCache())
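// Transfer, timer, and cross-cluster queues all dispatch work through the
// shared priority task processor.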
historyEngImpl.txProcessor = queue.NewTransferQueueProcessor(
shard,
historyEngImpl,
queueTaskProcessor,
executionCache,
historyEngImpl.workflowResetter,
historyEngImpl.archivalClient,
openExecutionCheck,
historyEngImpl.wfIDCache,
historyEngImpl.ratelimitInternalPerWorkflowID,
)
historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor(
shard,
historyEngImpl,
queueTaskProcessor,
executionCache,
historyEngImpl.archivalClient,
openExecutionCheck,
)
historyEngImpl.crossClusterProcessor = queue.NewCrossClusterQueueProcessor(
shard,
historyEngImpl,
executionCache,
queueTaskProcessor,
)
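// n-DC (multi-cluster) components: replicate history branches and activity
// state from remote clusters, reapplying externally generated events where
// needed.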
historyEngImpl.eventsReapplier = ndc.NewEventsReapplier(shard.GetMetricsClient(), logger)
historyEngImpl.nDCReplicator = ndc.NewHistoryReplicator(
shard,
executionCache,
historyEngImpl.eventsReapplier,
logger,
)
historyEngImpl.nDCActivityReplicator = ndc.NewActivityReplicator(
shard,
executionCache,
logger,
)
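// Cross-cluster task processors consume tasks fetched from source clusters
// that target this cluster.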
historyEngImpl.crossClusterTaskProcessors = task.NewCrossClusterTaskProcessors(
shard,
queueTaskProcessor,
crossClusterTaskFetchers,
&task.CrossClusterTaskProcessorOptions{
Enabled: config.EnableCrossClusterEngine,
MaxPendingTasks: config.CrossClusterTargetProcessorMaxPendingTasks,
TaskMaxRetryCount: config.CrossClusterTargetProcessorMaxRetryCount,
TaskRedispatchInterval: config.ActiveTaskRedispatchInterval,
TaskWaitInterval: config.CrossClusterTargetProcessorTaskWaitInterval,
ServiceBusyBackoffInterval: config.CrossClusterTargetProcessorServiceBusyBackoffInterval,
TimerJitterCoefficient: config.CrossClusterTargetProcessorJitterCoefficient,
},
)
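// Build one replication task processor per remote cluster: each pairs a task
// fetcher with an executor that can resend missing history through that
// cluster's admin client.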
var replicationTaskProcessors []replication.TaskProcessor
replicationTaskExecutors := make(map[string]replication.TaskExecutor)
// Intentionally use the raw history client so replication can wrap it with its own service-busy retry policy
historyRawClient := shard.GetService().GetClientBean().GetHistoryClient()
historyRetryableClient := retryable.NewHistoryClient(
historyRawClient,
common.CreateReplicationServiceBusyRetryPolicy(),
common.IsServiceBusyError,
)
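// resendFunc re-applies events fetched by the history resender to this
// cluster's engine via the retryable history client.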
resendFunc := func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
return historyRetryableClient.ReplicateEventsV2(ctx, request)
}
for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() {
sourceCluster := replicationTaskFetcher.GetSourceCluster()
// Intentionally use the raw admin client so replication can wrap it with its own service-busy retry policy
adminClient := shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster)
adminRetryableClient := retryable.NewAdminClient(
adminClient,
common.CreateReplicationServiceBusyRetryPolicy(),
common.IsServiceBusyError,
)
historyResender := cndc.NewHistoryResender(
shard.GetDomainCache(),
adminRetryableClient,
resendFunc,
nil,
openExecutionCheck,
shard.GetLogger(),
)
replicationTaskExecutor := replication.NewTaskExecutor(
shard,
shard.GetDomainCache(),
historyResender,
historyEngImpl,
shard.GetMetricsClient(),
shard.GetLogger(),
)
replicationTaskExecutors[sourceCluster] = replicationTaskExecutor
replicationTaskProcessor := replication.NewTaskProcessor(
shard,
historyEngImpl,
config,
shard.GetMetricsClient(),
replicationTaskFetcher,
replicationTaskExecutor,
)
replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor)
}
historyEngImpl.replicationTaskProcessors = replicationTaskProcessors
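// The DLQ handler reuses the per-cluster task executors to retry replication
// tasks that previously failed and were parked in the DLQ.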
historyEngImpl.replicationDLQHandler = replication.NewDLQHandler(shard, replicationTaskExecutors)
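// Make the engine reachable from the shard context before returning it.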
shard.SetEngine(historyEngImpl)
return historyEngImpl
}
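// A minimal, hypothetical usage sketch, not code from this package: roughly
// how a caller (in Cadence, the shard controller via an engine factory) might
// wire and start an engine once a shard is acquired. Every variable below is a
// placeholder for a dependency the caller already holds; none of these names
// exist in this file.
//
// eng := NewEngineWithShardContext(
//     shardCtx,              // shard.Context for the acquired shard
//     visibilityMgr,         // persistence.VisibilityManager
//     matchingClient,        // matching.Client with routing/retry wrappers
//     publicClient,          // workflowserviceclient.Interface, used by archival workflows
//     historyEventNotifier,  // events.Notifier for history long-polls
//     cfg,                   // *config.Config
//     crossClusterFetchers,  // task.Fetchers
//     replicationFetchers,   // replication.TaskFetchers, one per remote cluster
//     rawMatchingClient,     // matching.Client without wrappers
//     queueTaskProcessor,    // shared priority task.Processor
//     failoverCoordinator,   // failover.Coordinator
//     wfIDCache,             // workflowcache.WFCache
//     ratelimitFn,           // dynamicconfig.BoolPropertyFnWithDomainFilter
// )
// eng.Start() // engine.Engine embeds common.Daemon; Start spins up the queue processors
// defer eng.Stop()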