service/history/queue/transfer_queue_processor_base.go
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package queue
import (
"context"
"math/rand"
"sync"
"sync/atomic"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
hcommon "github.com/uber/cadence/service/history/common"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/task"
)
const (
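// numTasksEstimationDecay is the exponential decay factor applied to the previous
// estimate when updating estimatedTasksPerMinute in splitQueue.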
numTasksEstimationDecay = 0.6
)
var (
loadQueueTaskThrottleRetryDelay = 5 * time.Second
persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy()
)
type (
transferTaskKey struct {
taskID int64
}
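
// transferQueueProcessorBase is the shared implementation behind the transfer queue
// processors. It builds on processorBase and adds transfer-specific task loading,
// per-level throttling, and queue split support.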
transferQueueProcessorBase struct {
*processorBase
taskInitializer task.Initializer
notifyCh chan struct{}
processCh chan struct{}
// for managing whether a processing queue collection should be processed
startJitterTimer *time.Timer
processingLock sync.Mutex
backoffTimer map[int]*time.Timer
shouldProcess map[int]bool
// for estimating the look ahead taskID during split
lastSplitTime time.Time
lastMaxReadLevel int64
estimatedTasksPerMinute int64
// for validating whether the queue failed to load any tasks
validator *transferQueueValidator
processQueueCollectionsFn func()
updateAckLevelFn func() (bool, task.Key, error)
}
)
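
// newTransferQueueProcessorBase constructs a transferQueueProcessorBase. The queue type
// (active vs. standby transfer) is derived from the metrics scope in options, and a
// transferQueueValidator is attached only when debug mode and the validator option are enabled.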
func newTransferQueueProcessorBase(
shard shard.Context,
processingQueueStates []ProcessingQueueState,
taskProcessor task.Processor,
options *queueProcessorOptions,
updateMaxReadLevel updateMaxReadLevelFn,
updateClusterAckLevel updateClusterAckLevelFn,
updateProcessingQueueStates updateProcessingQueueStatesFn,
queueShutdown queueShutdownFn,
taskFilter task.Filter,
taskExecutor task.Executor,
logger log.Logger,
metricsClient metrics.Client,
) *transferQueueProcessorBase {
processorBase := newProcessorBase(
shard,
processingQueueStates,
taskProcessor,
options,
updateMaxReadLevel,
updateClusterAckLevel,
updateProcessingQueueStates,
queueShutdown,
logger.WithTags(tag.ComponentTransferQueue),
metricsClient,
)
queueType := task.QueueTypeActiveTransfer
if options.MetricScope == metrics.TransferStandbyQueueProcessorScope {
queueType = task.QueueTypeStandbyTransfer
}
transferQueueProcessorBase := &transferQueueProcessorBase{
processorBase: processorBase,
taskInitializer: func(taskInfo task.Info) task.Task {
return task.NewTransferTask(
shard,
taskInfo,
queueType,
task.InitializeLoggerForTask(shard.GetShardID(), taskInfo, logger),
taskFilter,
taskExecutor,
taskProcessor,
processorBase.redispatcher.AddTask,
shard.GetConfig().TaskCriticalRetryCount,
)
},
notifyCh: make(chan struct{}, 1),
processCh: make(chan struct{}, 1),
backoffTimer: make(map[int]*time.Timer),
shouldProcess: make(map[int]bool),
lastSplitTime: time.Time{},
lastMaxReadLevel: 0,
}
transferQueueProcessorBase.processQueueCollectionsFn = transferQueueProcessorBase.processQueueCollections
transferQueueProcessorBase.updateAckLevelFn = transferQueueProcessorBase.updateAckLevel
if shard.GetConfig().EnableDebugMode && options.EnableValidator() {
transferQueueProcessorBase.validator = newTransferQueueValidator(
transferQueueProcessorBase,
options.ValidationInterval,
logger,
processorBase.metricsScope,
)
}
return transferQueueProcessorBase
}
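
// Start transitions the processor to started, starts the redispatcher, triggers an initial
// (optionally jittered) notification of all queue collections, and launches processorPump.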
func (t *transferQueueProcessorBase) Start() {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusInitialized, common.DaemonStatusStarted) {
return
}
t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStarting)
defer t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStarted)
t.redispatcher.Start()
// trigger an initial (maybe delayed) load of tasks
if startJitter := t.options.MaxStartJitterInterval(); startJitter > 0 {
t.startJitterTimer = time.AfterFunc(
time.Duration(rand.Int63n(int64(startJitter))),
func() {
t.notifyAllQueueCollections()
},
)
} else {
t.notifyAllQueueCollections()
}
t.shutdownWG.Add(1)
go t.processorPump()
}
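
// Stop transitions the processor to stopped, closes the shutdown channel, stops all jitter
// and backoff timers, waits for the pump goroutine to finish, and then stops the redispatcher.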
func (t *transferQueueProcessorBase) Stop() {
if !atomic.CompareAndSwapInt32(&t.status, common.DaemonStatusStarted, common.DaemonStatusStopped) {
return
}
t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStopping)
defer t.logger.Info("Transfer queue processor state changed", tag.LifeCycleStopped)
close(t.shutdownCh)
if t.startJitterTimer != nil {
t.startJitterTimer.Stop()
}
t.processingLock.Lock()
for _, timer := range t.backoffTimer {
timer.Stop()
}
for level := range t.shouldProcess {
t.shouldProcess[level] = false
}
t.processingLock.Unlock()
if success := common.AwaitWaitGroup(&t.shutdownWG, gracefulShutdownTimeout); !success {
t.logger.Warn("transferQueueProcessorBase timed out on shut down", tag.LifeCycleStopTimedout)
}
t.redispatcher.Stop()
}
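
// notifyNewTask signals the pump, without blocking, that new transfer tasks may be available
// and, when the validator is enabled, registers the tasks for later ack validation.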
func (t *transferQueueProcessorBase) notifyNewTask(info *hcommon.NotifyTaskInfo) {
select {
case t.notifyCh <- struct{}{}:
default:
}
// executionInfo will be nil when notifyNewTask is called to trigger a scan, for example during domain failover or sync shard.
if info.ExecutionInfo != nil && t.validator != nil {
t.validator.addTasks(info)
}
}
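
// readyForProcess marks the given queue level as processable and nudges the pump, unless the
// level is currently throttled by a backoff timer.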
func (t *transferQueueProcessorBase) readyForProcess(level int) {
t.processingLock.Lock()
defer t.processingLock.Unlock()
if _, ok := t.backoffTimer[level]; ok {
// current level is being throttled
return
}
t.shouldProcess[level] = true
// trigger the actual processing
select {
case t.processCh <- struct{}{}:
default:
}
}
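
// setupBackoffTimer throttles the given queue level by scheduling a jittered backoff timer;
// when the timer fires, the level is marked processable again and the pump is nudged.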
func (t *transferQueueProcessorBase) setupBackoffTimer(level int) {
t.processingLock.Lock()
defer t.processingLock.Unlock()
if _, ok := t.backoffTimer[level]; ok {
// there's an existing backoff timer, no-op
// this case should not happen
return
}
t.metricsScope.IncCounter(metrics.ProcessingQueueThrottledCounter)
t.logger.Info("Throttled processing queue", tag.QueueLevel(level))
backoffDuration := backoff.JitDuration(
t.options.PollBackoffInterval(),
t.options.PollBackoffIntervalJitterCoefficient(),
)
t.backoffTimer[level] = time.AfterFunc(backoffDuration, func() {
select {
case <-t.shutdownCh:
return
default:
}
t.processingLock.Lock()
defer t.processingLock.Unlock()
t.shouldProcess[level] = true
delete(t.backoffTimer, level)
// trigger the actual processing
select {
case t.processCh <- struct{}{}:
default:
}
})
}
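
// processorPump is the main event loop. It reacts to new-task notifications, the max poll,
// ack update and queue split timers, processing requests, and action notifications until
// the shutdown channel is closed.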
func (t *transferQueueProcessorBase) processorPump() {
defer t.shutdownWG.Done()
updateAckTimer := time.NewTimer(backoff.JitDuration(
t.options.UpdateAckInterval(),
t.options.UpdateAckIntervalJitterCoefficient(),
))
defer updateAckTimer.Stop()
splitQueueTimer := time.NewTimer(backoff.JitDuration(
t.options.SplitQueueInterval(),
t.options.SplitQueueIntervalJitterCoefficient(),
))
defer splitQueueTimer.Stop()
maxPollTimer := time.NewTimer(backoff.JitDuration(
t.options.MaxPollInterval(),
t.options.MaxPollIntervalJitterCoefficient(),
))
defer maxPollTimer.Stop()
for {
select {
case <-t.shutdownCh:
return
case <-t.notifyCh:
// notify all queue collections as they are waiting for the notification when there are
// no more tasks to process. For non-default queues, if we choose to do periodic polling
// in the future, then we don't need to notify them.
t.notifyAllQueueCollections()
case <-maxPollTimer.C:
t.notifyAllQueueCollections()
maxPollTimer.Reset(backoff.JitDuration(
t.options.MaxPollInterval(),
t.options.MaxPollIntervalJitterCoefficient(),
))
case <-t.processCh:
maxRedispatchQueueSize := t.options.MaxRedispatchQueueSize()
if redispatchSize := t.redispatcher.Size(); redispatchSize > maxRedispatchQueueSize {
t.logger.Debugf("Transfer queue has too many pending tasks in re-dispatch queue: %v > maxRedispatchQueueSize: %v, block loading tasks from persistence", redispatchSize, maxRedispatchQueueSize)
t.redispatcher.Redispatch(maxRedispatchQueueSize)
if redispatchSize := t.redispatcher.Size(); redispatchSize > maxRedispatchQueueSize {
// if the redispatcher still has a large number of tasks
// this only happens when the system is under very high load
// we should back off here instead of continuing to submit tasks to the task processor
t.logger.Debugf("Transfer queue still has too many pending tasks in re-dispatch queue: %v > maxRedispatchQueueSize: %v, backing off for %v", redispatchSize, maxRedispatchQueueSize, t.options.PollBackoffInterval())
time.Sleep(backoff.JitDuration(
t.options.PollBackoffInterval(),
t.options.PollBackoffIntervalJitterCoefficient(),
))
}
// re-enqueue the event to check whether we need to keep re-dispatching or can load new tasks from persistence
select {
case t.processCh <- struct{}{}:
default:
}
} else {
t.processQueueCollectionsFn()
}
case <-updateAckTimer.C:
processFinished, _, err := t.updateAckLevelFn()
if err == shard.ErrShardClosed || (err == nil && processFinished) {
if !t.options.EnableGracefulSyncShutdown() {
go t.Stop()
return
}
t.Stop()
return
}
updateAckTimer.Reset(backoff.JitDuration(
t.options.UpdateAckInterval(),
t.options.UpdateAckIntervalJitterCoefficient(),
))
case <-splitQueueTimer.C:
t.splitQueue()
splitQueueTimer.Reset(backoff.JitDuration(
t.options.SplitQueueInterval(),
t.options.SplitQueueIntervalJitterCoefficient(),
))
case notification := <-t.actionNotifyCh:
t.handleActionNotification(notification)
}
}
}
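
// notifyAllQueueCollections marks every processing queue collection as ready for processing.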
func (t *transferQueueProcessorBase) notifyAllQueueCollections() {
for _, queueCollection := range t.processingQueueCollections {
t.readyForProcess(queueCollection.Level())
}
}
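
// processQueueCollections loads tasks for every queue collection that is marked as ready:
// it waits on the rate limiter, reads a batch of transfer tasks between the active queue's
// read level and the max read level, filters them by domain, submits them to the task
// processor, and advances the read level. Levels with more work left are re-scheduled,
// backing off when the task channel is full for non-default levels.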
func (t *transferQueueProcessorBase) processQueueCollections() {
for _, queueCollection := range t.processingQueueCollections {
level := queueCollection.Level()
t.processingLock.Lock()
if shouldProcess, ok := t.shouldProcess[level]; !ok || !shouldProcess {
t.processingLock.Unlock()
continue
}
t.shouldProcess[level] = false
t.processingLock.Unlock()
activeQueue := queueCollection.ActiveQueue()
if activeQueue == nil {
// processing for this queue collection has finished
// it's possible that a new queue will be added to this collection later though,
// pollTime will be updated after split/merge
continue
}
readLevel := activeQueue.State().ReadLevel()
maxReadLevel := minTaskKey(activeQueue.State().MaxLevel(), t.updateMaxReadLevel())
domainFilter := activeQueue.State().DomainFilter()
if !readLevel.Less(maxReadLevel) {
// no tasks need to be processed for now, wait for new task notification
// note that if the taskID of the new task is still less than readLevel, the notification
// will just be a no-op and there are no DB requests.
continue
}
ctx, cancel := context.WithTimeout(context.Background(), loadQueueTaskThrottleRetryDelay)
if err := t.rateLimiter.Wait(ctx); err != nil {
cancel()
if level != defaultProcessingQueueLevel {
t.setupBackoffTimer(level)
} else {
t.readyForProcess(level)
}
continue
}
cancel()
transferTaskInfos, more, err := t.readTasks(readLevel, maxReadLevel)
if err != nil {
t.logger.Error("Processor unable to retrieve tasks", tag.Error(err))
t.readyForProcess(level) // re-enqueue the event
continue
}
t.logger.Debug("load transfer tasks from database",
tag.ReadLevel(readLevel.(transferTaskKey).taskID),
tag.MaxLevel(maxReadLevel.(transferTaskKey).taskID),
tag.Counter(len(transferTaskInfos)))
tasks := make(map[task.Key]task.Task)
taskChFull := false
for _, taskInfo := range transferTaskInfos {
if !domainFilter.Filter(taskInfo.GetDomainID()) {
t.logger.Debug("transfer task filtered", tag.TaskID(taskInfo.GetTaskID()))
continue
}
task := t.taskInitializer(taskInfo)
tasks[newTransferTaskKey(taskInfo.GetTaskID())] = task
submitted, err := t.submitTask(task)
if err != nil {
// the only error here is due to the processor having been shut down
// return instead of continue
return
}
taskChFull = taskChFull || !submitted
}
var newReadLevel task.Key
if !more {
newReadLevel = maxReadLevel
} else {
newReadLevel = newTransferTaskKey(transferTaskInfos[len(transferTaskInfos)-1].GetTaskID())
}
queueCollection.AddTasks(tasks, newReadLevel)
if t.validator != nil {
t.logger.Debug("ack transfer tasks",
tag.ReadLevel(readLevel.(transferTaskKey).taskID),
tag.MaxLevel(newReadLevel.(transferTaskKey).taskID),
tag.Counter(len(tasks)))
t.validator.ackTasks(level, readLevel, newReadLevel, tasks)
}
newActiveQueue := queueCollection.ActiveQueue()
if more || (newActiveQueue != nil && newActiveQueue != activeQueue) {
// more tasks for the current active queue or the active queue has changed
if level != defaultProcessingQueueLevel && taskChFull {
t.setupBackoffTimer(level)
} else {
t.readyForProcess(level)
}
}
// else it means there are no tasks to process for now
// wait for new task notification
// another option for non-default queues is to set up a backoff timer to check back later
}
}
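
// splitQueue maintains a decayed estimate of the number of tasks generated per minute and
// uses it to compute a look-ahead taskID for the split policy before splitting the
// processing queue collections.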
func (t *transferQueueProcessorBase) splitQueue() {
currentTime := t.shard.GetTimeSource().Now()
currentMaxReadLevel := t.updateMaxReadLevel().(transferTaskKey).taskID
defer func() {
t.lastSplitTime = currentTime
t.lastMaxReadLevel = currentMaxReadLevel
}()
if currentMaxReadLevel-t.lastMaxReadLevel < 2<<(t.shard.GetConfig().RangeSizeBits-1) {
// only update the estimation when the rangeID has not been renewed
// note the threshold here is only an estimation. If the read level increased too much
// we will drop that data point.
numTasksPerMinute := (currentMaxReadLevel - t.lastMaxReadLevel) / int64(currentTime.Sub(t.lastSplitTime).Seconds()) * int64(time.Minute.Seconds())
if t.estimatedTasksPerMinute == 0 {
// set the initial value for the estimation
t.estimatedTasksPerMinute = numTasksPerMinute
} else {
t.estimatedTasksPerMinute = int64(numTasksEstimationDecay*float64(t.estimatedTasksPerMinute) + (1-numTasksEstimationDecay)*float64(numTasksPerMinute))
}
}
if t.lastSplitTime.IsZero() || t.estimatedTasksPerMinute == 0 {
// skip the split as we can't estimate the look ahead taskID
return
}
splitPolicy := t.initializeSplitPolicy(
func(key task.Key, domainID string) task.Key {
totalLookAhead := t.estimatedTasksPerMinute * int64(t.options.SplitLookAheadDurationByDomainID(domainID).Minutes())
// ensure the above calculation doesn't overflow and cap the maximum look ahead interval
totalLookAhead = common.MaxInt64(common.MinInt64(totalLookAhead, 2<<t.shard.GetConfig().RangeSizeBits), 0)
return newTransferTaskKey(key.(transferTaskKey).taskID + totalLookAhead)
},
)
t.splitProcessingQueueCollection(splitPolicy, func(level int, _ time.Time) {
t.readyForProcess(level)
})
}
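
// handleActionNotification delegates to processorBase and, for a reset action, marks the
// default processing queue level as ready so tasks are reloaded immediately.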
func (t *transferQueueProcessorBase) handleActionNotification(notification actionNotification) {
t.processorBase.handleActionNotification(notification, func() {
switch notification.action.ActionType {
case ActionTypeReset:
t.readyForProcess(defaultProcessingQueueLevel)
}
})
}
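
// readTasks loads one batch of transfer tasks between readLevel and maxReadLevel from the
// execution manager, retrying transient persistence errors. The second return value
// indicates whether more tasks remain in the range.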
func (t *transferQueueProcessorBase) readTasks(
readLevel task.Key,
maxReadLevel task.Key,
) ([]*persistence.TransferTaskInfo, bool, error) {
var response *persistence.GetTransferTasksResponse
op := func() error {
var err error
response, err = t.shard.GetExecutionManager().GetTransferTasks(context.Background(), &persistence.GetTransferTasksRequest{
ReadLevel: readLevel.(transferTaskKey).taskID,
MaxReadLevel: maxReadLevel.(transferTaskKey).taskID,
BatchSize: t.options.BatchSize(),
})
return err
}
throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
backoff.WithRetryableError(persistence.IsBackgroundTransientError),
)
err := throttleRetry.Do(context.Background(), op)
if err != nil {
return nil, false, err
}
return response.Tasks, len(response.NextPageToken) != 0, nil
}
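
// newTransferTaskKey wraps a transfer taskID as a task.Key.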
func newTransferTaskKey(taskID int64) task.Key {
return transferTaskKey{
taskID: taskID,
}
}
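
// Less orders transfer task keys by taskID.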
func (k transferTaskKey) Less(
key task.Key,
) bool {
return k.taskID < key.(transferTaskKey).taskID
}
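
// newTransferQueueProcessorOptions builds the queueProcessorOptions for a transfer queue
// processor from dynamic config. Failover processors disable queue split and queue state
// persistence; active and standby processors differ in metrics scope and redispatch interval.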
func newTransferQueueProcessorOptions(
config *config.Config,
isActive bool,
isFailover bool,
) *queueProcessorOptions {
options := &queueProcessorOptions{
BatchSize: config.TransferTaskBatchSize,
DeleteBatchSize: config.TransferTaskDeleteBatchSize,
MaxPollRPS: config.TransferProcessorMaxPollRPS,
MaxPollInterval: config.TransferProcessorMaxPollInterval,
MaxPollIntervalJitterCoefficient: config.TransferProcessorMaxPollIntervalJitterCoefficient,
UpdateAckInterval: config.TransferProcessorUpdateAckInterval,
UpdateAckIntervalJitterCoefficient: config.TransferProcessorUpdateAckIntervalJitterCoefficient,
RedispatchIntervalJitterCoefficient: config.TaskRedispatchIntervalJitterCoefficient,
MaxRedispatchQueueSize: config.TransferProcessorMaxRedispatchQueueSize,
SplitQueueInterval: config.TransferProcessorSplitQueueInterval,
SplitQueueIntervalJitterCoefficient: config.TransferProcessorSplitQueueIntervalJitterCoefficient,
PollBackoffInterval: config.QueueProcessorPollBackoffInterval,
PollBackoffIntervalJitterCoefficient: config.QueueProcessorPollBackoffIntervalJitterCoefficient,
EnableValidator: config.TransferProcessorEnableValidator,
ValidationInterval: config.TransferProcessorValidationInterval,
EnableGracefulSyncShutdown: config.QueueProcessorEnableGracefulSyncShutdown,
}
if isFailover {
// disable queue split for the failover processor
options.EnableSplit = dynamicconfig.GetBoolPropertyFn(false)
// disable persisting and loading processing queue states for the failover processor as its queues will never be split
options.EnablePersistQueueStates = dynamicconfig.GetBoolPropertyFn(false)
options.EnableLoadQueueStates = dynamicconfig.GetBoolPropertyFn(false)
options.MaxStartJitterInterval = config.TransferProcessorFailoverMaxStartJitterInterval
} else {
options.EnableSplit = config.QueueProcessorEnableSplit
options.SplitMaxLevel = config.QueueProcessorSplitMaxLevel
options.EnableRandomSplitByDomainID = config.QueueProcessorEnableRandomSplitByDomainID
options.RandomSplitProbability = config.QueueProcessorRandomSplitProbability
options.EnablePendingTaskSplitByDomainID = config.QueueProcessorEnablePendingTaskSplitByDomainID
options.PendingTaskSplitThreshold = config.QueueProcessorPendingTaskSplitThreshold
options.EnableStuckTaskSplitByDomainID = config.QueueProcessorEnableStuckTaskSplitByDomainID
options.StuckTaskSplitThreshold = config.QueueProcessorStuckTaskSplitThreshold
options.SplitLookAheadDurationByDomainID = config.QueueProcessorSplitLookAheadDurationByDomainID
options.EnablePersistQueueStates = config.QueueProcessorEnablePersistQueueStates
options.EnableLoadQueueStates = config.QueueProcessorEnableLoadQueueStates
options.MaxStartJitterInterval = dynamicconfig.GetDurationPropertyFn(0)
}
if isActive {
options.MetricScope = metrics.TransferActiveQueueProcessorScope
options.RedispatchInterval = config.ActiveTaskRedispatchInterval
} else {
options.MetricScope = metrics.TransferStandbyQueueProcessorScope
options.RedispatchInterval = config.StandbyTaskRedispatchInterval
}
return options
}