service/frontend/service.go

// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package frontend

import (
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/client"
	"github.com/uber/cadence/common/domain"
	"github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/quotas"
	"github.com/uber/cadence/common/resource"
	"github.com/uber/cadence/common/service"
	"github.com/uber/cadence/service/frontend/admin"
	"github.com/uber/cadence/service/frontend/api"
	"github.com/uber/cadence/service/frontend/config"
	"github.com/uber/cadence/service/frontend/wrappers/accesscontrolled"
	"github.com/uber/cadence/service/frontend/wrappers/clusterredirection"
	"github.com/uber/cadence/service/frontend/wrappers/grpc"
	"github.com/uber/cadence/service/frontend/wrappers/metered"
	"github.com/uber/cadence/service/frontend/wrappers/ratelimited"
	"github.com/uber/cadence/service/frontend/wrappers/thrift"
)

// Service represents the cadence-frontend service
type Service struct {
	resource.Resource

	status       int32
	handler      *api.WorkflowHandler
	adminHandler admin.Handler
	stopC        chan struct{}
	config       *config.Config
	params       *resource.Params
}

// NewService builds a new cadence-frontend service
func NewService(
	params *resource.Params,
) (resource.Resource, error) {
	isAdvancedVisExistInConfig := len(params.PersistenceConfig.AdvancedVisibilityStore) != 0
	serviceConfig := config.NewConfig(
		dynamicconfig.NewCollection(
			params.DynamicConfig,
			params.Logger,
			dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
		),
		params.PersistenceConfig.NumHistoryShards,
		isAdvancedVisExistInConfig,
		params.HostName,
	)

	params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()

	serviceResource, err := resource.New(
		params,
		service.Frontend,
		&service.Config{
			PersistenceMaxQPS:               serviceConfig.PersistenceMaxQPS,
			PersistenceGlobalMaxQPS:         serviceConfig.PersistenceGlobalMaxQPS,
			ThrottledLoggerMaxRPS:           serviceConfig.ThrottledLogRPS,
			EnableReadVisibilityFromES:      serviceConfig.EnableReadVisibilityFromES,
			AdvancedVisibilityWritingMode:   nil, // the frontend service never writes visibility records
			EnableReadVisibilityFromPinot:   serviceConfig.EnableReadVisibilityFromPinot,
			EnableLogCustomerQueryParameter: serviceConfig.EnableLogCustomerQueryParameter,

			EnableDBVisibilitySampling:                  serviceConfig.EnableVisibilitySampling,
			EnableReadDBVisibilityFromClosedExecutionV2: serviceConfig.EnableReadFromClosedExecutionV2,
			DBVisibilityListMaxQPS:                      serviceConfig.VisibilityListMaxQPS,
			WriteDBVisibilityOpenMaxQPS:                 nil, // the frontend service never writes visibility records
			WriteDBVisibilityClosedMaxQPS:               nil, // the frontend service never writes visibility records

			ESVisibilityListMaxQPS: serviceConfig.ESVisibilityListMaxQPS,
			ESIndexMaxResultWindow: serviceConfig.ESIndexMaxResultWindow,
			ValidSearchAttributes:  serviceConfig.ValidSearchAttributes,

			IsErrorRetryableFunction: common.FrontendRetry,
		},
	)
	if err != nil {
		return nil, err
	}

	return &Service{
		Resource: serviceResource,
		status:   common.DaemonStatusInitialized,
		config:   serviceConfig,
		stopC:    make(chan struct{}),
		params:   params,
	}, nil
}
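// A minimal sketch of how a caller might drive this service; the wiring here
// is illustrative only, since the real server bootstrap assembles
// resource.Params and invokes these methods elsewhere:
//
//	svc, err := frontend.NewService(params)
//	if err != nil {
//		log.Fatal(err)
//	}
//	go svc.Start() // blocks on stopC until Stop is called
//	// ... wait for a shutdown signal ...
//	svc.Stop()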
// Start starts the service
func (s *Service) Start() {
	if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	logger := s.GetLogger()
	logger.Info("frontend starting")

	// the domain handler is shared between the admin and workflow handlers,
	// so instantiate it centrally and pass it to both
	dh := domain.NewHandler(
		s.config.DomainConfig,
		s.GetLogger(),
		s.GetDomainManager(),
		s.GetClusterMetadata(),
		domain.NewDomainReplicator(s.GetDomainReplicationQueue(), s.GetLogger()),
		s.GetArchivalMetadata(),
		s.GetArchiverProvider(),
		s.GetTimeSource(),
	)

	// Base handler
	s.handler = api.NewWorkflowHandler(s, s.config, client.NewVersionChecker(), dh)

	// Each limiter combines a per-instance limit with a global per-domain
	// limit shared across frontend hosts via the membership resolver.
	userRateLimiter := quotas.NewMultiStageRateLimiter(
		quotas.NewDynamicRateLimiter(s.config.UserRPS.AsFloat64()),
		quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
			service.Frontend,
			s.config.GlobalDomainUserRPS,
			s.config.MaxDomainUserRPSPerInstance,
			s.GetMembershipResolver(),
		)),
	)
	workerRateLimiter := quotas.NewMultiStageRateLimiter(
		quotas.NewDynamicRateLimiter(s.config.WorkerRPS.AsFloat64()),
		quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
			service.Frontend,
			s.config.GlobalDomainWorkerRPS,
			s.config.MaxDomainWorkerRPSPerInstance,
			s.GetMembershipResolver(),
		)),
	)
	visibilityRateLimiter := quotas.NewMultiStageRateLimiter(
		quotas.NewDynamicRateLimiter(s.config.VisibilityRPS.AsFloat64()),
		quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
			service.Frontend,
			s.config.GlobalDomainVisibilityRPS,
			s.config.MaxDomainVisibilityRPSPerInstance,
			s.GetMembershipResolver(),
		)),
	)
	asyncRateLimiter := quotas.NewMultiStageRateLimiter(
		quotas.NewDynamicRateLimiter(s.config.AsyncRPS.AsFloat64()),
		quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
			service.Frontend,
			s.config.GlobalDomainAsyncRPS,
			s.config.MaxDomainAsyncRPSPerInstance,
			s.GetMembershipResolver(),
		)),
	)
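	// Note on ordering: each wrapper below decorates the previous handler, so
	// the last wrapper applied is the first to see a request. An incoming call
	// therefore passes through access control, then (if configured) cluster
	// redirection, then metrics emission, then rate limiting, before reaching
	// the base WorkflowHandler.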
	// Additional decorations
	var handler api.Handler = s.handler
	handler = ratelimited.NewAPIHandler(handler, s.GetDomainCache(), userRateLimiter, workerRateLimiter, visibilityRateLimiter, asyncRateLimiter)
	handler = metered.NewAPIHandler(handler, s.GetLogger(), s.GetMetricsClient(), s.GetDomainCache(), s.config)
	if s.params.ClusterRedirectionPolicy != nil {
		handler = clusterredirection.NewAPIHandler(handler, s, s.config, *s.params.ClusterRedirectionPolicy)
	}
	handler = accesscontrolled.NewAPIHandler(handler, s, s.params.Authorizer, s.params.AuthorizationConfig)

	// Register the latest (most decorated) handler
	thriftHandler := thrift.NewAPIHandler(handler)
	thriftHandler.Register(s.GetDispatcher())

	grpcHandler := grpc.NewAPIHandler(handler)
	grpcHandler.Register(s.GetDispatcher())

	s.adminHandler = admin.NewHandler(s, s.params, s.config, dh)
	s.adminHandler = accesscontrolled.NewAdminHandler(s.adminHandler, s, s.params.Authorizer, s.params.AuthorizationConfig)

	adminThriftHandler := thrift.NewAdminHandler(s.adminHandler)
	adminThriftHandler.Register(s.GetDispatcher())

	adminGRPCHandler := grpc.NewAdminHandler(s.adminHandler)
	adminGRPCHandler.Register(s.GetDispatcher())

	// must start resource first
	s.Resource.Start()
	s.handler.Start()
	s.adminHandler.Start()

	// the yarpc service is started by the base resource, not by the frontend
	// or admin handlers, to avoid racing the registration calls above
	logger.Info("frontend started")

	<-s.stopC
}

// Stop stops the service
func (s *Service) Stop() {
	if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	// initiate graceful shutdown:
	// 1. fail the rpc health check, causing client-side load balancers to stop
	//    forwarding requests to this node
	// 2. wait for the failure-detection time
	// 3. stop taking new requests by returning InternalServiceError
	// 4. wait up to a second for in-flight requests to drain
	// 5. stop everything forcefully and return
	requestDrainTime := common.MinDuration(time.Second, s.config.ShutdownDrainDuration())
	failureDetectionTime := common.MaxDuration(0, s.config.ShutdownDrainDuration()-requestDrainTime)

	s.GetLogger().Info("ShutdownHandler: Updating rpc health status to ShuttingDown")
	s.handler.UpdateHealthStatus(api.HealthStatusShuttingDown)

	s.GetLogger().Info("ShutdownHandler: Waiting for others to discover I am unhealthy")
	time.Sleep(failureDetectionTime)

	s.handler.Stop()
	s.adminHandler.Stop()

	s.GetLogger().Info("ShutdownHandler: Draining traffic")
	time.Sleep(requestDrainTime)

	close(s.stopC)
	s.Resource.Stop()

	s.params.Logger.Info("frontend stopped")
}
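// Rough shutdown timeline implied by Stop, assuming ShutdownDrainDuration is
// configured to some D >= 1s (smaller values compress both phases):
//
//	t = 0       health status flips to ShuttingDown; load balancers begin
//	            routing traffic away
//	t = D - 1s  workflow and admin handlers stop taking new requests
//	t = D       stopC is closed, Start returns, and the resource is stopped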