service/history/queue/transfer_queue_processor.go (587 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. package queue import ( "context" "errors" "fmt" "math" "sync" "sync/atomic" "time" "github.com/pborman/uuid" "github.com/uber/cadence/common" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/ndc" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/reconciliation/invariant" "github.com/uber/cadence/common/types" hcommon "github.com/uber/cadence/service/history/common" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/engine" "github.com/uber/cadence/service/history/execution" "github.com/uber/cadence/service/history/reset" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" "github.com/uber/cadence/service/history/workflowcache" "github.com/uber/cadence/service/worker/archiver" ) var ( errUnexpectedQueueTask = errors.New("unexpected queue task") errProcessorShutdown = errors.New("queue processor has been shutdown") maximumTransferTaskKey = newTransferTaskKey(math.MaxInt64) ) type transferQueueProcessor struct { shard shard.Context historyEngine engine.Engine taskProcessor task.Processor config *config.Config currentClusterName string metricsClient metrics.Client logger log.Logger status int32 shutdownChan chan struct{} shutdownWG sync.WaitGroup ackLevel int64 taskAllocator TaskAllocator activeTaskExecutor task.Executor activeQueueProcessor *transferQueueProcessorBase standbyQueueProcessors map[string]*transferQueueProcessorBase } // NewTransferQueueProcessor creates a new transfer QueueProcessor func NewTransferQueueProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, executionCache *execution.Cache, workflowResetter reset.WorkflowResetter, archivalClient archiver.Client, executionCheck invariant.Invariant, wfIDCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter, ) Processor { logger := shard.GetLogger().WithTags(tag.ComponentTransferQueue) currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() config := shard.GetConfig() taskAllocator := NewTaskAllocator(shard) activeTaskExecutor := task.NewTransferActiveTaskExecutor( shard, archivalClient, executionCache, workflowResetter, logger, config, wfIDCache, ratelimitInternalPerWorkflowID, ) activeQueueProcessor := newTransferQueueActiveProcessor( shard, historyEngine, taskProcessor, taskAllocator, activeTaskExecutor, logger, ) standbyQueueProcessors := make(map[string]*transferQueueProcessorBase) for clusterName := range shard.GetClusterMetadata().GetRemoteClusterInfo() { historyResender := ndc.NewHistoryResender( shard.GetDomainCache(), shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName), func(ctx context.Context, request *types.ReplicateEventsV2Request) error { return historyEngine.ReplicateEventsV2(ctx, request) }, config.StandbyTaskReReplicationContextTimeout, executionCheck, shard.GetLogger(), ) standbyTaskExecutor := task.NewTransferStandbyTaskExecutor( shard, archivalClient, executionCache, historyResender, logger, clusterName, config, ) standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor( clusterName, shard, historyEngine, taskProcessor, taskAllocator, standbyTaskExecutor, logger, ) } return &transferQueueProcessor{ shard: shard, historyEngine: historyEngine, taskProcessor: taskProcessor, config: config, currentClusterName: currentClusterName, metricsClient: shard.GetMetricsClient(), logger: logger, status: common.DaemonStatusInitialized, shutdownChan: make(chan struct{}), ackLevel: shard.GetTransferAckLevel(), taskAllocator: taskAllocator, activeTaskExecutor: activeTaskExecutor, activeQueueProcessor: activeQueueProcessor, standbyQueueProcessors: standbyQueueProcessors, } } func (t *transferQueueProcessor) Start() { if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) { return } t.activeQueueProcessor.Start() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Start() } t.shutdownWG.Add(1) go t.completeTransferLoop() } func (t *transferQueueProcessor) Stop() { if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) { return } if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() { t.activeQueueProcessor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } close(t.shutdownChan) common.AwaitWaitGroup(&t.shutdownWG, time.Minute) return } // close the shutdown channel so processor pump goroutine drains tasks and then stop the processors close(t.shutdownChan) if !common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout) { t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout) } t.activeQueueProcessor.Stop() for _, standbyQueueProcessor := range t.standbyQueueProcessors { standbyQueueProcessor.Stop() } } func (t *transferQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) { if len(info.Tasks) == 0 { return } if clusterName == t.currentClusterName { t.activeQueueProcessor.notifyNewTask(info) return } standbyQueueProcessor, ok := t.standbyQueueProcessors[clusterName] if !ok { panic(fmt.Sprintf("Cannot find transfer processor for %s.", clusterName)) } standbyQueueProcessor.notifyNewTask(info) } func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) { // Failover queue is used to scan all inflight tasks, if queue processor is not // started, there's no inflight task and we don't need to create a failover processor. // Also the HandleAction will be blocked if queue processor processing loop is not running. if atomic.LoadInt32(&t.status) != common.DaemonStatusStarted { return } minLevel := t.shard.GetTransferClusterAckLevel(t.currentClusterName) standbyClusterName := t.currentClusterName for clusterName := range t.shard.GetClusterMetadata().GetEnabledClusterInfo() { ackLevel := t.shard.GetTransferClusterAckLevel(clusterName) if ackLevel < minLevel { minLevel = ackLevel standbyClusterName = clusterName } } maxReadLevel := int64(0) actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction()) if err != nil { t.logger.Error("Transfer Failover Failed", tag.WorkflowDomainIDs(domainIDs), tag.Error(err)) if err == errProcessorShutdown { // processor/shard already shutdown, we don't need to create failover queue processor return } // other errors should never be returned for GetStateAction panic(fmt.Sprintf("unknown error for GetStateAction: %v", err)) } for _, queueState := range actionResult.GetStateActionResult.States { queueReadLevel := queueState.ReadLevel().(transferTaskKey).taskID if maxReadLevel < queueReadLevel { maxReadLevel = queueReadLevel } } // maxReadLevel is exclusive, so add 1 maxReadLevel++ t.logger.Info("Transfer Failover Triggered", tag.WorkflowDomainIDs(domainIDs), tag.MinLevel(minLevel), tag.MaxLevel(maxReadLevel)) updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor( t.shard, t.historyEngine, t.taskProcessor, t.taskAllocator, t.activeTaskExecutor, t.logger, minLevel, maxReadLevel, domainIDs, standbyClusterName, ) // NOTE: READ REF BEFORE MODIFICATION // ref: historyEngine.go registerDomainFailoverCallback function err = updateShardAckLevel(newTransferTaskKey(minLevel)) if err != nil { t.logger.Error("Error update shard ack level", tag.Error(err)) } failoverQueueProcessor.Start() } func (t *transferQueueProcessor) HandleAction( ctx context.Context, clusterName string, action *Action, ) (*ActionResult, error) { var resultNotificationCh chan actionResultNotification var added bool if clusterName == t.currentClusterName { resultNotificationCh, added = t.activeQueueProcessor.addAction(ctx, action) } else { found := false for standbyClusterName, standbyProcessor := range t.standbyQueueProcessors { if clusterName == standbyClusterName { resultNotificationCh, added = standbyProcessor.addAction(ctx, action) found = true break } } if !found { return nil, fmt.Errorf("unknown cluster name: %v", clusterName) } } if !added { if ctxErr := ctx.Err(); ctxErr != nil { return nil, ctxErr } return nil, errProcessorShutdown } select { case resultNotification := <-resultNotificationCh: return resultNotification.result, resultNotification.err case <-t.shutdownChan: return nil, errProcessorShutdown case <-ctx.Done(): return nil, ctx.Err() } } func (t *transferQueueProcessor) LockTaskProcessing() { t.taskAllocator.Lock() } func (t *transferQueueProcessor) UnlockTaskProcessing() { t.taskAllocator.Unlock() } func (t *transferQueueProcessor) drain() { // before shutdown, make sure the ack level is up to date if err := t.completeTransfer(); err != nil { t.logger.Error("Failed to complete transfer task during shutdown", tag.Error(err)) } } func (t *transferQueueProcessor) completeTransferLoop() { defer t.shutdownWG.Done() completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval()) defer completeTimer.Stop() for { select { case <-t.shutdownChan: t.drain() return case <-completeTimer.C: for attempt := 0; attempt < t.config.TransferProcessorCompleteTransferFailureRetryCount(); attempt++ { err := t.completeTransfer() if err == nil { break } t.logger.Error("Failed to complete transfer task", tag.Error(err)) if err == shard.ErrShardClosed { // shard closed, trigger shutdown and bail out if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() { go t.Stop() return } t.Stop() return } select { case <-t.shutdownChan: t.drain() return case <-time.After(time.Duration(attempt*100) * time.Millisecond): // do nothing. retry loop will continue } } completeTimer.Reset(t.config.TransferProcessorCompleteTransferInterval()) } } } func (t *transferQueueProcessor) completeTransfer() error { newAckLevel := maximumTransferTaskKey actionResult, err := t.HandleAction(context.Background(), t.currentClusterName, NewGetStateAction()) if err != nil { return err } for _, queueState := range actionResult.GetStateActionResult.States { newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel()) } for standbyClusterName := range t.standbyQueueProcessors { actionResult, err := t.HandleAction(context.Background(), standbyClusterName, NewGetStateAction()) if err != nil { return err } for _, queueState := range actionResult.GetStateActionResult.States { newAckLevel = minTaskKey(newAckLevel, queueState.AckLevel()) } } for _, failoverInfo := range t.shard.GetAllTransferFailoverLevels() { failoverLevel := newTransferTaskKey(failoverInfo.MinLevel) if newAckLevel == nil { newAckLevel = failoverLevel } else { newAckLevel = minTaskKey(newAckLevel, failoverLevel) } } if newAckLevel == nil { panic("Unable to get transfer queue processor ack level") } newAckLevelTaskID := newAckLevel.(transferTaskKey).taskID t.logger.Debug(fmt.Sprintf("Start completing transfer task from: %v, to %v.", t.ackLevel, newAckLevelTaskID)) if t.ackLevel >= newAckLevelTaskID { return nil } t.metricsClient.Scope(metrics.TransferQueueProcessorScope). Tagged(metrics.ShardIDTag(t.shard.GetShardID())). IncCounter(metrics.TaskBatchCompleteCounter) for { pageSize := t.config.TransferTaskDeleteBatchSize() resp, err := t.shard.GetExecutionManager().RangeCompleteTransferTask(context.Background(), &persistence.RangeCompleteTransferTaskRequest{ ExclusiveBeginTaskID: t.ackLevel, InclusiveEndTaskID: newAckLevelTaskID, PageSize: pageSize, // pageSize may or may not be honored }) if err != nil { return err } if !persistence.HasMoreRowsToDelete(resp.TasksCompleted, pageSize) { break } } t.ackLevel = newAckLevelTaskID return t.shard.UpdateTransferAckLevel(newAckLevelTaskID) } func newTransferQueueActiveProcessor( shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, taskAllocator TaskAllocator, taskExecutor task.Executor, logger log.Logger, ) *transferQueueProcessorBase { config := shard.GetConfig() options := newTransferQueueProcessorOptions(config, true, false) currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName() logger = logger.WithTags(tag.ClusterName(currentClusterName)) taskFilter := func(taskInfo task.Info) (bool, error) { task, ok := taskInfo.(*persistence.TransferTaskInfo) if !ok { return false, errUnexpectedQueueTask } if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil { logger.Info("Domain is not in registered status, skip task in active transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo)) return false, nil } return taskAllocator.VerifyActiveTask(task.DomainID, task) } updateMaxReadLevel := func() task.Key { return newTransferTaskKey(shard.GetTransferMaxReadLevel()) } updateClusterAckLevel := func(ackLevel task.Key) error { taskID := ackLevel.(transferTaskKey).taskID return shard.UpdateTransferClusterAckLevel(currentClusterName, taskID) } updateProcessingQueueStates := func(states []ProcessingQueueState) error { pStates := convertToPersistenceTransferProcessingQueueStates(states) return shard.UpdateTransferProcessingQueueStates(currentClusterName, pStates) } queueShutdown := func() error { return nil } return newTransferQueueProcessorBase( shard, loadTransferProcessingQueueStates(currentClusterName, shard, options, logger), taskProcessor, options, updateMaxReadLevel, updateClusterAckLevel, updateProcessingQueueStates, queueShutdown, taskFilter, taskExecutor, logger, shard.GetMetricsClient(), ) } func newTransferQueueStandbyProcessor( clusterName string, shard shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, taskAllocator TaskAllocator, taskExecutor task.Executor, logger log.Logger, ) *transferQueueProcessorBase { config := shard.GetConfig() options := newTransferQueueProcessorOptions(config, false, false) logger = logger.WithTags(tag.ClusterName(clusterName)) taskFilter := func(taskInfo task.Info) (bool, error) { task, ok := taskInfo.(*persistence.TransferTaskInfo) if !ok { return false, errUnexpectedQueueTask } if notRegistered, err := isDomainNotRegistered(shard, task.DomainID); notRegistered && err == nil { logger.Info("Domain is not in registered status, skip task in standby transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo)) return false, nil } if task.TaskType == persistence.TransferTaskTypeCloseExecution || task.TaskType == persistence.TransferTaskTypeRecordWorkflowClosed { domainEntry, err := shard.GetDomainCache().GetDomainByID(task.DomainID) if err == nil { if domainEntry.HasReplicationCluster(clusterName) { // guarantee the processing of workflow execution close return true, nil } } else { if _, ok := err.(*types.EntityNotExistsError); !ok { // retry the task if failed to find the domain logger.Warn("Cannot find domain", tag.WorkflowDomainID(task.DomainID)) return false, err } logger.Warn("Cannot find domain, default to not process task.", tag.WorkflowDomainID(task.DomainID), tag.Value(task)) return false, nil } } return taskAllocator.VerifyStandbyTask(clusterName, task.DomainID, task) } updateMaxReadLevel := func() task.Key { return newTransferTaskKey(shard.GetTransferMaxReadLevel()) } updateClusterAckLevel := func(ackLevel task.Key) error { taskID := ackLevel.(transferTaskKey).taskID return shard.UpdateTransferClusterAckLevel(clusterName, taskID) } updateProcessingQueueStates := func(states []ProcessingQueueState) error { pStates := convertToPersistenceTransferProcessingQueueStates(states) return shard.UpdateTransferProcessingQueueStates(clusterName, pStates) } queueShutdown := func() error { return nil } return newTransferQueueProcessorBase( shard, loadTransferProcessingQueueStates(clusterName, shard, options, logger), taskProcessor, options, updateMaxReadLevel, updateClusterAckLevel, updateProcessingQueueStates, queueShutdown, taskFilter, taskExecutor, logger, shard.GetMetricsClient(), ) } func newTransferQueueFailoverProcessor( shardContext shard.Context, historyEngine engine.Engine, taskProcessor task.Processor, taskAllocator TaskAllocator, taskExecutor task.Executor, logger log.Logger, minLevel, maxLevel int64, domainIDs map[string]struct{}, standbyClusterName string, ) (updateClusterAckLevelFn, *transferQueueProcessorBase) { config := shardContext.GetConfig() options := newTransferQueueProcessorOptions(config, true, true) currentClusterName := shardContext.GetService().GetClusterMetadata().GetCurrentClusterName() failoverUUID := uuid.New() logger = logger.WithTags( tag.ClusterName(currentClusterName), tag.WorkflowDomainIDs(domainIDs), tag.FailoverMsg("from: "+standbyClusterName), ) taskFilter := func(taskInfo task.Info) (bool, error) { task, ok := taskInfo.(*persistence.TransferTaskInfo) if !ok { return false, errUnexpectedQueueTask } if notRegistered, err := isDomainNotRegistered(shardContext, task.DomainID); notRegistered && err == nil { logger.Info("Domain is not in registered status, skip task in failover transfer queue.", tag.WorkflowDomainID(task.DomainID), tag.Value(taskInfo)) return false, nil } return taskAllocator.VerifyFailoverActiveTask(domainIDs, task.DomainID, task) } maxReadLevelTaskKey := newTransferTaskKey(maxLevel) updateMaxReadLevel := func() task.Key { return maxReadLevelTaskKey // this is a const } updateClusterAckLevel := func(ackLevel task.Key) error { taskID := ackLevel.(transferTaskKey).taskID return shardContext.UpdateTransferFailoverLevel( failoverUUID, shard.TransferFailoverLevel{ StartTime: shardContext.GetTimeSource().Now(), MinLevel: minLevel, CurrentLevel: taskID, MaxLevel: maxLevel, DomainIDs: domainIDs, }, ) } queueShutdown := func() error { return shardContext.DeleteTransferFailoverLevel(failoverUUID) } processingQueueStates := []ProcessingQueueState{ NewProcessingQueueState( defaultProcessingQueueLevel, newTransferTaskKey(minLevel), maxReadLevelTaskKey, NewDomainFilter(domainIDs, false), ), } return updateClusterAckLevel, newTransferQueueProcessorBase( shardContext, processingQueueStates, taskProcessor, options, updateMaxReadLevel, updateClusterAckLevel, nil, queueShutdown, taskFilter, taskExecutor, logger, shardContext.GetMetricsClient(), ) } func loadTransferProcessingQueueStates( clusterName string, shard shard.Context, options *queueProcessorOptions, logger log.Logger, ) []ProcessingQueueState { ackLevel := shard.GetTransferClusterAckLevel(clusterName) if options.EnableLoadQueueStates() { pStates := shard.GetTransferProcessingQueueStates(clusterName) if validateProcessingQueueStates(pStates, ackLevel) { return convertFromPersistenceTransferProcessingQueueStates(pStates) } logger.Error("Incompatible processing queue states and ackLevel", tag.Value(pStates), tag.ShardTransferAcks(ackLevel), ) } // LoadQueueStates is disabled or sanity check failed // fallback to use ackLevel return []ProcessingQueueState{ NewProcessingQueueState( defaultProcessingQueueLevel, newTransferTaskKey(ackLevel), maximumTransferTaskKey, NewDomainFilter(nil, true), ), } }