host/onebox.go

// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package host

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/pborman/uuid"
	"github.com/uber-go/tally"
	apiv1 "github.com/uber/cadence-idl/go/proto/api/v1"
	cwsc "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/compatibility"
	"go.uber.org/yarpc"
	"go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/transport/grpc"
	"go.uber.org/yarpc/transport/tchannel"

	adminClient "github.com/uber/cadence/client/admin"
	frontendClient "github.com/uber/cadence/client/frontend"
	historyClient "github.com/uber/cadence/client/history"
	"github.com/uber/cadence/common"
	carchiver "github.com/uber/cadence/common/archiver"
	"github.com/uber/cadence/common/archiver/provider"
	"github.com/uber/cadence/common/asyncworkflow/queue"
	"github.com/uber/cadence/common/authorization"
	"github.com/uber/cadence/common/cache"
	cc "github.com/uber/cadence/common/client"
	"github.com/uber/cadence/common/clock"
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/config"
	"github.com/uber/cadence/common/domain"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/elasticsearch"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/membership"
	"github.com/uber/cadence/common/messaging"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/persistence/wrappers/metered"
	"github.com/uber/cadence/common/pinot"
	"github.com/uber/cadence/common/resource"
	"github.com/uber/cadence/common/rpc"
	"github.com/uber/cadence/common/service"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/frontend"
	"github.com/uber/cadence/service/history"
	"github.com/uber/cadence/service/matching"
	"github.com/uber/cadence/service/worker"
	"github.com/uber/cadence/service/worker/archiver"
	"github.com/uber/cadence/service/worker/asyncworkflow"
	"github.com/uber/cadence/service/worker/indexer"
	"github.com/uber/cadence/service/worker/replicator"
)

// Cadence hosts all of cadence services in one process
type Cadence interface {
	Start() error
	Stop()
	GetAdminClient() adminClient.Client
	GetFrontendClient() frontendClient.Client
	FrontendHost() membership.HostInfo
	GetHistoryClient() historyClient.Client
	GetExecutionManagerFactory() persistence.ExecutionManagerFactory
}

type (
	cadenceImpl struct {
		frontendService common.Daemon
		matchingService common.Daemon
		historyServices []common.Daemon

		adminClient            adminClient.Client
		frontendClient         frontendClient.Client
		historyClient          historyClient.Client
		logger                 log.Logger
		clusterMetadata        cluster.Metadata
		persistenceConfig      config.Persistence
		messagingClient        messaging.Client
		domainManager          persistence.DomainManager
		historyV2Mgr           persistence.HistoryManager
		executionMgrFactory    persistence.ExecutionManagerFactory
		domainReplicationQueue domain.ReplicationQueue
		shutdownCh             chan struct{}
		shutdownWG             sync.WaitGroup
		clusterNo              int // cluster number
		replicator             *replicator.Replicator
		clientWorker           archiver.ClientWorker
		indexer                *indexer.Indexer
		archiverMetadata       carchiver.ArchivalMetadata
		archiverProvider       provider.ArchiverProvider
		historyConfig          *HistoryConfig
		esConfig               *config.ElasticSearchConfig
		esClient               elasticsearch.GenericClient
		workerConfig           *WorkerConfig
		mockAdminClient        map[string]adminClient.Client

		domainReplicationTaskExecutor domain.ReplicationTaskExecutor
		authorizationConfig           config.Authorization
		pinotConfig                   *config.PinotVisibilityConfig
		pinotClient                   pinot.GenericClient
		asyncWFQueues                 map[string]config.AsyncWorkflowQueueProvider
		timeSource                    clock.TimeSource

		// dynamicconfig overrides per service
		frontendDynCfgOverrides map[dynamicconfig.Key]interface{}
		historyDynCfgOverrides  map[dynamicconfig.Key]interface{}
		matchingDynCfgOverrides map[dynamicconfig.Key]interface{}
		workerDynCfgOverrides   map[dynamicconfig.Key]interface{}
	}

	// HistoryConfig contains configs for history service
	HistoryConfig struct {
		NumHistoryShards       int
		NumHistoryHosts        int
		HistoryCountLimitError int
		HistoryCountLimitWarn  int
	}

	// CadenceParams contains everything needed to bootstrap Cadence
	CadenceParams struct {
		ClusterMetadata               cluster.Metadata
		PersistenceConfig             config.Persistence
		MessagingClient               messaging.Client
		DomainManager                 persistence.DomainManager
		HistoryV2Mgr                  persistence.HistoryManager
		ExecutionMgrFactory           persistence.ExecutionManagerFactory
		DomainReplicationQueue        domain.ReplicationQueue
		Logger                        log.Logger
		ClusterNo                     int
		ArchiverMetadata              carchiver.ArchivalMetadata
		ArchiverProvider              provider.ArchiverProvider
		EnableReadHistoryFromArchival bool
		HistoryConfig                 *HistoryConfig
		ESConfig                      *config.ElasticSearchConfig
		ESClient                      elasticsearch.GenericClient
		WorkerConfig                  *WorkerConfig
		MockAdminClient               map[string]adminClient.Client
		DomainReplicationTaskExecutor domain.ReplicationTaskExecutor
		AuthorizationConfig           config.Authorization
		PinotConfig                   *config.PinotVisibilityConfig
		PinotClient                   pinot.GenericClient
		AsyncWFQueues                 map[string]config.AsyncWorkflowQueueProvider
		TimeSource                    clock.TimeSource
		FrontendDynCfgOverrides       map[dynamicconfig.Key]interface{}
		HistoryDynCfgOverrides        map[dynamicconfig.Key]interface{}
		MatchingDynCfgOverrides       map[dynamicconfig.Key]interface{}
		WorkerDynCfgOverrides         map[dynamicconfig.Key]interface{}
	}
)

// NewCadence returns an instance that hosts full cadence in one process
func NewCadence(params *CadenceParams) Cadence {
	return &cadenceImpl{
		logger:                 params.Logger,
		clusterMetadata:        params.ClusterMetadata,
		persistenceConfig:      params.PersistenceConfig,
		messagingClient:        params.MessagingClient,
		domainManager:          params.DomainManager,
		historyV2Mgr:           params.HistoryV2Mgr,
		executionMgrFactory:    params.ExecutionMgrFactory,
		domainReplicationQueue: params.DomainReplicationQueue,
		shutdownCh:             make(chan struct{}),
		clusterNo:              params.ClusterNo,
		esConfig:               params.ESConfig,
		esClient:               params.ESClient,
		archiverMetadata:       params.ArchiverMetadata,
		archiverProvider:       params.ArchiverProvider,
		historyConfig:          params.HistoryConfig,
		workerConfig:           params.WorkerConfig,
		mockAdminClient:               params.MockAdminClient,
		domainReplicationTaskExecutor: params.DomainReplicationTaskExecutor,
		authorizationConfig:           params.AuthorizationConfig,
		pinotConfig:                   params.PinotConfig,
		pinotClient:                   params.PinotClient,
		asyncWFQueues:                 params.AsyncWFQueues,
		timeSource:                    params.TimeSource,
		frontendDynCfgOverrides:       params.FrontendDynCfgOverrides,
		historyDynCfgOverrides:        params.HistoryDynCfgOverrides,
		matchingDynCfgOverrides:       params.MatchingDynCfgOverrides,
		workerDynCfgOverrides:         params.WorkerDynCfgOverrides,
	}
}

func (c *cadenceImpl) enableWorker() bool {
	return c.workerConfig.EnableArchiver || c.workerConfig.EnableIndexer || c.workerConfig.EnableReplicator || c.workerConfig.EnableAsyncWFConsumer
}

func (c *cadenceImpl) Start() error {
	hosts := make(map[string][]membership.HostInfo)
	hosts[service.Frontend] = []membership.HostInfo{c.FrontendHost()}
	hosts[service.Matching] = []membership.HostInfo{c.MatchingServiceHost()}
	hosts[service.History] = c.HistoryHosts()
	if c.enableWorker() {
		hosts[service.Worker] = []membership.HostInfo{c.WorkerServiceHost()}
	}

	// create cadence-system domain, this must be created before starting
	// the services - so directly use the metadataManager to create this
	if err := c.createSystemDomain(); err != nil {
		return err
	}

	var startWG sync.WaitGroup
	startWG.Add(2)
	go c.startHistory(hosts, &startWG)
	go c.startMatching(hosts, &startWG)
	startWG.Wait()

	startWG.Add(1)
	go c.startFrontend(hosts, &startWG)
	startWG.Wait()

	if c.enableWorker() {
		startWG.Add(1)
		go c.startWorker(hosts, &startWG)
		startWG.Wait()
	}

	return nil
}

func (c *cadenceImpl) Stop() {
	if c.enableWorker() {
		c.shutdownWG.Add(4)
	} else {
		c.shutdownWG.Add(3)
	}
	c.frontendService.Stop()
	for _, historyService := range c.historyServices {
		historyService.Stop()
	}
	c.matchingService.Stop()
	if c.workerConfig.EnableReplicator {
		c.replicator.Stop()
	}
	if c.workerConfig.EnableArchiver {
		c.clientWorker.Stop()
	}
	close(c.shutdownCh)
	c.shutdownWG.Wait()
}

func newHost(tchan uint16) membership.HostInfo {
	address := "127.0.0.1"
	return membership.NewDetailedHostInfo(
		fmt.Sprintf("%s:%d", address, tchan),
		fmt.Sprintf("%s_%d", address, tchan),
		membership.PortMap{
			membership.PortTchannel: tchan,
			membership.PortGRPC:     tchan + 10,
		},
	)
}

func (c *cadenceImpl) FrontendHost() membership.HostInfo {
	var tchan uint16
	switch c.clusterNo {
	case 0:
		tchan = 7104
	case 1:
		tchan = 8104
	case 2:
		tchan = 9104
	case 3:
		tchan = 10104
	default:
		tchan = 7104
	}
	return newHost(tchan)
}

func (c *cadenceImpl) FrontendPProfPort() int {
	switch c.clusterNo {
	case 0:
		return 7105
	case 1:
		return 8105
	case 2:
		return 9105
	case 3:
		return 10105
	default:
		return 7105
	}
}

func (c *cadenceImpl) HistoryHosts() []membership.HostInfo {
	var hosts []membership.HostInfo
	var startPort int
	switch c.clusterNo {
	case 0:
		startPort = 7201
	case 1:
		startPort = 8201
	case 2:
		startPort = 9201
	case 3:
		startPort = 10201
	default:
		startPort = 7201
	}
	for i := 0; i < c.historyConfig.NumHistoryHosts; i++ {
		port := startPort + i
		hosts = append(hosts, newHost(uint16(port)))
	}

	c.logger.Info("History hosts", tag.Value(hosts))
	return hosts
}

func (c *cadenceImpl) HistoryPProfPort() []int {
	var ports []int
	var startPort int
	switch c.clusterNo {
	case 0:
		startPort = 7301
	case 1:
		startPort = 8301
	case 2:
		startPort = 9301
	case 3:
		startPort = 10301
	default:
		startPort = 7301
	}
	for i := 0; i < c.historyConfig.NumHistoryHosts; i++ {
		port := startPort + i
		ports = append(ports, port)
	}

	c.logger.Info("History pprof ports", tag.Value(ports))
	return ports
}

func (c *cadenceImpl) MatchingServiceHost() membership.HostInfo {
	var tchan uint16
	switch c.clusterNo {
	case 0:
		tchan = 7106
	case 1:
		tchan = 8106
	case 2:
		tchan = 9106
	case 3:
		tchan = 10106
	default:
		tchan = 7106
	}
	return newHost(tchan)
}

func (c *cadenceImpl) MatchingPProfPort() int {
	switch c.clusterNo {
	case 0:
		return 7107
	case 1:
		return 8107
	case 2:
		return 9107
	case 3:
		return 10107
	default:
		return 7107
	}
}

func (c *cadenceImpl) WorkerServiceHost() membership.HostInfo {
	var tchan uint16
	switch c.clusterNo {
	case 0:
		tchan = 7108
	case 1:
		tchan = 8108
	case 2:
		tchan = 9108
	case 3:
		tchan = 10108
	default:
		tchan = 7108
	}
	return newHost(tchan)
}

func (c *cadenceImpl) WorkerPProfPort() int {
	switch c.clusterNo {
	case 0:
		return 7109
	case 1:
		return 8109
	case 2:
		return 9109
	case 3:
		return 10109
	default:
		return 7109
	}
}

func (c *cadenceImpl) GetAdminClient() adminClient.Client {
	return c.adminClient
}

func (c *cadenceImpl) GetFrontendClient() frontendClient.Client {
	return c.frontendClient
}

func (c *cadenceImpl) GetHistoryClient() historyClient.Client {
	return c.historyClient
}

func (c *cadenceImpl) startFrontend(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
	params := new(resource.Params)
	params.ClusterRedirectionPolicy = &config.ClusterRedirectionPolicy{}
	params.Name = service.Frontend
	params.Logger = c.logger
	params.ThrottledLogger = c.logger
	params.TimeSource = c.timeSource
	params.PProfInitializer = newPProfInitializerImpl(c.logger, c.FrontendPProfPort())
	params.RPCFactory = c.newRPCFactory(service.Frontend, c.FrontendHost())
	params.MetricScope = tally.NewTestScope(service.Frontend, make(map[string]string))
	params.MembershipResolver = newMembershipResolver(params.Name, hosts)
	params.ClusterMetadata = c.clusterMetadata
	params.MessagingClient = c.messagingClient
	params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
	params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.frontendDynCfgOverrides)
	params.ArchivalMetadata = c.archiverMetadata
	params.ArchiverProvider = c.archiverProvider
	params.ESConfig = c.esConfig
	params.ESClient = c.esClient
	params.PinotConfig = c.pinotConfig
	params.PinotClient = c.pinotClient

	var err error
	authorizer, err := authorization.NewAuthorizer(c.authorizationConfig, params.Logger, nil)
	if err != nil {
		c.logger.Fatal("Unable to create authorizer", tag.Error(err))
	}
	params.Authorizer = authorizer

	params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
	if err != nil {
		c.logger.Fatal("Failed to copy persistence config for frontend", tag.Error(err))
	}

	if c.pinotConfig != nil {
		pinotDataStoreName := "pinot-visibility"
		params.PersistenceConfig.AdvancedVisibilityStore = pinotDataStoreName
		params.DynamicConfig.UpdateValue(dynamicconfig.EnableReadVisibilityFromES, false)
		params.PersistenceConfig.DataStores[pinotDataStoreName] = config.DataStore{
			Pinot: c.pinotConfig,
		}
	} else if c.esConfig != nil {
		esDataStoreName := "es-visibility"
		params.PersistenceConfig.AdvancedVisibilityStore = esDataStoreName
		params.PersistenceConfig.DataStores[esDataStoreName] = config.DataStore{
			ElasticSearch: c.esConfig,
		}
	}

	if c.asyncWFQueues != nil {
		params.AsyncWorkflowQueueProvider, err = queue.NewAsyncQueueProvider(c.asyncWFQueues)
		if err != nil {
			c.logger.Fatal("error creating async queue provider", tag.Error(err))
		}
	}

	frontendService, err := frontend.NewService(params)
	if err != nil {
		params.Logger.Fatal("unable to start frontend service", tag.Error(err))
	}

	if c.mockAdminClient != nil {
		clientBean := frontendService.GetClientBean()
		if clientBean != nil {
			for serviceName, client := range c.mockAdminClient {
				clientBean.SetRemoteAdminClient(serviceName, client)
			}
		}
	}

	c.frontendService = frontendService
	c.frontendClient = NewFrontendClient(frontendService.GetDispatcher())
	c.adminClient = NewAdminClient(frontendService.GetDispatcher())
	go frontendService.Start()

	startWG.Done()
	<-c.shutdownCh
	c.shutdownWG.Done()
}

func (c *cadenceImpl) startHistory(
	hosts map[string][]membership.HostInfo,
	startWG *sync.WaitGroup,
) {
	pprofPorts := c.HistoryPProfPort()
	for i, hostport := range c.HistoryHosts() {
		params := new(resource.Params)
		params.Name = service.History
		params.Logger = c.logger
		params.ThrottledLogger = c.logger
		params.TimeSource = c.timeSource
		params.PProfInitializer = newPProfInitializerImpl(c.logger, pprofPorts[i])
		params.RPCFactory = c.newRPCFactory(service.History, hostport)
		params.MetricScope = tally.NewTestScope(service.History, make(map[string]string))
		params.MembershipResolver = newMembershipResolver(params.Name, hosts)
		params.ClusterMetadata = c.clusterMetadata
		params.MessagingClient = c.messagingClient
		params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
		integrationClient := newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.historyDynCfgOverrides)
		c.overrideHistoryDynamicConfig(integrationClient)
		params.DynamicConfig = integrationClient
		params.PublicClient = newPublicClient(params.RPCFactory.GetDispatcher())
		params.ArchivalMetadata = c.archiverMetadata
		params.ArchiverProvider = c.archiverProvider
		params.ESConfig = c.esConfig
		params.ESClient = c.esClient

		var err error
		params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
		if err != nil {
			c.logger.Fatal("Failed to copy persistence config for history", tag.Error(err))
		}

		if c.pinotConfig != nil {
			pinotDataStoreName := "pinot-visibility"
			params.PersistenceConfig.AdvancedVisibilityStore = pinotDataStoreName
			params.PersistenceConfig.DataStores[pinotDataStoreName] = config.DataStore{
				Pinot:         c.pinotConfig,
				ElasticSearch: c.esConfig,
			}
		} else if c.esConfig != nil {
			esDataStoreName := "es-visibility"
			params.PersistenceConfig.AdvancedVisibilityStore = esDataStoreName
			params.PersistenceConfig.DataStores[esDataStoreName] = config.DataStore{
				ElasticSearch: c.esConfig,
			}
		}

		historyService, err := history.NewService(params)
		if err != nil {
			params.Logger.Fatal("unable to start history service", tag.Error(err))
		}

		if c.mockAdminClient != nil {
			clientBean := historyService.GetClientBean()
			if clientBean != nil {
				for serviceName, client := range c.mockAdminClient {
					clientBean.SetRemoteAdminClient(serviceName, client)
				}
			}
		}

		// TODO: this is not correct when there are multiple history hosts as later client will overwrite previous ones.
		// However current interface for getting history client doesn't specify which client it needs and the tests that use this API
		// depends on the fact that there's only one history host.
		// Need to change those tests and modify the interface for getting history client.
		c.historyClient = NewHistoryClient(historyService.GetDispatcher())
		c.historyServices = append(c.historyServices, historyService)

		go historyService.Start()
	}

	startWG.Done()
	<-c.shutdownCh
	c.shutdownWG.Done()
}

func (c *cadenceImpl) startMatching(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
	params := new(resource.Params)
	params.Name = service.Matching
	params.Logger = c.logger
	params.ThrottledLogger = c.logger
	params.TimeSource = c.timeSource
	params.PProfInitializer = newPProfInitializerImpl(c.logger, c.MatchingPProfPort())
	params.RPCFactory = c.newRPCFactory(service.Matching, c.MatchingServiceHost())
	params.MetricScope = tally.NewTestScope(service.Matching, make(map[string]string))
	params.MembershipResolver = newMembershipResolver(params.Name, hosts)
	params.ClusterMetadata = c.clusterMetadata
	params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
	params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.matchingDynCfgOverrides)
	params.ArchivalMetadata = c.archiverMetadata
	params.ArchiverProvider = c.archiverProvider

	var err error
	params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
	if err != nil {
		c.logger.Fatal("Failed to copy persistence config for matching", tag.Error(err))
	}

	matchingService, err := matching.NewService(params)
	if err != nil {
		params.Logger.Fatal("unable to start matching service", tag.Error(err))
	}

	if c.mockAdminClient != nil {
		clientBean := matchingService.GetClientBean()
		if clientBean != nil {
			for serviceName, client := range c.mockAdminClient {
				clientBean.SetRemoteAdminClient(serviceName, client)
			}
		}
	}
	c.matchingService = matchingService
	go c.matchingService.Start()

	startWG.Done()
	<-c.shutdownCh
	c.shutdownWG.Done()
}

func (c *cadenceImpl) startWorker(hosts map[string][]membership.HostInfo, startWG *sync.WaitGroup) {
	defer c.shutdownWG.Done()

	params := new(resource.Params)
	params.Name = service.Worker
	params.Logger = c.logger
	params.ThrottledLogger = c.logger
	params.TimeSource = c.timeSource
	params.PProfInitializer = newPProfInitializerImpl(c.logger, c.WorkerPProfPort())
	params.RPCFactory = c.newRPCFactory(service.Worker, c.WorkerServiceHost())
	params.MetricScope = tally.NewTestScope(service.Worker, make(map[string]string))
	params.MembershipResolver = newMembershipResolver(params.Name, hosts)
	params.ClusterMetadata = c.clusterMetadata
	params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, c.logger))
	params.DynamicConfig = newIntegrationConfigClient(dynamicconfig.NewNopClient(), c.workerDynCfgOverrides)
	params.ArchivalMetadata = c.archiverMetadata
	params.ArchiverProvider = c.archiverProvider

	var err error
	params.PersistenceConfig, err = copyPersistenceConfig(c.persistenceConfig)
	if err != nil {
		c.logger.Fatal("Failed to copy persistence config for worker", tag.Error(err))
	}
	params.PublicClient = newPublicClient(params.RPCFactory.GetDispatcher())
	service := NewService(params)
	service.Start()

	var replicatorDomainCache cache.DomainCache
	if c.workerConfig.EnableReplicator {
		metadataManager := metered.NewDomainManager(c.domainManager, service.GetMetricsClient(), c.logger, &c.persistenceConfig)
		replicatorDomainCache = cache.NewDomainCache(metadataManager, c.clusterMetadata, service.GetMetricsClient(), service.GetLogger())
		replicatorDomainCache.Start()
		defer replicatorDomainCache.Stop()
		c.startWorkerReplicator(service)
	}

	var clientWorkerDomainCache cache.DomainCache
	if c.workerConfig.EnableArchiver {
		metadataProxyManager := metered.NewDomainManager(c.domainManager, service.GetMetricsClient(), c.logger, &c.persistenceConfig)
		clientWorkerDomainCache = cache.NewDomainCache(metadataProxyManager, c.clusterMetadata, service.GetMetricsClient(), service.GetLogger())
		clientWorkerDomainCache.Start()
		defer clientWorkerDomainCache.Stop()
		c.startWorkerClientWorker(params, service, clientWorkerDomainCache)
	}

	if c.workerConfig.EnableIndexer {
		c.startWorkerIndexer(params, service)
	}

	var asyncWFDomainCache cache.DomainCache
	if c.workerConfig.EnableAsyncWFConsumer {
		queueProvider, err := queue.NewAsyncQueueProvider(c.asyncWFQueues)
		if err != nil {
			c.logger.Fatal("error creating async queue provider", tag.Error(err))
		}
		metadataProxyManager := metered.NewDomainManager(
			c.domainManager,
			service.GetMetricsClient(),
			c.logger,
			&c.persistenceConfig)
		asyncWFDomainCache = cache.NewDomainCache(
			metadataProxyManager,
			c.clusterMetadata,
			service.GetMetricsClient(),
			service.GetLogger(),
			cache.WithTimeSource(params.TimeSource))
		asyncWFDomainCache.Start()
		defer asyncWFDomainCache.Stop()
		cm := asyncworkflow.NewConsumerManager(
			service.GetLogger(),
			service.GetMetricsClient(),
			asyncWFDomainCache,
			queueProvider,
			c.frontendClient,
			asyncworkflow.WithTimeSource(params.TimeSource),
			asyncworkflow.WithRefreshInterval(time.Second),
		)
		cm.Start()
		defer cm.Stop()
	}

	startWG.Done()
	<-c.shutdownCh
	if c.workerConfig.EnableReplicator {
		replicatorDomainCache.Stop()
	}
	if c.workerConfig.EnableArchiver {
		clientWorkerDomainCache.Stop()
	}
}

func (c *cadenceImpl) startWorkerReplicator(svc Service) {
	c.replicator = replicator.NewReplicator(
		c.clusterMetadata,
		svc.GetClientBean(),
		c.logger,
		svc.GetMetricsClient(),
		svc.GetHostInfo(),
		svc.GetMembershipResolver(),
		c.domainReplicationQueue,
		c.domainReplicationTaskExecutor,
		time.Millisecond,
	)
	if err := c.replicator.Start(); err != nil {
		c.replicator.Stop()
		c.logger.Fatal("Fail to start replicator when start worker", tag.Error(err))
	}
}

func (c *cadenceImpl) startWorkerClientWorker(params *resource.Params, svc Service, domainCache cache.DomainCache) {
	workerConfig := worker.NewConfig(params)
	workerConfig.ArchiverConfig.ArchiverConcurrency = dynamicconfig.GetIntPropertyFn(10)

	historyArchiverBootstrapContainer := &carchiver.HistoryBootstrapContainer{
		HistoryV2Manager: c.historyV2Mgr,
		Logger:           c.logger,
		MetricsClient:    svc.GetMetricsClient(),
		ClusterMetadata:  c.clusterMetadata,
		DomainCache:      domainCache,
	}
	err := c.archiverProvider.RegisterBootstrapContainer(service.Worker, historyArchiverBootstrapContainer, &carchiver.VisibilityBootstrapContainer{})
	if err != nil {
		c.logger.Fatal("Failed to register archiver bootstrap container for worker service", tag.Error(err))
	}

	bc := &archiver.BootstrapContainer{
		PublicClient:     params.PublicClient,
		MetricsClient:    svc.GetMetricsClient(),
		Logger:           c.logger,
		HistoryV2Manager: c.historyV2Mgr,
		DomainCache:      domainCache,
		Config:           workerConfig.ArchiverConfig,
		ArchiverProvider: c.archiverProvider,
	}
	c.clientWorker = archiver.NewClientWorker(bc)
	if err := c.clientWorker.Start(); err != nil {
		c.clientWorker.Stop()
		c.logger.Fatal("Fail to start archiver when start worker", tag.Error(err))
	}
}

func (c *cadenceImpl) startWorkerIndexer(params *resource.Params, service Service) {
	params.DynamicConfig.UpdateValue(dynamicconfig.AdvancedVisibilityWritingMode, common.AdvancedVisibilityWritingModeDual)
	workerConfig := worker.NewConfig(params)
	c.indexer = indexer.NewIndexer(
		workerConfig.IndexerCfg,
		c.messagingClient,
		c.esClient,
		c.esConfig.Indices[common.VisibilityAppName],
		c.logger,
		service.GetMetricsClient())
	if err := c.indexer.Start(); err != nil {
		c.indexer.Stop()
		c.logger.Fatal("Fail to start indexer when start worker", tag.Error(err))
	}
}

func (c *cadenceImpl) createSystemDomain() error {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestPersistenceTimeout)
	defer cancel()
	_, err := c.domainManager.CreateDomain(ctx, &persistence.CreateDomainRequest{
		Info: &persistence.DomainInfo{
			ID:          uuid.New(),
			Name:        "cadence-system",
			Status:      persistence.DomainStatusRegistered,
			Description: "Cadence system domain",
		},
		Config: &persistence.DomainConfig{
			Retention:                1,
			HistoryArchivalStatus:    types.ArchivalStatusDisabled,
			VisibilityArchivalStatus: types.ArchivalStatusDisabled,
		},
		ReplicationConfig: &persistence.DomainReplicationConfig{},
		FailoverVersion:   common.EmptyVersion,
	})
	if err != nil {
		if _, ok := err.(*types.DomainAlreadyExistsError); ok {
			return nil
		}
		return fmt.Errorf("failed to create cadence-system domain: %v", err)
	}
	return nil
}

func (c *cadenceImpl) GetExecutionManagerFactory() persistence.ExecutionManagerFactory {
	return c.executionMgrFactory
}

func (c *cadenceImpl) overrideHistoryDynamicConfig(client *dynamicClient) {
	client.OverrideValue(dynamicconfig.HistoryMgrNumConns, c.historyConfig.NumHistoryShards)
	client.OverrideValue(dynamicconfig.ExecutionMgrNumConns, c.historyConfig.NumHistoryShards)
	client.OverrideValue(dynamicconfig.ReplicationTaskProcessorStartWait, time.Nanosecond)

	if c.workerConfig.EnableIndexer {
		client.OverrideValue(dynamicconfig.AdvancedVisibilityWritingMode, common.AdvancedVisibilityWritingModeDual)
	}
	if c.historyConfig.HistoryCountLimitWarn != 0 {
		client.OverrideValue(dynamicconfig.HistoryCountLimitWarn, c.historyConfig.HistoryCountLimitWarn)
	}
	if c.historyConfig.HistoryCountLimitError != 0 {
		client.OverrideValue(dynamicconfig.HistoryCountLimitError, c.historyConfig.HistoryCountLimitError)
	}
}

// copyPersistenceConfig makes a deepcopy of persistence config.
// This is just a temp fix for the race condition of persistence config.
// The race condition happens because all the services are using the same datastore map in the config.
// Also all services will retry to modify the maxQPS field in the datastore during start up and use the modified maxQPS value to create a persistence factory.
func copyPersistenceConfig(pConfig config.Persistence) (config.Persistence, error) {
	copiedDataStores := make(map[string]config.DataStore)
	for name, value := range pConfig.DataStores {
		copiedDataStore := config.DataStore{}
		encodedDataStore, err := json.Marshal(value)
		if err != nil {
			return pConfig, err
		}

		if err = json.Unmarshal(encodedDataStore, &copiedDataStore); err != nil {
			return pConfig, err
		}
		copiedDataStores[name] = copiedDataStore
	}
	pConfig.DataStores = copiedDataStores
	return pConfig, nil
}

func newMembershipResolver(serviceName string, hosts map[string][]membership.HostInfo) membership.Resolver {
	return NewSimpleResolver(serviceName, hosts)
}

func newPProfInitializerImpl(logger log.Logger, port int) common.PProfInitializer {
	return &config.PProfInitializerImpl{
		PProf: &config.PProf{
			Port: port,
		},
		Logger: logger,
	}
}

func newPublicClient(dispatcher *yarpc.Dispatcher) cwsc.Interface {
	config := dispatcher.ClientConfig(rpc.OutboundPublicClient)
	return compatibility.NewThrift2ProtoAdapter(
		apiv1.NewDomainAPIYARPCClient(config),
		apiv1.NewWorkflowAPIYARPCClient(config),
		apiv1.NewWorkerAPIYARPCClient(config),
		apiv1.NewVisibilityAPIYARPCClient(config),
	)
}

func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo) common.RPCFactory {
	tchannelAddress, err := host.GetNamedAddress(membership.PortTchannel)
	if err != nil {
		c.logger.Fatal("failed to get PortTchannel port from host", tag.Value(host), tag.Error(err))
	}

	grpcAddress, err := host.GetNamedAddress(membership.PortGRPC)
	if err != nil {
		c.logger.Fatal("failed to get PortGRPC port from host", tag.Value(host), tag.Error(err))
	}

	frontendGrpcAddress, err := c.FrontendHost().GetNamedAddress(membership.PortGRPC)
	if err != nil {
		c.logger.Fatal("failed to get frontend PortGRPC", tag.Value(c.FrontendHost()), tag.Error(err))
	}

	return rpc.NewFactory(c.logger, rpc.Params{
		ServiceName:     serviceName,
		TChannelAddress: tchannelAddress,
		GRPCAddress:     grpcAddress,
		InboundMiddleware: yarpc.InboundMiddleware{
			Unary: &versionMiddleware{},
		},
		// For integration tests to generate client out of the same outbound.
		OutboundsBuilder: rpc.CombineOutbounds(
			&singleGRPCOutbound{testOutboundName(serviceName), serviceName, grpcAddress},
			&singleGRPCOutbound{rpc.OutboundPublicClient, service.Frontend, frontendGrpcAddress},
			rpc.NewCrossDCOutbounds(c.clusterMetadata.GetAllClusterInfo(), rpc.NewDNSPeerChooserFactory(0, c.logger)),
			rpc.NewDirectOutbound(service.History, true, nil),
			rpc.NewDirectOutbound(service.Matching, true, nil),
		),
	})
}

// testOutbound prefixes outbound with "test-" to not clash with other real Cadence outbounds.
func testOutboundName(name string) string {
	return "test-" + name
}

type singleGRPCOutbound struct {
	outboundName string
	serviceName  string
	address      string
}

func (b singleGRPCOutbound) Build(grpc *grpc.Transport, _ *tchannel.Transport) (yarpc.Outbounds, error) {
	return yarpc.Outbounds{
		b.outboundName: {
			ServiceName: b.serviceName,
			Unary:       grpc.NewSingleOutbound(b.address),
		},
	}, nil
}

type versionMiddleware struct {
}

func (vm *versionMiddleware) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
	req.Headers = req.Headers.With(common.LibraryVersionHeaderName, "1.0.0").
		With(common.FeatureVersionHeaderName, cc.SupportedGoSDKVersion).
		With(common.ClientImplHeaderName, cc.GoSDK)
	return h.Handle(ctx, req, resw)
}
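
// Usage sketch (illustrative only, not part of the original file): integration
// tests build CadenceParams with their own dependencies and drive the onebox
// cluster through the Cadence interface defined above. The fixture values used
// below (clusterMetadata, persistenceConfig, logger) are hypothetical
// placeholders supplied by a test harness, not names defined in this file.
//
//	params := &CadenceParams{
//		ClusterMetadata:   clusterMetadata,   // hypothetical test fixture
//		PersistenceConfig: persistenceConfig, // hypothetical test fixture
//		Logger:            logger,            // hypothetical test fixture
//		HistoryConfig:     &HistoryConfig{NumHistoryShards: 4, NumHistoryHosts: 1},
//		WorkerConfig:      &WorkerConfig{},
//	}
//	cluster := NewCadence(params)
//	if err := cluster.Start(); err != nil {
//		t.Fatal(err)
//	}
//	defer cluster.Stop()
//	frontend := cluster.GetFrontendClient() // issue requests against the onebox frontend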