service/history/shard/controller.go

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination controller_mock.go -self_package github.com/uber/cadence/service/history/shard

package shard

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	workflow "github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/membership"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/service"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/engine"
	"github.com/uber/cadence/service/history/resource"
)

const (
	shardControllerMembershipUpdateListenerName = "ShardController"
)

var (
	errShardIDOutOfBoundary = &workflow.BadRequestError{Message: "shard ID is out of boundary"}
)

type (
	// EngineFactory is used to create an instance of sharded history engine
	EngineFactory interface {
		CreateEngine(Context) engine.Engine
	}

	// Controller controls history service shards
	Controller interface {
		common.Daemon

		// PrepareToStop starts the graceful shutdown process for controller
		PrepareToStop()
		GetEngine(workflowID string) (engine.Engine, error)
		GetEngineForShard(shardID int) (engine.Engine, error)
		RemoveEngineForShard(shardID int)

		// The following methods describe the current status of the controller
		// TODO: consider converting to a unified describe method
		Status() int32
		NumShards() int
		ShardIDs() []int32
	}

	controller struct {
		resource.Resource

		membershipUpdateCh chan *membership.ChangedEvent
		engineFactory      EngineFactory
		status             int32
		shuttingDown       int32
		shutdownWG         sync.WaitGroup
		shutdownCh         chan struct{}
		logger             log.Logger
		throttledLogger    log.Logger
		config             *config.Config
		metricsScope       metrics.Scope

		sync.RWMutex
		historyShards map[int]*historyShardsItem
	}

	historyShardsItemStatus int

	historyShardsItem struct {
		resource.Resource

		shardID         int
		config          *config.Config
		logger          log.Logger
		throttledLogger log.Logger
		engineFactory   EngineFactory

		sync.RWMutex
		status historyShardsItemStatus
		engine engine.Engine
	}
)

const (
	historyShardsItemStatusInitialized = iota
	historyShardsItemStatusStarted
	historyShardsItemStatusStopped
)

// NewShardController creates a new shard controller
func NewShardController(
	resource resource.Resource,
	factory EngineFactory,
	config *config.Config,
) Controller {
	hostAddress := resource.GetHostInfo().GetAddress()
	return &controller{
		Resource:           resource,
		status:             common.DaemonStatusInitialized,
		membershipUpdateCh: make(chan *membership.ChangedEvent, 10),
		engineFactory:      factory,
		historyShards:      make(map[int]*historyShardsItem),
		shutdownCh:         make(chan struct{}),
		logger:             resource.GetLogger().WithTags(tag.ComponentShardController, tag.Address(hostAddress)),
		throttledLogger:    resource.GetThrottledLogger().WithTags(tag.ComponentShardController, tag.Address(hostAddress)),
		config:             config,
		metricsScope:       resource.GetMetricsClient().Scope(metrics.HistoryShardControllerScope),
	}
}
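// A minimal usage sketch for the controller above (assuming an existing
// resource.Resource implementation, an EngineFactory, and a populated
// config.Config; the variable names are illustrative, not part of this
// package):
//
//	controller := NewShardController(res, engineFactory, cfg)
//	controller.Start()
//	defer controller.Stop()
//	eng, err := controller.GetEngine(workflowID) // routes the workflow ID to its shard's engine
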
func newHistoryShardsItem(
	resource resource.Resource,
	shardID int,
	factory EngineFactory,
	config *config.Config,
) (*historyShardsItem, error) {
	hostAddress := resource.GetHostInfo().GetAddress()
	return &historyShardsItem{
		Resource:        resource,
		shardID:         shardID,
		status:          historyShardsItemStatusInitialized,
		engineFactory:   factory,
		config:          config,
		logger:          resource.GetLogger().WithTags(tag.ShardID(shardID), tag.Address(hostAddress)),
		throttledLogger: resource.GetThrottledLogger().WithTags(tag.ShardID(shardID), tag.Address(hostAddress)),
	}, nil
}

func (c *controller) Start() {
	if !atomic.CompareAndSwapInt32(&c.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
		return
	}

	c.acquireShards()
	c.shutdownWG.Add(1)
	go c.shardManagementPump()

	err := c.GetMembershipResolver().Subscribe(service.History, shardControllerMembershipUpdateListenerName, c.membershipUpdateCh)
	if err != nil {
		c.logger.Error("subscribing to membership resolver", tag.Error(err))
	}

	c.logger.Info("Shard controller state changed", tag.LifeCycleStarted)
}

func (c *controller) Stop() {
	if !atomic.CompareAndSwapInt32(&c.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
		return
	}

	c.PrepareToStop()

	if err := c.GetMembershipResolver().Unsubscribe(service.History, shardControllerMembershipUpdateListenerName); err != nil {
		c.logger.Error("unsubscribing from membership resolver", tag.Error(err), tag.OperationFailed)
	}

	close(c.shutdownCh)

	if success := common.AwaitWaitGroup(&c.shutdownWG, time.Minute); !success {
		c.logger.Warn("Shard controller shutdown timed out", tag.LifeCycleStopTimedout)
	}

	c.logger.Info("Shard controller state changed", tag.LifeCycleStopped)
}

func (c *controller) PrepareToStop() {
	atomic.StoreInt32(&c.shuttingDown, 1)
}

func (c *controller) GetEngine(workflowID string) (engine.Engine, error) {
	shardID := c.config.GetShardID(workflowID)
	return c.GetEngineForShard(shardID)
}

func (c *controller) GetEngineForShard(shardID int) (engine.Engine, error) {
	sw := c.metricsScope.StartTimer(metrics.GetEngineForShardLatency)
	defer sw.Stop()
	item, err := c.getOrCreateHistoryShardItem(shardID)
	if err != nil {
		return nil, err
	}
	return item.getOrCreateEngine(c.shardClosedCallback)
}

func (c *controller) RemoveEngineForShard(shardID int) {
	c.removeEngineForShard(shardID, nil)
}

func (c *controller) Status() int32 {
	return atomic.LoadInt32(&c.status)
}

func (c *controller) NumShards() int {
	c.RLock()
	defer c.RUnlock()
	return len(c.historyShards)
}

func (c *controller) ShardIDs() []int32 {
	c.RLock()
	defer c.RUnlock()
	ids := make([]int32, 0, len(c.historyShards))
	for id := range c.historyShards {
		ids = append(ids, int32(id))
	}
	return ids
}

func (c *controller) removeEngineForShard(shardID int, shardItem *historyShardsItem) {
	sw := c.metricsScope.StartTimer(metrics.RemoveEngineForShardLatency)
	defer sw.Stop()
	currentShardItem, _ := c.removeHistoryShardItem(shardID, shardItem)
	if shardItem != nil {
		// if shardItem is not nil, then currentShardItem either equals shardItem or is nil;
		// in both cases we need to stop the engine in shardItem
		shardItem.stopEngine()
		return
	}

	// if shardItem is nil, stop the engine for the current shardItem, if one exists
	if currentShardItem != nil {
		currentShardItem.stopEngine()
	}
}
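// shardClosedCallback is handed to each shard item when its engine is created
// (see getOrCreateEngine); the shard context invokes it once the shard is
// closed, e.g. after ownership is lost, so the controller can evict the
// engine for that shard.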
func (c *controller) shardClosedCallback(shardID int, shardItem *historyShardsItem) {
	c.metricsScope.IncCounter(metrics.ShardClosedCounter)
	c.logger.Info("Shard controller state changed", tag.LifeCycleStopping, tag.ComponentShard, tag.ShardID(shardID))
	c.removeEngineForShard(shardID, shardItem)
}

func (c *controller) getOrCreateHistoryShardItem(shardID int) (*historyShardsItem, error) {
	if shardID >= c.config.NumberOfShards || shardID < 0 { // shard IDs are zero based
		c.logger.Error(fmt.Sprintf("Received shard ID %v, which is outside the supported range [0, %v)",
			shardID,
			c.config.NumberOfShards,
		))
		return nil, errShardIDOutOfBoundary
	}

	c.RLock()
	if item, ok := c.historyShards[shardID]; ok {
		if item.isValid() {
			c.RUnlock()
			return item, nil
		}
		// if the item is not valid, proceed to create a new one
	}
	c.RUnlock()

	c.Lock()
	defer c.Unlock()

	if item, ok := c.historyShards[shardID]; ok {
		if item.isValid() {
			return item, nil
		}
		// if the item is not valid, proceed to create a new one
	}

	if c.isShuttingDown() || atomic.LoadInt32(&c.status) == common.DaemonStatusStopped {
		return nil, fmt.Errorf("controller for host '%v' shutting down", c.GetHostInfo().Identity())
	}
	info, err := c.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
	if err != nil {
		return nil, err
	}

	if info.Identity() == c.GetHostInfo().Identity() {
		shardItem, err := newHistoryShardsItem(
			c.Resource,
			shardID,
			c.engineFactory,
			c.config,
		)
		if err != nil {
			return nil, err
		}
		c.historyShards[shardID] = shardItem
		c.metricsScope.IncCounter(metrics.ShardItemCreatedCounter)

		shardItem.logger.Info("Shard item state changed", tag.LifeCycleStarted, tag.ComponentShardItem)
		return shardItem, nil
	}

	// for backwards compatibility, always return the tchannel port
	return nil, CreateShardOwnershipLostError(c.GetHostInfo(), info)
}

func (c *controller) removeHistoryShardItem(shardID int, shardItem *historyShardsItem) (*historyShardsItem, error) {
	c.Lock()
	defer c.Unlock()

	currentShardItem, ok := c.historyShards[shardID]
	if !ok {
		return nil, fmt.Errorf("no item found to remove for shard: %v", shardID)
	}
	if shardItem != nil && currentShardItem != shardItem {
		// the shardItem comparison is a defensive check to make sure we are deleting
		// what we intend to delete
		return nil, fmt.Errorf("current shardItem doesn't match the one we intend to delete for shard: %v", shardID)
	}

	delete(c.historyShards, shardID)
	nShards := len(c.historyShards)

	c.metricsScope.IncCounter(metrics.ShardItemRemovedCounter)

	currentShardItem.logger.Info("Shard item state changed", tag.LifeCycleStopped, tag.ComponentShardItem, tag.Number(int64(nShards)))
	return currentShardItem, nil
}
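// Note on locking: getOrCreateHistoryShardItem takes the read lock for the
// common already-owned case and re-checks under the write lock before
// creating a new item (double-checked locking), so steady-state engine
// lookups never contend on the write lock.
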
// shardManagementPump is the main event loop for the controller. It is
// responsible for acquiring / releasing shards in response to any event
// that can change shard ownership. These events are:
//
//	a. Ring membership change
//	b. Periodic ticker
//	c. ShardOwnershipLostError and subsequent ShardClosedEvents from the engine
func (c *controller) shardManagementPump() {
	defer c.shutdownWG.Done()

	acquireTicker := time.NewTicker(c.config.AcquireShardInterval())
	defer acquireTicker.Stop()

	for {
		select {
		case <-c.shutdownCh:
			c.doShutdown()
			return
		case <-acquireTicker.C:
			c.acquireShards()
		case changedEvent := <-c.membershipUpdateCh:
			c.metricsScope.IncCounter(metrics.MembershipChangedCounter)
			c.logger.Info("Ring membership changed",
				tag.ValueRingMembershipChangedEvent,
				tag.NumberProcessed(len(changedEvent.HostsAdded)),
				tag.NumberDeleted(len(changedEvent.HostsRemoved)),
				tag.Number(int64(len(changedEvent.HostsUpdated))))
			c.acquireShards()
		}
	}
}

func (c *controller) acquireShards() {
	c.metricsScope.IncCounter(metrics.AcquireShardsCounter)
	sw := c.metricsScope.StartTimer(metrics.AcquireShardsLatency)
	defer sw.Stop()

	numShards := c.config.NumberOfShards
	shardActionCh := make(chan int, numShards)
	// Submit all tasks to the channel; it is buffered to hold every shard ID,
	// so the sends must be non-blocking as there is no other coordination
	// with shutdown.
	for shardID := 0; shardID < numShards; shardID++ {
		shardActionCh <- shardID
	}
	close(shardActionCh)

	concurrency := common.MaxInt(c.config.AcquireShardConcurrency(), 1)
	var wg sync.WaitGroup
	wg.Add(concurrency)
	// Spawn workers that look up ownership and add/remove shards concurrently.
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for shardID := range shardActionCh {
				if c.isShuttingDown() {
					return
				}
				info, err := c.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
				if err != nil {
					c.logger.Error("Error looking up host for shardID", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
					continue
				}
				if info.Identity() != c.GetHostInfo().Identity() {
					continue
				}
				if _, err := c.GetEngineForShard(shardID); err != nil {
					c.metricsScope.IncCounter(metrics.GetEngineForShardErrorCounter)
					c.logger.Error("Unable to create history shard engine", tag.Error(err), tag.OperationFailed, tag.ShardID(shardID))
				}
			}
		}()
	}

	// Wait until all shards are processed.
	wg.Wait()

	c.metricsScope.UpdateGauge(metrics.NumShardsGauge, float64(c.NumShards()))
}
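// doShutdown runs on the pump goroutine once shutdownCh is closed: it stops
// the engine for every shard this host still owns and drops the shard map.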
func (c *controller) doShutdown() {
	c.logger.Info("Shard controller state changed", tag.LifeCycleStopping)
	c.Lock()
	defer c.Unlock()
	for _, item := range c.historyShards {
		item.stopEngine()
	}
	c.historyShards = nil
}

func (c *controller) isShuttingDown() bool {
	return atomic.LoadInt32(&c.shuttingDown) != 0
}

func (i *historyShardsItem) getOrCreateEngine(
	closeCallback func(int, *historyShardsItem),
) (engine.Engine, error) {
	i.RLock()
	if i.status == historyShardsItemStatusStarted {
		defer i.RUnlock()
		return i.engine, nil
	}
	i.RUnlock()

	i.Lock()
	defer i.Unlock()
	switch i.status {
	case historyShardsItemStatusInitialized:
		i.logger.Info("Shard engine state changed", tag.LifeCycleStarting, tag.ComponentShardEngine)
		context, err := acquireShard(i, closeCallback)
		if err != nil {
			// invalidate the shardItem so that the same shardItem won't be
			// used to create another shardContext
			i.logger.Info("Shard engine state changed", tag.LifeCycleStopped, tag.ComponentShardEngine)
			i.status = historyShardsItemStatusStopped
			return nil, err
		}
		if context.PreviousShardOwnerWasDifferent() {
			i.GetMetricsClient().RecordTimer(metrics.ShardInfoScope, metrics.ShardItemAcquisitionLatency,
				context.GetCurrentTime(i.GetClusterMetadata().GetCurrentClusterName()).Sub(context.GetLastUpdatedTime()))
		}
		i.engine = i.engineFactory.CreateEngine(context)
		i.engine.Start()
		i.logger.Info("Shard engine state changed", tag.LifeCycleStarted, tag.ComponentShardEngine)
		i.status = historyShardsItemStatusStarted
		return i.engine, nil
	case historyShardsItemStatusStarted:
		return i.engine, nil
	case historyShardsItemStatusStopped:
		return nil, fmt.Errorf("shard %v for host '%v' is shut down", i.shardID, i.GetHostInfo().Identity())
	default:
		panic(i.logInvalidStatus())
	}
}

func (i *historyShardsItem) stopEngine() {
	i.Lock()
	defer i.Unlock()

	switch i.status {
	case historyShardsItemStatusInitialized:
		i.status = historyShardsItemStatusStopped
	case historyShardsItemStatusStarted:
		i.logger.Info("Shard engine state changed", tag.LifeCycleStopping, tag.ComponentShardEngine)
		i.engine.Stop()
		i.engine = nil
		i.logger.Info("Shard engine state changed", tag.LifeCycleStopped, tag.ComponentShardEngine)
		i.status = historyShardsItemStatusStopped
	case historyShardsItemStatusStopped:
		// no-op
	default:
		panic(i.logInvalidStatus())
	}
}

func (i *historyShardsItem) isValid() bool {
	i.RLock()
	defer i.RUnlock()

	switch i.status {
	case historyShardsItemStatusInitialized, historyShardsItemStatusStarted:
		return true
	case historyShardsItemStatusStopped:
		return false
	default:
		panic(i.logInvalidStatus())
	}
}

func (i *historyShardsItem) logInvalidStatus() string {
	msg := fmt.Sprintf("Host '%v' encountered invalid status %v for shard item for shardID '%v'.",
		i.GetHostInfo().Identity(), i.status, i.shardID)
	i.logger.Error(msg)
	return msg
}

// IsShardOwnershiptLostError checks whether a given error is a shard ownership lost error
func IsShardOwnershiptLostError(err error) bool {
	switch err.(type) {
	case *persistence.ShardOwnershipLostError:
		return true
	}
	return false
}

// CreateShardOwnershipLostError creates a new shard ownership lost error
func CreateShardOwnershipLostError(
	currentHost membership.HostInfo,
	ownerHost membership.HostInfo,
) *types.ShardOwnershipLostError {
	address, err := ownerHost.GetNamedAddress(membership.PortTchannel)
	if err != nil {
		address = ownerHost.Identity()
	}

	return &types.ShardOwnershipLostError{
		Message: fmt.Sprintf("Shard is not owned by host: %v", currentHost.Identity()),
		Owner:   address,
	}
}
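
// Note that the two helpers above deal with different error types:
// IsShardOwnershiptLostError matches the persistence-layer
// *persistence.ShardOwnershipLostError, while CreateShardOwnershipLostError
// builds the service-layer *types.ShardOwnershipLostError that is returned
// to callers; the two are distinct and not interchangeable.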