service/matching/taskReader.go (415 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package matching

import (
	"context"
	"errors"
	"runtime"
	"sync"
	"sync/atomic"
	"time"

	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/cache"
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/messaging"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
)

var epochStartTime = time.Unix(0, 0)

const (
	defaultTaskBufferIsolationGroup = "" // a task buffer which is not using an isolation group
)

type (
	taskReader struct {
		// taskBuffers: This is the in-memory queue of tasks for dispatch
		// that are enqueued for pollers to pick up. It's written to by
		// - getTasksPump - the primary means of loading async matching tasks
		// - task dispatch redirection - when a task is redirected from another isolation group
		taskBuffers map[string]chan *persistence.TaskInfo
		notifyC     chan struct{} // Used as signal to notify pump of new tasks
		tlMgr       *taskListManagerImpl
		taskListID  *taskListID
		config      *taskListConfig
		db          *taskListDB
		taskWriter  *taskWriter
		taskGC      *taskGC

		taskAckManager  messaging.AckManager
		domainCache     cache.DomainCache
		clusterMetadata cluster.Metadata

		// The cancel objects are to cancel the ratelimiter Wait in dispatchBufferedTasks. The ideal
		// approach is to use request-scoped contexts and use a unique one for each call to Wait. However,
		// in order to cancel it on shutdown, we would need a new goroutine for each call that would wait on
		// the shutdown channel. To optimize for efficiency, we instead create one and tag it on the struct
		// so the cancel can be called directly on shutdown.
		cancelCtx  context.Context
		cancelFunc context.CancelFunc
		stopped    int64 // set to 1 if the reader is stopped or is shutting down

		logger        log.Logger
		scope         metrics.Scope
		throttleRetry *backoff.ThrottleRetry
		handleErr     func(error) error
		onFatalErr    func()

		dispatchTask             func(context.Context, *InternalTask) error
		getIsolationGroupForTask func(context.Context, *persistence.TaskInfo) (string, error)
		ratePerSecond            func() float64

		// stopWg is used to wait for all dispatchers to stop.
		stopWg sync.WaitGroup
	}
)

func newTaskReader(tlMgr *taskListManagerImpl, isolationGroups []string) *taskReader {
	ctx, cancel := context.WithCancel(context.Background())
	taskBuffers := make(map[string]chan *persistence.TaskInfo)
	taskBuffers[defaultTaskBufferIsolationGroup] = make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1)
	for _, g := range isolationGroups {
		taskBuffers[g] = make(chan *persistence.TaskInfo, tlMgr.config.GetTasksBatchSize()-1)
	}
	return &taskReader{
		tlMgr:          tlMgr,
		taskListID:     tlMgr.taskListID,
		config:         tlMgr.config,
		db:             tlMgr.db,
		taskWriter:     tlMgr.taskWriter,
		taskGC:         tlMgr.taskGC,
		taskAckManager: tlMgr.taskAckManager,
		cancelCtx:      ctx,
		cancelFunc:     cancel,
		notifyC:        make(chan struct{}, 1),
		// we always dequeue the head of the buffer and try to dispatch it to a poller
		// so allocate one less than desired target buffer size
		taskBuffers:              taskBuffers,
		domainCache:              tlMgr.domainCache,
		clusterMetadata:          tlMgr.clusterMetadata,
		logger:                   tlMgr.logger,
		scope:                    tlMgr.scope,
		handleErr:                tlMgr.handleErr,
		onFatalErr:               tlMgr.Stop,
		dispatchTask:             tlMgr.DispatchTask,
		getIsolationGroupForTask: tlMgr.getIsolationGroupForTask,
		ratePerSecond:            tlMgr.matcher.Rate,
		throttleRetry: backoff.NewThrottleRetry(
			backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
			backoff.WithRetryableError(persistence.IsTransientError),
		),
	}
}

func (tr *taskReader) Start() {
	tr.Signal()
	for g := range tr.taskBuffers {
		g := g
		tr.stopWg.Add(1)
		go func() {
			defer tr.stopWg.Done()
			tr.dispatchBufferedTasks(g)
		}()
	}
	tr.stopWg.Add(1)
	go func() {
		defer tr.stopWg.Done()
		tr.getTasksPump()
	}()
}

func (tr *taskReader) Stop() {
	if atomic.CompareAndSwapInt64(&tr.stopped, 0, 1) {
		tr.cancelFunc()
		if err := tr.persistAckLevel(); err != nil {
			tr.logger.Error("Persistent store operation failure",
				tag.StoreOperationUpdateTaskList,
				tag.Error(err))
		}
		tr.taskGC.RunNow(tr.taskAckManager.GetAckLevel())
		tr.stopWg.Wait()
	}
}

func (tr *taskReader) Signal() {
	var event struct{}
	select {
	case tr.notifyC <- event:
	default: // channel already has an event, don't block
	}
}
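// exampleSignalCoalescing is an illustrative sketch and is NOT part of the original taskReader.go.
// It demonstrates the single-slot notification pattern that Signal and notifyC (above) rely on:
// because the channel has capacity 1, any number of Signal calls made while the pump is busy
// collapse into at most one pending wakeup, so notifications never queue up unboundedly.
// The function and variable names below are hypothetical and exist only for illustration.
func exampleSignalCoalescing() {
	notifyC := make(chan struct{}, 1) // capacity 1: at most one pending notification

	signal := func() {
		select {
		case notifyC <- struct{}{}:
		default: // a notification is already pending; drop this one
		}
	}

	// Ten rapid signals while no consumer is draining the channel...
	for i := 0; i < 10; i++ {
		signal()
	}

	// ...leave exactly one pending notification for the consumer to receive.
	<-notifyC
	select {
	case <-notifyC:
		// not reached: the other nine signals were coalesced into the first one
	default:
	}
}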
func (tr *taskReader) dispatchBufferedTasks(isolationGroup string) {
dispatchLoop:
	for {
		select {
		case taskInfo, ok := <-tr.taskBuffers[isolationGroup]:
			if !ok { // Task list getTasks pump is shutdown
				break dispatchLoop
			}
			breakDispatchLoop := tr.dispatchSingleTaskFromBufferWithRetries(isolationGroup, taskInfo)
			if breakDispatchLoop {
				// shutting down
				break dispatchLoop
			}
		case <-tr.cancelCtx.Done():
			break dispatchLoop
		}
	}
}

func (tr *taskReader) getTasksPump() {
	updateAckTimer := time.NewTimer(tr.config.UpdateAckInterval())
	defer updateAckTimer.Stop()
getTasksPumpLoop:
	for {
		select {
		case <-tr.cancelCtx.Done():
			break getTasksPumpLoop
		case <-tr.notifyC:
			{
				tasks, readLevel, isReadBatchDone, err := tr.getTaskBatch()
				if err != nil {
					tr.Signal() // re-enqueue the event
					// TODO: Should we ever stop retrying on db errors?
					continue getTasksPumpLoop
				}

				if len(tasks) == 0 {
					tr.taskAckManager.SetReadLevel(readLevel)
					if !isReadBatchDone {
						tr.Signal()
					}
					continue getTasksPumpLoop
				}

				if !tr.addTasksToBuffer(tasks) {
					break getTasksPumpLoop
				}
				// There may be more tasks. We yield now, but signal the pump to check again later.
				tr.Signal()
			}
		case <-updateAckTimer.C:
			{
				ackLevel := tr.taskAckManager.GetAckLevel()
				if size, err := tr.db.GetTaskListSize(ackLevel); err == nil {
					tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType)).
						UpdateGauge(metrics.TaskCountPerTaskListGauge, float64(size))
				}
				if err := tr.handleErr(tr.persistAckLevel()); err != nil {
					tr.logger.Error("Persistent store operation failure",
						tag.StoreOperationUpdateTaskList,
						tag.Error(err))
					// keep going as saving ack is not critical
				}
				tr.Signal() // periodically signal pump to check persistence for tasks
				updateAckTimer = time.NewTimer(tr.config.UpdateAckInterval())
			}
		}
		scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType))
		scope.UpdateGauge(metrics.TaskBacklogPerTaskListGauge, float64(tr.taskAckManager.GetBacklogCount()))
	}
}

func (tr *taskReader) getTaskBatchWithRange(readLevel int64, maxReadLevel int64) ([]*persistence.TaskInfo, error) {
	var response *persistence.GetTasksResponse
	op := func() (err error) {
		response, err = tr.db.GetTasks(readLevel, maxReadLevel, tr.config.GetTasksBatchSize())
		return
	}
	err := tr.throttleRetry.Do(context.Background(), op)
	if err != nil {
		tr.logger.Error("Persistent store operation failure",
			tag.StoreOperationGetTasks,
			tag.Error(err),
			tag.WorkflowTaskListName(tr.taskListID.name),
			tag.WorkflowTaskListType(tr.taskListID.taskType))
		return nil, err
	}
	return response.Tasks, nil
}

// getTaskBatch returns a batch of tasks from persistence starting from the current read level.
// It also returns a number that can be used to update readLevel,
// and a bool indicating whether the read is finished.
func (tr *taskReader) getTaskBatch() ([]*persistence.TaskInfo, int64, bool, error) {
	var tasks []*persistence.TaskInfo
	readLevel := tr.taskAckManager.GetReadLevel()
	maxReadLevel := tr.taskWriter.GetMaxReadLevel()

	// counter i is used to break and let the caller check whether the tasklist is still alive and needs to resume reading
	for i := 0; i < 10 && readLevel < maxReadLevel; i++ {
		upper := readLevel + tr.config.RangeSize
		if upper > maxReadLevel {
			upper = maxReadLevel
		}
		tasks, err := tr.getTaskBatchWithRange(readLevel, upper)
		if err != nil {
			return nil, readLevel, true, err
		}
		// return as long as it grabs any tasks
		if len(tasks) > 0 {
			return tasks, upper, true, nil
		}
		readLevel = upper
	}
	return tasks, readLevel, readLevel == maxReadLevel, nil // caller will update readLevel when no task grabbed
}

func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo, now time.Time) bool {
	return t.Expiry.After(epochStartTime) && time.Now().After(t.Expiry)
}

func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {
	now := time.Now()
	for _, t := range tasks {
		if tr.isTaskExpired(t, now) {
			tr.scope.IncCounter(metrics.ExpiredTasksPerTaskListCounter)
			// Also increment readLevel for expired tasks, otherwise it could result in
			// looping over the same tasks if all tasks read in the batch are expired
			tr.taskAckManager.SetReadLevel(t.TaskID)
			continue
		}
		if !tr.addSingleTaskToBuffer(t) {
			return false // we are shutting down the task list
		}
	}
	return true
}

func (tr *taskReader) addSingleTaskToBuffer(task *persistence.TaskInfo) bool {
	err := tr.taskAckManager.ReadItem(task.TaskID)
	if err != nil {
		tr.logger.Fatal("critical bug when adding item to ackManager", tag.Error(err))
	}
	isolationGroup, err := tr.getIsolationGroupForTask(tr.cancelCtx, task)
	if err != nil {
		// it only errors when the tasklist is a sticky tasklist and
		// the sticky pollers are not available; in this case, we just complete the task
		// and let the decision time out and get rescheduled to the non-sticky tasklist
		if err == _stickyPollerUnavailableError {
			tr.completeTask(task, nil)
		} else {
			// it should never happen, unless there is a bug in the 'getIsolationGroupForTask' method
tr.logger.Error("taskReader: unexpected error getting isolation group", tag.Error(err)) tr.completeTask(task, err) } return true } select { case tr.taskBuffers[isolationGroup] <- task: return true case <-tr.cancelCtx.Done(): return false } } func (tr *taskReader) persistAckLevel() error { ackLevel := tr.taskAckManager.GetAckLevel() if ackLevel >= 0 { maxReadLevel := tr.taskWriter.GetMaxReadLevel() scope := tr.scope.Tagged(getTaskListTypeTag(tr.taskListID.taskType)) // note: this metrics is only an estimation for the lag. taskID in DB may not be continuous, // especially when task list ownership changes. scope.UpdateGauge(metrics.TaskLagPerTaskListGauge, float64(maxReadLevel-ackLevel)) return tr.db.UpdateState(ackLevel) } return nil } // completeTask marks a task as processed. Only tasks created by taskReader (i.e. backlog from db) reach // here. As part of completion: // - task is deleted from the database when err is nil // - new task is created and current task is deleted when err is not nil func (tr *taskReader) completeTask(task *persistence.TaskInfo, err error) { if err != nil { // failed to start the task. // We cannot just remove it from persistence because then it will be lost. // We handle this by writing the task back to persistence with a higher taskID. // This will allow subsequent tasks to make progress, and hopefully by the time this task is picked-up // again the underlying reason for failing to start will be resolved. // Note that RecordTaskStarted only fails after retrying for a long time, so a single task will not be // re-written to persistence frequently. op := func() error { wf := &types.WorkflowExecution{WorkflowID: task.WorkflowID, RunID: task.RunID} _, err := tr.taskWriter.appendTask(wf, task) return err } err = tr.throttleRetry.Do(context.Background(), op) if err != nil { // OK, we also failed to write to persistence. // This should only happen in very extreme cases where persistence is completely down. 
			// We still can't lose the old task, so we just unload the entire task list.
			tr.logger.Error("Failed to complete task", tag.Error(err))
			tr.onFatalErr()
			return
		}
		tr.Signal()
	}
	ackLevel := tr.taskAckManager.AckItem(task.TaskID)
	tr.taskGC.Run(ackLevel)
}

func (tr *taskReader) newDispatchContext(isolationGroup string) (context.Context, context.CancelFunc) {
	rps := tr.ratePerSecond()
	if isolationGroup != "" || rps > 1e-7 {
		// 1e-7 is an arbitrary small number chosen to avoid overflow; normally users don't set such a low rps.
		// minTimeout is the minimum timeout required to dispatch a task; if the timeout value is smaller than this,
		// async task dispatch can be completely throttled, which could happen when ratePerSecond is pretty low
		minTimeout := time.Duration(float64(len(tr.taskBuffers))/rps) * time.Second
		timeout := tr.config.AsyncTaskDispatchTimeout()
		if timeout < minTimeout {
			timeout = minTimeout
		}
		domainEntry, err := tr.domainCache.GetDomainByID(tr.taskListID.domainID)
		if err != nil {
			// we don't know if the domain is active in the current cluster; assume it is active and set the timeout
			return context.WithTimeout(tr.cancelCtx, timeout)
		}
		if _, err := domainEntry.IsActiveIn(tr.clusterMetadata.GetCurrentClusterName()); err == nil {
			// if the domain is active in the current cluster, set the timeout
			return context.WithTimeout(tr.cancelCtx, timeout)
		}
	}
	return tr.cancelCtx, func() {}
}

func (tr *taskReader) dispatchSingleTaskFromBufferWithRetries(isolationGroup string, taskInfo *persistence.TaskInfo) (breakDispatchLoop bool) {
	// retry loop for dispatching a single task
	for {
		breakDispatchLoop, breakRetryLoop := tr.dispatchSingleTaskFromBuffer(isolationGroup, taskInfo)
		if breakRetryLoop {
			return breakDispatchLoop
		}
	}
}

func (tr *taskReader) dispatchSingleTaskFromBuffer(isolationGroup string, taskInfo *persistence.TaskInfo) (breakDispatchLoop bool, breakRetries bool) {
	task := newInternalTask(taskInfo, tr.completeTask, types.TaskSourceDbBacklog, "", false, nil, isolationGroup)
	dispatchCtx, cancel := tr.newDispatchContext(isolationGroup)
	timerScope := tr.scope.StartTimer(metrics.AsyncMatchLatencyPerTaskList)
	err := tr.dispatchTask(dispatchCtx, task)
	timerScope.Stop()
	cancel()

	if err == nil {
		return false, true
	}

	if errors.Is(err, context.Canceled) {
		tr.logger.Info("Tasklist manager context is cancelled, shutting down")
		return true, true
	}

	if errors.Is(err, context.DeadlineExceeded) {
		// this only happens when isolation is enabled and there are no pollers from the given isolation group.
		// if this happens, we don't want to block task dispatching, because there might be pollers from
		// other isolation groups; we simply continue and dispatch the task to a new isolation group which
		// has pollers
		tr.logger.Warn("Async task dispatch timed out",
			tag.IsolationGroup(isolationGroup),
			tag.WorkflowRunID(taskInfo.RunID),
			tag.WorkflowID(taskInfo.WorkflowID),
			tag.TaskID(taskInfo.TaskID),
			tag.Error(err),
			tag.WorkflowDomainID(taskInfo.DomainID),
		)
		tr.scope.IncCounter(metrics.AsyncMatchDispatchTimeoutCounterPerTaskList)

		// the idea here is that by re-fetching the isolation groups, if something has shifted,
		// the task will get a new isolation group to be placed in. If it needs re-routing, then
		// this will be the new routing destination.
		group, err := tr.getIsolationGroupForTask(tr.cancelCtx, taskInfo)
		if err != nil {
			// it only errors when the tasklist is a sticky tasklist and
			// the sticky pollers are not available; in this case, we just complete the task
			// and let the decision time out and get rescheduled to the non-sticky tasklist
			if err == _stickyPollerUnavailableError {
				tr.completeTask(taskInfo, nil)
				return false, true
			}
			// it should never happen, unless there is a bug in the 'getIsolationGroupForTask' method
			tr.logger.Error("taskReader: unexpected error getting isolation group",
				tag.Error(err),
				tag.IsolationGroup(group))
			tr.completeTask(taskInfo, err)
			return false, true
		}

		if group == isolationGroup {
			// no change; retry dispatching the task again
			return false, false
		}

		// ensure the isolation group is configured and available
		_, taskGroupReaderIsPresent := tr.taskBuffers[group]
		if !taskGroupReaderIsPresent {
			// this is a programming error: something has gone wrong with tasklist instantiation.
			// don't block; redirect to the default group
			tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter)
			tr.logger.Error("An isolation group buffer was misconfigured and couldn't be found. Redirecting to default",
				tag.Dynamic("redirection-from-isolation-group", isolationGroup),
				tag.Dynamic("redirection-to-isolation-group", group),
				tag.IsolationGroup(group),
				tag.WorkflowRunID(taskInfo.RunID),
				tag.WorkflowID(taskInfo.WorkflowID),
				tag.TaskID(taskInfo.TaskID),
				tag.WorkflowDomainID(taskInfo.DomainID),
			)

			select {
			case <-tr.cancelCtx.Done():
				// the task reader is shutting down
				return true, true
			case tr.taskBuffers[defaultTaskBufferIsolationGroup] <- taskInfo:
				// task successfully rerouted to the default tasklist
				return false, true
			default:
				// couldn't redirect, loop and try again
				return false, false
			}
		}

		// if there is no poller in the isolation group, or the isolation group is drained,
		// we want to redistribute the tasks to other isolation groups in this case to drain
		// the backlog.
		select {
		case <-tr.cancelCtx.Done():
			// the task reader is shutting down
			return true, true
		case tr.taskBuffers[group] <- taskInfo:
			// successful redirect
			tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectCounter)
			tr.logger.Warn("some tasks were redirected to another isolation group.",
				tag.Dynamic("redirection-from-isolation-group", isolationGroup),
				tag.Dynamic("redirection-to-isolation-group", group),
				tag.WorkflowRunID(taskInfo.RunID),
				tag.WorkflowID(taskInfo.WorkflowID),
				tag.TaskID(taskInfo.TaskID),
				tag.WorkflowDomainID(taskInfo.DomainID),
			)
			return false, true
		default:
			tr.scope.IncCounter(metrics.BufferIsolationGroupRedirectFailureCounter)
			tr.logger.Error("some tasks could not be redirected to another isolation group as the buffer's already full",
				tag.WorkflowRunID(taskInfo.RunID),
				tag.Dynamic("redirection-from-isolation-group", isolationGroup),
				tag.Dynamic("redirection-to-isolation-group", group),
				tag.WorkflowID(taskInfo.WorkflowID),
				tag.TaskID(taskInfo.TaskID),
				tag.WorkflowDomainID(taskInfo.DomainID),
			)
			// the task async buffers on the other isolation-group are already full, wait and retry
			return false, false
		}
	}

	if errors.Is(err, ErrTasklistThrottled) {
		tr.scope.IncCounter(metrics.BufferThrottlePerTaskListCounter)
		runtime.Gosched()
		return false, false
	}

	tr.scope.IncCounter(metrics.BufferUnknownTaskDispatchError)
	tr.logger.Error("unknown error while dispatching task",
		tag.Error(err),
		tag.IsolationGroup(isolationGroup),
		tag.WorkflowRunID(taskInfo.RunID),
		tag.WorkflowID(taskInfo.WorkflowID),
		tag.TaskID(taskInfo.TaskID),
		tag.WorkflowDomainID(taskInfo.DomainID),
	)
	return false, false
}
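// exampleReaderLifecycle is an illustrative sketch and is NOT part of the original taskReader.go.
// It shows, under the assumption that the owning taskListManagerImpl drives the reader roughly this
// way, how the pieces above fit together: Start launches one dispatcher goroutine per isolation
// group plus the getTasksPump goroutine, Signal nudges the pump after new tasks are written to the
// database, and Stop cancels the shared context, persists the ack level, and waits on stopWg for
// every goroutine to exit. The call sites below are assumptions for illustration only.
func exampleReaderLifecycle(tlMgr *taskListManagerImpl, isolationGroups []string) {
	tr := newTaskReader(tlMgr, isolationGroups)

	// Begin reading the backlog from persistence and dispatching buffered tasks to pollers.
	tr.Start()

	// After a task write lands in the database, wake the pump so it re-reads the backlog.
	// Repeated calls are cheap: they coalesce in the one-slot notifyC channel.
	tr.Signal()

	// On shutdown (or when a fatal error is surfaced via onFatalErr), stop the reader.
	// Stop is idempotent thanks to the atomic stopped flag and blocks until all goroutines exit.
	tr.Stop()
}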