service/history/service.go (114 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package history import ( "sync/atomic" "time" "github.com/uber/cadence/common" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/quotas" commonResource "github.com/uber/cadence/common/resource" "github.com/uber/cadence/common/service" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/handler" "github.com/uber/cadence/service/history/resource" "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/history/wrappers/grpc" "github.com/uber/cadence/service/history/wrappers/ratelimited" "github.com/uber/cadence/service/history/wrappers/thrift" ) const ( workflowIDCacheTTL = 1 * time.Second workflowIDCacheMaxCount = 10_000 ) // Service represents the cadence-history service type Service struct { resource.Resource status int32 handler handler.Handler stopC chan struct{} params *commonResource.Params config *config.Config } // NewService builds a new cadence-history service func NewService( params *commonResource.Params, ) (resource.Resource, error) { serviceConfig := config.New( dynamicconfig.NewCollection( params.DynamicConfig, params.Logger, dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()), ), params.PersistenceConfig.NumHistoryShards, params.RPCFactory.GetMaxMessageSize(), params.PersistenceConfig.DefaultStoreType(), params.PersistenceConfig.IsAdvancedVisibilityConfigExist(), params.HostName) params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns() serviceResource, err := resource.New( params, service.History, serviceConfig, ) if err != nil { return nil, err } return &Service{ Resource: serviceResource, status: common.DaemonStatusInitialized, stopC: make(chan struct{}), params: params, config: serviceConfig, }, nil } // Start starts the service func (s *Service) Start() { if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return } logger := s.GetLogger() logger.Info("elastic search config", tag.ESConfig(s.params.ESConfig)) logger.Info("history starting") wfIDCache := workflowcache.New(workflowcache.Params{ TTL: workflowIDCacheTTL, ExternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(s.config.WorkflowIDExternalRPS), InternalLimiterFactory: quotas.NewSimpleDynamicRateLimiterFactory(s.config.WorkflowIDInternalRPS), WorkflowIDCacheExternalEnabled: s.config.WorkflowIDCacheExternalEnabled, WorkflowIDCacheInternalEnabled: s.config.WorkflowIDCacheInternalEnabled, MaxCount: workflowIDCacheMaxCount, DomainCache: s.Resource.GetDomainCache(), Logger: s.Resource.GetLogger(), MetricsClient: s.Resource.GetMetricsClient(), }) rawHandler := handler.NewHandler(s.Resource, s.config, wfIDCache, s.config.WorkflowIDInternalRateLimitEnabled) s.handler = ratelimited.NewHistoryHandler( rawHandler, wfIDCache, s.config.WorkflowIDExternalRateLimitEnabled, s.Resource.GetDomainCache(), s.Resource.GetLogger(), ) thriftHandler := thrift.NewThriftHandler(s.handler) thriftHandler.Register(s.GetDispatcher()) grpcHandler := grpc.NewGRPCHandler(s.handler) grpcHandler.Register(s.GetDispatcher()) // must start resource first s.Resource.Start() s.handler.Start() logger.Info("history started") <-s.stopC } // Stop stops the service func (s *Service) Stop() { if !atomic.CompareAndSwapInt32(&s.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { return } // initiate graceful shutdown : // 1. remove self from the membership ring // 2. wait for other members to discover we are going down // 3. stop acquiring new shards (periodically or based on other membership changes) // 4. wait for shard ownership to transfer (and inflight requests to drain) while still accepting new requests // 5. Reject all requests arriving at rpc handler to avoid taking on more work except for RespondXXXCompleted and // RecordXXStarted APIs - for these APIs, most of the work is already one and rejecting at last stage is // probably not that desirable. If the shard is closed, these requests will fail anyways. // 6. wait for grace period // 7. force stop the whole world and return const gossipPropagationDelay = 400 * time.Millisecond const gracePeriod = 2 * time.Second remainingTime := s.config.ShutdownDrainDuration() s.GetLogger().Info("ShutdownHandler: Evicting self from membership ring") s.GetMembershipResolver().EvictSelf() s.GetLogger().Info("ShutdownHandler: Waiting for others to discover I am unhealthy") remainingTime = common.SleepWithMinDuration(gossipPropagationDelay, remainingTime) remainingTime = s.handler.PrepareToStop(remainingTime) _ = common.SleepWithMinDuration(gracePeriod, remainingTime) close(s.stopC) s.handler.Stop() s.Resource.Stop() s.GetLogger().Info("history stopped") }