service/matching/taskListManager.go
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package matching
import (
"bytes"
"context"
"errors"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
const (
// Time budget for empty task to propagate through the function stack and be returned to
// pollForActivityTask or pollForDecisionTask handler.
returnEmptyTaskTimeBudget time.Duration = time.Second
)
var (
taskListActivityTypeTag = metrics.TaskListTypeTag("activity")
taskListDecisionTypeTag = metrics.TaskListTypeTag("decision")
)
type (
addTaskParams struct {
execution *types.WorkflowExecution
taskInfo *persistence.TaskInfo
source types.TaskSource
forwardedFrom string
activityTaskDispatchInfo *types.ActivityTaskDispatchInfo
}
taskListManager interface {
Start() error
Stop()
// AddTask adds a task to the task list. This method will first attempt a synchronous
// match with a poller. When that fails, the task is written to the database and later
// matched with a poller asynchronously
AddTask(ctx context.Context, params addTaskParams) (syncMatch bool, err error)
// GetTask blocks waiting for a task. Returns an error when the context deadline is exceeded.
// maxDispatchPerSecond is the max rate at which tasks are allowed to be dispatched
// from this task list to pollers
GetTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error)
// DispatchTask dispatches a task to a poller. When there are no pollers to pick
// up the task, this method returns an error. The task will not be persisted to db
DispatchTask(ctx context.Context, task *InternalTask) error
// DispatchQueryTask will dispatch the query to a local or remote poller. If the query is
// forwarded, the result or error is returned; if it is dispatched to a local poller, nil and nil are returned.
DispatchQueryTask(ctx context.Context, taskID string, request *types.MatchingQueryWorkflowRequest) (*types.QueryWorkflowResponse, error)
CancelPoller(pollerID string)
GetAllPollerInfo() []*types.PollerInfo
HasPollerAfter(accessTime time.Time) bool
// DescribeTaskList returns information about the target tasklist
DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse
String() string
GetTaskListKind() types.TaskListKind
TaskListID() *taskListID
}
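// outstandingPollerInfo pairs an outstanding poller's isolation group with the
// context cancel function used to unblock it (see CancelPoller)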
outstandingPollerInfo struct {
isolationGroup string
cancel context.CancelFunc
}
// In-memory state for a single task list
taskListManagerImpl struct {
createTime time.Time
enableIsolation bool
taskListID *taskListID
taskListKind types.TaskListKind // sticky taskList has different process in persistence
config *taskListConfig
db *taskListDB
taskWriter *taskWriter
taskReader *taskReader // reads tasks from db and asynchronously matches them with pollers
liveness *liveness
taskGC *taskGC
taskAckManager messaging.AckManager // tracks ackLevel for delivered messages
matcher *TaskMatcher // for matching a task producer with a poller
clusterMetadata cluster.Metadata
domainCache cache.DomainCache
partitioner partition.Partitioner
logger log.Logger
scope metrics.Scope
domainName string
// pollerHistory stores pollers which polled from this tasklist in the last few minutes
pollerHistory *pollerHistory
// outstandingPollsMap is needed to keep track of all outstanding pollers for a
// particular tasklist. The pollerID generated by the frontend is used as the key and
// an outstandingPollerInfo (isolation group plus context CancelFunc) is the value.
// This is used to cancel the context and unblock an outstanding poller when the
// frontend detects that the client connection is closed, to prevent tasks being
// dispatched to zombie pollers.
outstandingPollsLock sync.Mutex
outstandingPollsMap map[string]outstandingPollerInfo
startWG sync.WaitGroup // ensures that background processes do not start until setup is ready
stopped int32
closeCallback func(taskListManager)
}
)
const (
// maxSyncMatchWaitTime is the max amount of time that we are willing to wait for a sync match to happen
maxSyncMatchWaitTime = 200 * time.Millisecond
)
var _ taskListManager = (*taskListManagerImpl)(nil)
var errRemoteSyncMatchFailed = &types.RemoteSyncMatchedError{Message: "remote sync match failed"}
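// newTaskListManager builds the per-tasklist state: config, persistence access, task
// writer/reader, ack manager, liveness tracking and the matcher (with a forwarder for
// non-root, non-sticky partitions). Background goroutines are not started until Start is called.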
func newTaskListManager(
e *matchingEngineImpl,
taskList *taskListID,
taskListKind *types.TaskListKind,
config *Config,
createTime time.Time,
) (taskListManager, error) {
taskListConfig, err := newTaskListConfig(taskList, config, e.domainCache)
if err != nil {
return nil, err
}
if taskListKind == nil {
normalTaskListKind := types.TaskListKindNormal
taskListKind = &normalTaskListKind
}
domainName, err := e.domainCache.GetDomainName(taskList.domainID)
if err != nil {
return nil, err
}
scope := newPerTaskListScope(domainName, taskList.name, *taskListKind, e.metricsClient, metrics.MatchingTaskListMgrScope)
db := newTaskListDB(e.taskManager, taskList.domainID, domainName, taskList.name, taskList.taskType, int(*taskListKind), e.logger)
tlMgr := &taskListManagerImpl{
createTime: createTime,
enableIsolation: taskListConfig.EnableTasklistIsolation(),
domainCache: e.domainCache,
clusterMetadata: e.clusterMetadata,
partitioner: e.partitioner,
taskListID: taskList,
taskListKind: *taskListKind,
logger: e.logger.WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType)),
db: db,
taskAckManager: messaging.NewAckManager(e.logger),
taskGC: newTaskGC(db, taskListConfig),
config: taskListConfig,
outstandingPollsMap: make(map[string]outstandingPollerInfo),
domainName: domainName,
scope: scope,
closeCallback: e.removeTaskListManager,
}
taskListTypeMetricScope := tlMgr.scope.Tagged(
getTaskListTypeTag(taskList.taskType),
)
tlMgr.pollerHistory = newPollerHistory(func() {
taskListTypeMetricScope.UpdateGauge(metrics.PollerPerTaskListCounter,
float64(len(tlMgr.pollerHistory.getPollerInfo(time.Time{}))))
})
tlMgr.liveness = newLiveness(clock.NewRealTimeSource(), taskListConfig.IdleTasklistCheckInterval(), tlMgr.Stop)
var isolationGroups []string
if tlMgr.isIsolationMatcherEnabled() {
isolationGroups = config.AllIsolationGroups
}
var fwdr *Forwarder
if tlMgr.isFowardingAllowed(taskList, *taskListKind) {
fwdr = newForwarder(&taskListConfig.forwarderConfig, taskList, *taskListKind, e.matchingClient, isolationGroups)
}
tlMgr.matcher = newTaskMatcher(taskListConfig, fwdr, tlMgr.scope, isolationGroups, tlMgr.logger)
tlMgr.taskWriter = newTaskWriter(tlMgr)
tlMgr.taskReader = newTaskReader(tlMgr, isolationGroups)
tlMgr.startWG.Add(1)
return tlMgr, nil
}
// Start starts the reading pump for the given task list.
// The pump fills up taskBuffer from persistence.
func (c *taskListManagerImpl) Start() error {
defer c.startWG.Done()
c.liveness.Start()
if err := c.taskWriter.Start(); err != nil {
c.Stop()
return err
}
c.taskReader.Start()
return nil
}
// Stop stops the pump that fills up taskBuffer from persistence.
func (c *taskListManagerImpl) Stop() {
if !atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
return
}
c.closeCallback(c)
c.liveness.Stop()
c.taskWriter.Stop()
c.taskReader.Stop()
c.logger.Info("Task list manager state changed", tag.LifeCycleStopped)
}
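// handleErr inspects persistence errors. A ConditionFailedError indicates the task list may
// have been taken over by another host, so this manager stops itself; for sticky tasklists the
// error is mapped to an InternalServiceError carrying StickyTaskConditionFailedErrorMsg.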
func (c *taskListManagerImpl) handleErr(err error) error {
var e *persistence.ConditionFailedError
if errors.As(err, &e) {
// This indicates the task list may have moved to another host.
c.scope.IncCounter(metrics.ConditionFailedErrorPerTaskListCounter)
c.logger.Debug("Stopping task list due to persistence condition failure.", tag.Error(err))
c.Stop()
if c.taskListKind == types.TaskListKindSticky {
// TODO: we don't see this error in our logs, we might be able to remove this error
err = &types.InternalServiceError{Message: common.StickyTaskConditionFailedErrorMsg}
}
}
return err
}
// AddTask adds a task to the task list. This method will first attempt a synchronous
// match with a poller. When there are no pollers or the rate limit is exceeded, the task
// is written to the database and later matched with a poller asynchronously
func (c *taskListManagerImpl) AddTask(ctx context.Context, params addTaskParams) (bool, error) {
c.startWG.Wait()
if c.shouldReload() {
c.Stop()
return false, errShutdown
}
if params.forwardedFrom == "" {
// request sent by history service
c.liveness.markAlive(time.Now())
}
var syncMatch bool
_, err := c.executeWithRetry(func() (interface{}, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
domainEntry, err := c.domainCache.GetDomainByID(params.taskInfo.DomainID)
if err != nil {
return nil, err
}
isForwarded := params.forwardedFrom != ""
if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
// standby task, only persist when task is not forwarded from child partition
syncMatch = false
if isForwarded {
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}
r, err := c.taskWriter.appendTask(params.execution, params.taskInfo)
return r, err
}
isolationGroup, err := c.getIsolationGroupForTask(ctx, params.taskInfo)
if err != nil {
return false, err
}
// active task, try sync match first
syncMatch, err = c.trySyncMatch(ctx, params, isolationGroup)
if syncMatch {
return &persistence.CreateTasksResponse{}, err
}
if params.activityTaskDispatchInfo != nil {
return false, errRemoteSyncMatchFailed
}
if isForwarded {
// forwarded from child partition - only do sync match
// child partition will persist the task when sync match fails
return &persistence.CreateTasksResponse{}, errRemoteSyncMatchFailed
}
return c.taskWriter.appendTask(params.execution, params.taskInfo)
})
if err == nil && !syncMatch {
c.taskReader.Signal()
}
return syncMatch, err
}
// DispatchTask dispatches a task to a poller. When there are no pollers to pick
// up the task or the rate limit is exceeded, this method returns an error. The task
// *will not* be persisted to db
func (c *taskListManagerImpl) DispatchTask(ctx context.Context, task *InternalTask) error {
return c.matcher.MustOffer(ctx, task)
}
// DispatchQueryTask will dispatch the query to a local or remote poller. If the query is
// forwarded, the result or error is returned; if it is dispatched to a local poller, nil and nil are returned.
func (c *taskListManagerImpl) DispatchQueryTask(
ctx context.Context,
taskID string,
request *types.MatchingQueryWorkflowRequest,
) (*types.QueryWorkflowResponse, error) {
c.startWG.Wait()
task := newInternalQueryTask(taskID, request)
return c.matcher.OfferQuery(ctx, task)
}
// GetTask blocks waiting for a task.
// Returns an error when the context deadline is exceeded.
// maxDispatchPerSecond is the max rate at which tasks are allowed
// to be dispatched from this task list to pollers
func (c *taskListManagerImpl) GetTask(
ctx context.Context,
maxDispatchPerSecond *float64,
) (*InternalTask, error) {
if c.shouldReload() {
c.Stop()
return nil, ErrNoTasks
}
c.liveness.markAlive(time.Now())
task, err := c.getTask(ctx, maxDispatchPerSecond)
if err != nil {
return nil, fmt.Errorf("couldn't get task: %w", err)
}
task.domainName = c.domainName
task.backlogCountHint = c.taskAckManager.GetBacklogCount()
return task, nil
}
func (c *taskListManagerImpl) getTask(ctx context.Context, maxDispatchPerSecond *float64) (*InternalTask, error) {
// We need to set a shorter timeout than the original ctx; otherwise, by the time ctx deadline is
// reached, instead of emptyTask, context timeout error is returned to the frontend by the rpc stack,
// which counts against our SLO. By shortening the timeout by a very small amount, the emptyTask can be
// returned to the handler before a context timeout error is generated.
childCtx, cancel := c.newChildContext(ctx, c.config.LongPollExpirationInterval(), returnEmptyTaskTimeBudget)
defer cancel()
isolationGroup, _ := ctx.Value(_isolationGroupKey).(string)
pollerID, ok := ctx.Value(pollerIDKey).(string)
if ok && pollerID != "" {
// Found pollerID on context, add it to the map to allow it to be canceled in
// response to CancelPoller call
c.outstandingPollsLock.Lock()
c.outstandingPollsMap[pollerID] = outstandingPollerInfo{isolationGroup: isolationGroup, cancel: cancel}
c.outstandingPollsLock.Unlock()
defer func() {
c.outstandingPollsLock.Lock()
delete(c.outstandingPollsMap, pollerID)
c.outstandingPollsLock.Unlock()
}()
}
identity, ok := ctx.Value(identityKey).(string)
if ok && identity != "" {
c.pollerHistory.updatePollerInfo(pollerIdentity(identity), pollerInfo{ratePerSecond: maxDispatchPerSecond, isolationGroup: isolationGroup})
defer func() {
// to update timestamp of this poller when long poll ends
c.pollerHistory.updatePollerInfo(pollerIdentity(identity), pollerInfo{ratePerSecond: maxDispatchPerSecond, isolationGroup: isolationGroup})
}()
}
domainEntry, err := c.domainCache.GetDomainByID(c.taskListID.domainID)
if err != nil {
return nil, fmt.Errorf("unable to fetch domain from cache: %w", err)
}
// the desired global rate limit for the task list comes from the
// poller, which lives inside the client-side worker. There is
// one rateLimiter for this entire task list and, as polls arrive,
// we update the rate limiter's RPS if it has changed from the last
// value. The last poller wins if different pollers provide different values
c.matcher.UpdateRatelimit(maxDispatchPerSecond)
if _, err := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err != nil {
return c.matcher.PollForQuery(childCtx)
}
if c.isIsolationMatcherEnabled() {
return c.matcher.Poll(childCtx, isolationGroup)
}
return c.matcher.Poll(childCtx, "")
}
// GetAllPollerInfo returns all pollers that polled from this tasklist in the last few minutes
func (c *taskListManagerImpl) GetAllPollerInfo() []*types.PollerInfo {
return c.pollerHistory.getPollerInfo(time.Time{})
}
// HasPollerAfter checks if there is any poller after a timestamp
func (c *taskListManagerImpl) HasPollerAfter(accessTime time.Time) bool {
c.outstandingPollsLock.Lock()
inflightPollerCount := len(c.outstandingPollsMap)
c.outstandingPollsLock.Unlock()
if inflightPollerCount > 0 {
return true
}
recentPollers := c.pollerHistory.getPollerInfo(accessTime)
return len(recentPollers) > 0
}
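// CancelPoller cancels the context of an outstanding poll identified by pollerID, unblocking
// the long poll (used when the frontend detects that the client connection was closed)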
func (c *taskListManagerImpl) CancelPoller(pollerID string) {
c.outstandingPollsLock.Lock()
info, ok := c.outstandingPollsMap[pollerID]
c.outstandingPollsLock.Unlock()
if ok && info.cancel != nil {
info.cancel()
c.logger.Info("canceled outstanding poller", tag.WorkflowDomainName(c.domainName))
}
}
// DescribeTaskList returns information about the target tasklist. Right now this API returns the
// pollers which polled this tasklist in the last few minutes and the status of the tasklist's ackManager
// (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (c *taskListManagerImpl) DescribeTaskList(includeTaskListStatus bool) *types.DescribeTaskListResponse {
response := &types.DescribeTaskListResponse{Pollers: c.GetAllPollerInfo()}
if !includeTaskListStatus {
return response
}
taskIDBlock := rangeIDToTaskIDBlock(c.db.RangeID(), c.config.RangeSize)
backlogCount, err := c.db.GetTaskListSize(c.taskAckManager.GetAckLevel())
if err != nil {
// fall back to the in-memory backlog if we failed to get the count from db
backlogCount = c.taskAckManager.GetBacklogCount()
}
response.TaskListStatus = &types.TaskListStatus{
ReadLevel: c.taskAckManager.GetReadLevel(),
AckLevel: c.taskAckManager.GetAckLevel(),
BacklogCountHint: backlogCount,
RatePerSecond: c.matcher.Rate(),
TaskIDBlock: &types.TaskIDBlock{
StartID: taskIDBlock.start,
EndID: taskIDBlock.end,
},
}
return response
}
func (c *taskListManagerImpl) String() string {
buf := new(bytes.Buffer)
if c.taskListID.taskType == persistence.TaskListTypeActivity {
buf.WriteString("Activity")
} else {
buf.WriteString("Decision")
}
rangeID := c.db.RangeID()
fmt.Fprintf(buf, " task list %v\n", c.taskListID.name)
fmt.Fprintf(buf, "RangeID=%v\n", rangeID)
fmt.Fprintf(buf, "TaskIDBlock=%+v\n", rangeIDToTaskIDBlock(rangeID, c.config.RangeSize))
fmt.Fprintf(buf, "AckLevel=%v\n", c.taskAckManager.GetAckLevel())
fmt.Fprintf(buf, "MaxReadLevel=%v\n", c.taskAckManager.GetReadLevel())
return buf.String()
}
func (c *taskListManagerImpl) GetTaskListKind() types.TaskListKind {
return c.taskListKind
}
func (c *taskListManagerImpl) TaskListID() *taskListID {
return c.taskListID
}
// executeWithRetry retries the operation on transient errors. On a rangeID update by another process it calls c.Stop().
func (c *taskListManagerImpl) executeWithRetry(
operation func() (interface{}, error),
) (result interface{}, err error) {
op := func() error {
result, err = operation()
return err
}
throttleRetry := backoff.NewThrottleRetry(
backoff.WithRetryPolicy(persistenceOperationRetryPolicy),
backoff.WithRetryableError(persistence.IsTransientError),
)
err = c.handleErr(throttleRetry.Do(context.Background(), op))
return
}
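// trySyncMatch offers the task to the matcher for a synchronous match with a poller.
// Unless the task was forwarded from a child partition, the wait is capped (by
// maxSyncMatchWaitTime, or the configured activity sync-match wait time for tasks carrying
// activityTaskDispatchInfo) so the caller can fall back to persisting the task.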
func (c *taskListManagerImpl) trySyncMatch(ctx context.Context, params addTaskParams, isolationGroup string) (bool, error) {
task := newInternalTask(params.taskInfo, nil, params.source, params.forwardedFrom, true, params.activityTaskDispatchInfo, isolationGroup)
childCtx := ctx
cancel := func() {}
waitTime := maxSyncMatchWaitTime
if params.activityTaskDispatchInfo != nil {
waitTime = c.config.ActivityTaskSyncMatchWaitTime(params.activityTaskDispatchInfo.WorkflowDomain)
}
if !task.isForwarded() {
// when task is forwarded from another matching host, we trust the context as is
// otherwise, we override to limit the amount of time we can block on sync match
childCtx, cancel = c.newChildContext(ctx, waitTime, time.Second)
}
var matched bool
var err error
if params.activityTaskDispatchInfo != nil {
matched, err = c.matcher.offerOrTimeout(childCtx, task)
} else {
matched, err = c.matcher.Offer(childCtx, task)
}
cancel()
return matched, err
}
// newChildContext creates a child context with the desired timeout.
// If tailroom is non-zero, the child context timeout will be
// min(parentCtx.Deadline()-tailroom, timeout). Use this method to
// create a child context when the child cannot use all of the
// parent's deadline and some time must be left for the parent
// to do post-work after the child returns
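// For illustration: with a parent deadline 5s away, timeout=10s and tailroom=1s, the child
// timeout becomes min(5s-1s, 10s) = 4s; with no parent deadline the child simply gets timeout.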
func (c *taskListManagerImpl) newChildContext(
parent context.Context,
timeout time.Duration,
tailroom time.Duration,
) (context.Context, context.CancelFunc) {
select {
case <-parent.Done():
return parent, func() {}
default:
}
deadline, ok := parent.Deadline()
if !ok {
return context.WithTimeout(parent, timeout)
}
remaining := time.Until(deadline) - tailroom
if remaining < timeout {
timeout = time.Duration(common.MaxInt64(0, int64(remaining)))
}
return context.WithTimeout(parent, timeout)
}
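// isFowardingAllowed returns true only for non-root partitions of non-sticky tasklists;
// these are the only tasklist managers that are given a Forwarder (see newTaskListManager)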
func (c *taskListManagerImpl) isFowardingAllowed(taskList *taskListID, kind types.TaskListKind) bool {
return !taskList.IsRoot() && kind != types.TaskListKindSticky
}
func (c *taskListManagerImpl) isIsolationMatcherEnabled() bool {
return c.taskListKind != types.TaskListKindSticky && c.enableIsolation
}
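// shouldReload reports whether the dynamic-config flag for tasklist isolation has changed since
// this manager was created; when it has, the manager stops itself so it can be recreated with
// the new setting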
func (c *taskListManagerImpl) shouldReload() bool {
return c.config.EnableTasklistIsolation() != c.enableIsolation
}
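// getIsolationGroupForTask decides which isolation group a task should be buffered in, based on
// the task's partition config, its workflow ID and the isolation groups that currently have
// pollers. It falls back to the default (empty) group when isolation is disabled, the partition
// config is empty, or group resolution fails.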
func (c *taskListManagerImpl) getIsolationGroupForTask(ctx context.Context, taskInfo *persistence.TaskInfo) (string, error) {
if c.enableIsolation && len(taskInfo.PartitionConfig) > 0 && c.taskListKind != types.TaskListKindSticky {
partitionConfig := make(map[string]string)
for k, v := range taskInfo.PartitionConfig {
partitionConfig[k] = v
}
partitionConfig[partition.WorkflowIDKey] = taskInfo.WorkflowID
pollerIsolationGroups := c.config.AllIsolationGroups
// Not all poller information is available at the time of task list manager creation,
// because we don't persist poller information in the database. So in the first minute we always
// assume pollers are available in all isolation groups, to avoid the risk of leaking a task to
// another isolation group. Besides, for sticky and scalable tasklists not all poller information
// is available, so we also use all isolation groups for them.
if time.Since(c.createTime) > time.Minute && c.taskListKind != types.TaskListKindSticky && c.taskListID.IsRoot() {
pollerIsolationGroups = c.getPollerIsolationGroups()
if len(pollerIsolationGroups) == 0 {
// we don't have any pollers; use all isolation groups and wait for pollers to arrive
pollerIsolationGroups = c.config.AllIsolationGroups
}
}
group, err := c.partitioner.GetIsolationGroupByDomainID(ctx, taskInfo.DomainID, partitionConfig, pollerIsolationGroups)
if err != nil {
// For a sticky tasklist, return StickyUnavailableError to let it be added to the non-sticky tasklist.
if err == partition.ErrNoIsolationGroupsAvailable && c.taskListKind == types.TaskListKindSticky {
return "", _stickyPollerUnavailableError
}
// if we're unable to get the isolation group, log the error and fallback to no isolation
c.logger.Error("Failed to get isolation group from partition library", tag.WorkflowID(taskInfo.WorkflowID), tag.WorkflowRunID(taskInfo.RunID), tag.TaskID(taskInfo.TaskID), tag.Error(err))
return defaultTaskBufferIsolationGroup, nil
}
c.logger.Debug("get isolation group", tag.PollerGroups(pollerIsolationGroups), tag.IsolationGroup(group), tag.PartitionConfig(partitionConfig))
// For a sticky tasklist, when an isolation group is undrained, a workflow's tasks may be reassigned
// to that undrained isolation group. If there is no poller from that isolation group, return
// StickyUnavailableError so the task is re-enqueued to the non-sticky tasklist. If there is a poller,
// just return an empty isolation group: a sticky tasklist has at most one isolation group, so the
// empty group suffices for matching.
if c.taskListKind == types.TaskListKindSticky {
pollerIsolationGroups = c.getPollerIsolationGroups()
for _, pollerGroup := range pollerIsolationGroups {
if group == pollerGroup {
return "", nil
}
}
return "", _stickyPollerUnavailableError
}
return group, nil
}
return defaultTaskBufferIsolationGroup, nil
}
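// getPollerIsolationGroups returns the sorted union of isolation groups seen from recent
// pollers (last 10 seconds of poller history) and from currently outstanding polls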
func (c *taskListManagerImpl) getPollerIsolationGroups() []string {
groupSet := c.pollerHistory.getPollerIsolationGroups(time.Now().Add(-10 * time.Second))
c.outstandingPollsLock.Lock()
for _, poller := range c.outstandingPollsMap {
groupSet[poller.isolationGroup] = struct{}{}
}
c.outstandingPollsLock.Unlock()
result := make([]string, 0, len(groupSet))
for k := range groupSet {
result = append(result, k)
}
sort.Strings(result)
return result
}
func getTaskListTypeTag(taskListType int) metrics.Tag {
switch taskListType {
case persistence.TaskListTypeActivity:
return taskListActivityTypeTag
case persistence.TaskListTypeDecision:
return taskListDecisionTypeTag
default:
return metrics.TaskListTypeTag("")
}
}
func createServiceBusyError(msg string) *types.ServiceBusyError {
return &types.ServiceBusyError{Message: msg}
}
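// rangeIDToTaskIDBlock converts a rangeID lease into the inclusive block of task IDs it owns.
// For illustration, with rangeSize=100000: rangeID 1 owns [1, 100000] and rangeID 2 owns
// [100001, 200000].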
func rangeIDToTaskIDBlock(rangeID, rangeSize int64) taskIDBlock {
return taskIDBlock{
start: (rangeID-1)*rangeSize + 1,
end: rangeID * rangeSize,
}
}