in cmd/server/cadence/server.go [103:314]
func (s *server) startService() common.Daemon {
svcCfg, err := s.cfg.GetServiceConfig(s.name)
if err != nil {
log.Fatal(err.Error())
}
params := resource.Params{}
params.Name = service.FullName(s.name)
zapLogger, err := s.cfg.Log.NewZapLogger()
if err != nil {
log.Fatal("failed to create the zap logger, err: ", err.Error())
}
params.Logger = loggerimpl.NewLogger(zapLogger).WithTags(tag.Service(params.Name))
params.PersistenceConfig = s.cfg.Persistence
err = nil
if s.cfg.DynamicConfig.Client == "" {
params.Logger.Warn("falling back to legacy file based dynamicClientConfig")
params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfigClient, params.Logger, s.doneC)
} else {
switch s.cfg.DynamicConfig.Client {
case dynamicconfig.ConfigStoreClient:
params.Logger.Info("initialising ConfigStore dynamic config client")
params.DynamicConfig, err = configstore.NewConfigStoreClient(
&s.cfg.DynamicConfig.ConfigStore,
&s.cfg.Persistence,
params.Logger,
persistence.DynamicConfig,
)
case dynamicconfig.FileBasedClient:
params.Logger.Info("initialising File Based dynamic config client")
params.DynamicConfig, err = dynamicconfig.NewFileBasedClient(&s.cfg.DynamicConfig.FileBased, params.Logger, s.doneC)
default:
params.Logger.Info("initialising NOP dynamic config client")
params.DynamicConfig = dynamicconfig.NewNopClient()
}
}
if err != nil {
params.Logger.Error("creating dynamic config client failed, using no-op config client instead", tag.Error(err))
params.DynamicConfig = dynamicconfig.NewNopClient()
}
clusterGroupMetadata := s.cfg.ClusterGroupMetadata
dc := dynamicconfig.NewCollection(
params.DynamicConfig,
params.Logger,
dynamicconfig.ClusterNameFilter(clusterGroupMetadata.CurrentClusterName),
)
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger, params.Name)
rpcParams, err := rpc.NewParams(params.Name, s.cfg, dc)
if err != nil {
log.Fatalf("error creating rpc factory params: %v", err)
}
rpcParams.OutboundsBuilder = rpc.CombineOutbounds(
rpcParams.OutboundsBuilder,
rpc.NewCrossDCOutbounds(clusterGroupMetadata.ClusterGroup, rpc.NewDNSPeerChooserFactory(s.cfg.PublicClient.RefreshInterval, params.Logger)),
)
rpcFactory := rpc.NewFactory(params.Logger, rpcParams)
params.RPCFactory = rpcFactory
peerProvider, err := ringpopprovider.New(
params.Name,
&s.cfg.Ringpop,
rpcFactory.GetChannel(),
membership.PortMap{
membership.PortGRPC: svcCfg.RPC.GRPCPort,
membership.PortTchannel: svcCfg.RPC.Port,
},
params.Logger,
)
if err != nil {
log.Fatalf("ringpop provider failed: %v", err)
}
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))
params.MembershipResolver, err = membership.NewResolver(
peerProvider,
params.Logger,
params.MetricsClient,
)
if err != nil {
log.Fatalf("error creating membership monitor: %v", err)
}
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
params.ClusterRedirectionPolicy = s.cfg.ClusterGroupMetadata.ClusterRedirectionPolicy
params.ClusterMetadata = cluster.NewMetadata(
clusterGroupMetadata.FailoverVersionIncrement,
clusterGroupMetadata.PrimaryClusterName,
clusterGroupMetadata.CurrentClusterName,
clusterGroupMetadata.ClusterGroup,
dc.GetBoolPropertyFilteredByDomain(dynamicconfig.UseNewInitialFailoverVersion),
params.MetricsClient,
params.Logger,
)
advancedVisMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
)()
isAdvancedVisEnabled := common.IsAdvancedVisibilityWritingEnabled(advancedVisMode, params.PersistenceConfig.IsAdvancedVisibilityConfigExist())
if isAdvancedVisEnabled {
params.MessagingClient = kafka.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, params.Logger, params.MetricScope, isAdvancedVisEnabled)
} else {
params.MessagingClient = nil
}
if isAdvancedVisEnabled {
// verify config of advanced visibility store
advancedVisStoreKey := s.cfg.Persistence.AdvancedVisibilityStore
advancedVisStore, ok := s.cfg.Persistence.DataStores[advancedVisStoreKey]
if !ok {
log.Fatalf("not able to find advanced visibility store in config: %v", advancedVisStoreKey)
}
params.ESConfig = advancedVisStore.ElasticSearch
if params.PersistenceConfig.AdvancedVisibilityStore == common.PinotVisibilityStoreName {
// components like ESAnalyzer is still using ElasticSearch
// The plan is to clean those after we switch to operate on Pinot
esVisibilityStore, ok := s.cfg.Persistence.DataStores[common.ESVisibilityStoreName]
if !ok {
log.Fatalf("Missing Elasticsearch config")
}
params.ESConfig = esVisibilityStore.ElasticSearch
params.PinotConfig = advancedVisStore.Pinot
pinotBroker := params.PinotConfig.Broker
pinotRawClient, err := pinot.NewFromBrokerList([]string{pinotBroker})
if err != nil || pinotRawClient == nil {
log.Fatalf("Creating Pinot visibility client failed: %v", err)
}
pinotClient := pnt.NewPinotClient(pinotRawClient, params.Logger, params.PinotConfig)
params.PinotClient = pinotClient
}
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient
// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
}
}
publicClientConfig := params.RPCFactory.GetDispatcher().ClientConfig(rpc.OutboundPublicClient)
if rpc.IsGRPCOutbound(publicClientConfig) {
params.PublicClient = compatibility.NewThrift2ProtoAdapter(
apiv1.NewDomainAPIYARPCClient(publicClientConfig),
apiv1.NewWorkflowAPIYARPCClient(publicClientConfig),
apiv1.NewWorkerAPIYARPCClient(publicClientConfig),
apiv1.NewVisibilityAPIYARPCClient(publicClientConfig),
)
} else {
params.PublicClient = workflowserviceclient.New(publicClientConfig)
}
params.ArchivalMetadata = archiver.NewArchivalMetadata(
dc,
s.cfg.Archival.History.Status,
s.cfg.Archival.History.EnableRead,
s.cfg.Archival.Visibility.Status,
s.cfg.Archival.Visibility.EnableRead,
&s.cfg.DomainDefaults.Archival,
)
params.ArchiverProvider = provider.NewArchiverProvider(s.cfg.Archival.History.Provider, s.cfg.Archival.Visibility.Provider)
params.PersistenceConfig.TransactionSizeLimit = dc.GetIntProperty(dynamicconfig.TransactionSizeLimit)
params.PersistenceConfig.ErrorInjectionRate = dc.GetFloat64Property(dynamicconfig.PersistenceErrorInjectionRate)
params.AuthorizationConfig = s.cfg.Authorization
params.BlobstoreClient, err = filestore.NewFilestoreClient(s.cfg.Blobstore.Filestore)
if err != nil {
log.Printf("failed to create file blobstore client, will continue startup without it: %v", err)
params.BlobstoreClient = nil
}
params.AsyncWorkflowQueueProvider, err = queue.NewAsyncQueueProvider(s.cfg.AsyncWorkflowQueues)
if err != nil {
log.Fatalf("error creating async queue provider: %v", err)
}
params.Logger.Info("Starting service " + s.name)
var daemon common.Daemon
switch params.Name {
case service.Frontend:
daemon, err = frontend.NewService(¶ms)
case service.History:
daemon, err = history.NewService(¶ms)
case service.Matching:
daemon, err = matching.NewService(¶ms)
case service.Worker:
daemon, err = worker.NewService(¶ms)
}
if err != nil {
params.Logger.Fatal("Fail to start "+s.name+" service ", tag.Error(err))
}
go execute(daemon, s.doneC)
return daemon
}