service/history/task/task.go (350 lines of code) (raw):
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package task
import (
"errors"
"fmt"
"sync"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
ctask "github.com/uber/cadence/common/task"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
)
// redispatchError signals that a timer / transfer task could not be completed
// right now and should be redispatched and retried later. Reason carries a
// human-readable explanation that is included in the error message.
type redispatchError struct {
	Reason string
}

// Error implements the error interface; the reason is rendered as a quoted
// string so that empty or unusual reasons stay visible in logs.
func (r *redispatchError) Error() string {
	const format = "Redispatch reason: %q"
	return fmt.Sprintf(format, r.Reason)
}
// isRedispatchErr reports whether err is, or wraps, a *redispatchError.
func isRedispatchErr(err error) bool {
	target := new(*redispatchError)
	return errors.As(err, target)
}
var (
// ErrTaskDiscarded is the error indicating that the timer / transfer task is pending for too long and discarded.
// HandleErr counts it under the TaskDiscardedPerDomain metric and does not return it to the caller.
ErrTaskDiscarded = errors.New("passive task pending for too long")
// ErrTaskPendingActive is the error indicating that the task should be re-dispatched
// because the domain is pending-active. HandleErr returns it unchanged and
// RetryErr reports it as not retryable.
ErrTaskPendingActive = errors.New("redispatch the task while the domain is pending-active")
)
// taskImpl is the Task implementation returned by NewTimerTask and
// NewTransferTask.
//
// The embedded mutex is taken by the accessors and by Ack / Nack / HandleErr
// when they read or write state, priority and attempt. The embedded Info
// exposes the underlying persistence task (domain ID, task type, visibility
// timestamp, ...).
type taskImpl struct {
	sync.Mutex
	Info

	// Owning shard and the mutable bookkeeping of the task.
	shard      shard.Context
	state      ctask.State
	priority   int
	attempt    int
	timeSource clock.TimeSource
	submitTime time.Time

	// Logging and metrics. eventLogger may be nil (see newTask).
	// scope is initialized when processing the task to make the
	// initialization parallel; scopeIdx is the metrics scope index used to
	// build it.
	logger      log.Logger
	eventLogger eventLogger
	scopeIdx    int
	scope       metrics.Scope

	// Collaborators injected by the constructors.
	taskExecutor       Executor
	taskProcessor      Processor
	redispatchFn       func(task Task)
	criticalRetryCount dynamicconfig.IntPropertyFn

	// TODO: following three fields should be removed after new task lifecycle is implemented
	taskFilter        Filter
	queueType         QueueType
	shouldProcessTask bool
}
// NewTimerTask creates a new timer task.
//
// The metrics scope index is derived from the task type and from whether the
// task belongs to the active timer queue; everything else is passed through
// to newTask unchanged.
func NewTimerTask(
	shard shard.Context,
	taskInfo Info,
	queueType QueueType,
	logger log.Logger,
	taskFilter Filter,
	taskExecutor Executor,
	taskProcessor Processor,
	redispatchFn func(task Task),
	criticalRetryCount dynamicconfig.IntPropertyFn,
) Task {
	isActive := queueType == QueueTypeActiveTimer
	scopeIdx := GetTimerTaskMetricScope(taskInfo.GetTaskType(), isActive)

	return newTask(
		shard, taskInfo, queueType, scopeIdx, logger,
		taskFilter, taskExecutor, taskProcessor,
		criticalRetryCount, redispatchFn,
	)
}
// NewTransferTask creates a new transfer task.
//
// The metrics scope index is derived from the task type and from whether the
// task belongs to the active transfer queue; everything else is passed
// through to newTask unchanged.
func NewTransferTask(
	shard shard.Context,
	taskInfo Info,
	queueType QueueType,
	logger log.Logger,
	taskFilter Filter,
	taskExecutor Executor,
	taskProcessor Processor,
	redispatchFn func(task Task),
	criticalRetryCount dynamicconfig.IntPropertyFn,
) Task {
	isActive := queueType == QueueTypeActiveTransfer
	scopeIdx := GetTransferTaskMetricsScope(taskInfo.GetTaskType(), isActive)

	return newTask(
		shard, taskInfo, queueType, scopeIdx, logger,
		taskFilter, taskExecutor, taskProcessor,
		criticalRetryCount, redispatchFn,
	)
}
// newTask builds the taskImpl shared by the timer and transfer task
// constructors. The task starts in the pending state with no priority and
// zero attempts; its submit time is read from the shard's time source.
//
// An event logger is attached only when debug mode is enabled, the task
// belongs to an active (timer or transfer) queue, and task info logging is
// enabled for the task's domain ID. Otherwise the eventLogger field is left
// nil.
func newTask(
	shard shard.Context,
	taskInfo Info,
	queueType QueueType,
	scopeIdx int,
	logger log.Logger,
	taskFilter Filter,
	taskExecutor Executor,
	taskProcessor Processor,
	criticalRetryCount dynamicconfig.IntPropertyFn,
	redispatchFn func(task Task),
) *taskImpl {
	clk := shard.GetTimeSource()
	onActiveQueue := queueType == QueueTypeActiveTimer || queueType == QueueTypeActiveTransfer

	var debugEvents eventLogger
	if shard.GetConfig().EnableDebugMode && onActiveQueue &&
		shard.GetConfig().EnableTaskInfoLogByDomainID(taskInfo.GetDomainID()) {
		debugEvents = newEventLogger(logger, clk, defaultTaskEventLoggerSize)
		debugEvents.AddEvent("Created task")
	}

	// scope and attempt are intentionally left at their zero values: the
	// scope is resolved lazily in Execute and no attempt has been made yet.
	created := &taskImpl{
		Info:               taskInfo,
		shard:              shard,
		state:              ctask.TaskStatePending,
		priority:           noPriority,
		queueType:          queueType,
		scopeIdx:           scopeIdx,
		logger:             logger,
		eventLogger:        debugEvents,
		submitTime:         clk.Now(),
		timeSource:         clk,
		criticalRetryCount: criticalRetryCount,
		redispatchFn:       redispatchFn,
		taskFilter:         taskFilter,
		taskExecutor:       taskExecutor,
		taskProcessor:      taskProcessor,
	}
	return created
}
// Execute runs the task once.
//
// It lazily resolves the domain-tagged metrics scope, runs the task filter to
// decide whether the task should actually be processed, and then delegates to
// the task executor. If the filter itself fails, the call sleeps for
// loadDomainEntryForTaskRetryDelay before returning the filter error.
//
// For tasks that pass the filter, the per-domain request counter and the
// processing latency are emitted when the executor returns. The latency is
// measured with the task's own time source on both ends so that it stays
// consistent with the other timestamps kept on the task.
func (t *taskImpl) Execute() error {
	// TODO: after merging active and standby queue,
	// the task should be smart enough to tell if it should be
	// processed as active or standby and use the corresponding
	// task executor.
	if t.scope == nil {
		t.scope = getOrCreateDomainTaggedScope(t.shard, t.scopeIdx, t.GetDomainID(), t.logger)
	}

	var err error
	t.shouldProcessTask, err = t.taskFilter(t.Info)
	if err != nil {
		logEvent(t.eventLogger, "TaskFilter execution failed", err)
		time.Sleep(loadDomainEntryForTaskRetryDelay)
		return err
	}

	executionStartTime := t.timeSource.Now()
	defer func() {
		if t.shouldProcessTask {
			t.scope.IncCounter(metrics.TaskRequestsPerDomain)
			// Use the same clock that produced executionStartTime.
			t.scope.RecordTimer(metrics.TaskProcessingLatencyPerDomain, t.timeSource.Now().Sub(executionStartTime))
		}
	}()

	logEvent(t.eventLogger, "Executing task", t.shouldProcessTask)
	return t.taskExecutor.Execute(t, t.shouldProcessTask)
}
// HandleErr classifies an error produced while processing the task and
// decides whether it should be surfaced to the caller.
//
// It returns nil when err is nil or when the error is considered handled
// (entity not found, workflow already completed, discarded / unsupported
// tasks, dropped stuck close-execution tasks, target domain not active, ...).
// It returns the error itself when the task still needs to be processed
// again (workflow busy, rate limited, redispatch, pending-active, source
// domain not yet active, or any unclassified failure).
//
// Whenever a non-nil error is returned the attempt counter is incremented;
// once it exceeds criticalRetryCount an attempt timer and a critical error
// log line are emitted.
func (t *taskImpl) HandleErr(err error) (retErr error) {
	defer func() {
		if retErr != nil {
			logEvent(t.eventLogger, "Failed to handle error", retErr)

			t.Lock()
			defer t.Unlock()

			t.attempt++
			if t.attempt > t.criticalRetryCount() {
				t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
				t.logger.Error("Critical error processing task, retrying.",
					tag.Error(err),
					tag.OperationCritical,
					tag.TaskType(t.GetTaskType()),
					tag.AttemptCount(t.attempt),
				)
			}
		}
	}()

	if err == nil {
		return nil
	}

	logEvent(t.eventLogger, "Handling task processing error", err)

	// The workflow (or the entity the task refers to) is gone or already
	// closed: there is nothing left to do for this task.
	switch err.(type) {
	case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError:
		return nil
	}

	if transferTask, ok := t.Info.(*persistence.TransferTaskInfo); ok &&
		transferTask.TaskType == persistence.TransferTaskTypeCloseExecution &&
		err == execution.ErrMissingWorkflowStartEvent &&
		t.shard.GetConfig().EnableDropStuckTaskByDomainID(t.Info.GetDomainID()) { // use domainID here to avoid accessing domainCache
		t.scope.IncCounter(metrics.TransferTaskMissingEventCounterPerDomain)
		t.logger.Error("Drop close execution transfer task due to corrupted workflow history", tag.Error(err), tag.LifeCycleProcessingFailed)
		return nil
	}

	if err == errWorkflowBusy {
		t.scope.IncCounter(metrics.TaskWorkflowBusyPerDomain)
		return err
	}
	if err == errWorkflowRateLimited {
		// metrics are emitted within the rate limiter
		return err
	}

	// this is a transient error
	if isRedispatchErr(err) {
		t.scope.IncCounter(metrics.TaskStandbyRetryCounterPerDomain)
		return err
	}

	// this is a transient error during graceful failover
	if err == ErrTaskPendingActive {
		t.scope.IncCounter(metrics.TaskPendingActiveCounterPerDomain)
		return err
	}

	// The task was discarded on purpose: count it and stop here so it is
	// not also reported as a processing failure below.
	if err == ErrTaskDiscarded {
		t.scope.IncCounter(metrics.TaskDiscardedPerDomain)
		return nil
	}

	// Unsupported workflow: count it and stop here so it is not also
	// reported as a processing failure below.
	if err == execution.ErrMissingVersionHistories {
		t.logger.Error("Encounter 2DC workflow during task processing.")
		t.scope.IncCounter(metrics.TaskUnsupportedPerDomain)
		return nil
	}

	// target domain not active error: treated as non-retriable, the task
	// is dropped after recording the metric and logging the error.
	if err == errTargetDomainNotActive {
		t.scope.IncCounter(metrics.TaskTargetNotActiveCounterPerDomain)
		t.logger.Error("Dropping 'domain-not-active' error as non-retriable", tag.Error(err))
		return nil
	}

	// this is a transient error, and means source domain not active
	// TODO remove this error check special case
	// since the new task life cycle will not give up until task processed / verified
	if _, ok := err.(*types.DomainNotActiveError); ok {
		// Give up only once the task has been around for more than two
		// domain cache refresh intervals.
		if t.timeSource.Now().Sub(t.submitTime) > 2*cache.DomainCacheRefreshInterval {
			t.scope.IncCounter(metrics.TaskNotActiveCounterPerDomain)
			return nil
		}
		return err
	}

	t.scope.IncCounter(metrics.TaskFailuresPerDomain)

	if _, ok := err.(*persistence.CurrentWorkflowConditionFailedError); ok {
		t.logger.Error("More than 2 workflow are running.", tag.Error(err), tag.LifeCycleProcessingFailed)
		return nil
	}

	if t.GetAttempt() > stickyTaskMaxRetryCount && common.IsStickyTaskConditionError(err) {
		// sticky task could end up into endless loop in rare cases and
		// cause worker to keep getting decision timeout unless restart.
		// return nil here to break the endless loop
		return nil
	}

	t.logger.Error("Fail to process task", tag.Error(err), tag.LifeCycleProcessingFailed)
	return err
}
// RetryErr reports whether err is considered retryable. It returns false for
// workflow-busy, redispatch, pending-active and context-timeout errors, and
// true for every other error.
func (t *taskImpl) RetryErr(err error) bool {
	switch {
	case err == errWorkflowBusy,
		isRedispatchErr(err),
		err == ErrTaskPendingActive,
		common.IsContextTimeoutError(err):
		return false
	default:
		return true
	}
}
// Ack marks the task as acknowledged.
//
// For tasks that passed the filter in Execute it also records the attempt
// count, the latency since submission and the latency since the task's
// visibility timestamp. Both latencies are computed against the task's own
// time source, the same clock that produced submitTime. If an event logger
// is attached and the task has been retried at least once, the collected
// events are flushed.
func (t *taskImpl) Ack() {
	logEvent(t.eventLogger, "Acked task")

	t.Lock()
	defer t.Unlock()

	t.state = ctask.TaskStateAcked
	if !t.shouldProcessTask {
		return
	}

	now := t.timeSource.Now()
	t.scope.RecordTimer(metrics.TaskAttemptTimerPerDomain, time.Duration(t.attempt))
	t.scope.RecordTimer(metrics.TaskLatencyPerDomain, now.Sub(t.submitTime))
	t.scope.RecordTimer(metrics.TaskQueueLatencyPerDomain, now.Sub(t.GetVisibilityTimestamp()))

	if t.eventLogger != nil && t.attempt != 0 {
		// only dump events when the task should be processed and has been retried
		t.eventLogger.FlushEvents("Task processing events")
	}
}
// Nack marks the task as not acknowledged and schedules it for another run.
//
// When shouldResubmitOnNack allows it, the task is first offered back to the
// task processor. If that is not allowed, or the processor does not accept
// the task, the task is handed to redispatchFn instead.
func (t *taskImpl) Nack() {
	logEvent(t.eventLogger, "Nacked task")

	t.Lock()
	t.state = ctask.TaskStateNacked
	t.Unlock()

	resubmitted := false
	if t.shouldResubmitOnNack() {
		// The submit error is not inspected: any unsuccessful submit
		// falls back to redispatching below.
		resubmitted, _ = t.taskProcessor.TrySubmit(t)
	}
	if !resubmitted {
		t.redispatchFn(t)
	}
}
// State returns the current lifecycle state of the task, read under the
// task's mutex.
func (t *taskImpl) State() ctask.State {
	t.Lock()
	current := t.state
	t.Unlock()
	return current
}
// Priority returns the priority currently assigned to the task, read under
// the task's mutex.
func (t *taskImpl) Priority() int {
	t.Lock()
	current := t.priority
	t.Unlock()
	return current
}
// SetPriority stores a new priority for the task under the task's mutex.
func (t *taskImpl) SetPriority(priority int) {
	t.Lock()
	t.priority = priority
	t.Unlock()
}
// GetShard returns the shard context the task was created with.
func (t *taskImpl) GetShard() shard.Context {
return t.shard
}
// GetAttempt returns the task's attempt counter, read under the task's
// mutex. The counter is incremented by HandleErr each time it returns a
// non-nil error.
func (t *taskImpl) GetAttempt() int {
	t.Lock()
	attempts := t.attempt
	t.Unlock()
	return attempts
}
// GetInfo returns the embedded task Info the task was created with.
func (t *taskImpl) GetInfo() Info {
return t.Info
}
// GetQueueType returns the queue type the task was created for.
func (t *taskImpl) GetQueueType() QueueType {
return t.queueType
}
// shouldResubmitOnNack reports whether a nacked task may be handed straight
// back to the task processor: the attempt count must still be below
// activeTaskResubmitMaxAttempts and the task must belong to an active
// (transfer or timer) queue.
func (t *taskImpl) shouldResubmitOnNack() bool {
	// TODO: for now only resubmit active task on Nack()
	// we can also consider resubmit standby tasks that fails due to certain error types
	// this may require change the Nack() interface to Nack(error)
	if t.GetAttempt() >= activeTaskResubmitMaxAttempts {
		return false
	}
	switch t.queueType {
	case QueueTypeActiveTransfer, QueueTypeActiveTimer:
		return true
	default:
		return false
	}
}
// logEvent records msg (with optional details) on eventLogger. It is a no-op
// when eventLogger is nil, so callers do not need their own nil check.
func logEvent(
	eventLogger eventLogger,
	msg string,
	detail ...interface{},
) {
	if eventLogger == nil {
		return
	}
	eventLogger.AddEvent(msg, detail...)
}
// getOrCreateDomainTaggedScope returns the cached domain-tagged metrics scope
// for (domainID, scopeIdx) when one exists. Otherwise it creates a new scope
// tagged with the domain. The new scope is cached only when the domain tag
// could be resolved; on a lookup failure the error is logged and the scope
// built from the fallback tag is returned without being cached.
func getOrCreateDomainTaggedScope(
	shard shard.Context,
	scopeIdx int,
	domainID string,
	logger log.Logger,
) metrics.Scope {
	scopeCache := shard.GetService().GetDomainMetricsScopeCache()
	if cached, found := scopeCache.Get(domainID, scopeIdx); found {
		return cached
	}

	domainTag, tagErr := getDomainTagByID(shard.GetDomainCache(), domainID)
	created := shard.GetMetricsClient().Scope(scopeIdx, domainTag)
	if tagErr != nil {
		logger.Error("Unable to get domainName", tag.Error(tagErr))
		return created
	}

	scopeCache.Put(domainID, scopeIdx, created)
	return created
}
// getDomainTagByID resolves domainID to its domain name through the domain
// cache and returns the matching metrics domain tag. When the lookup fails
// it returns the unknown-domain tag together with the lookup error.
func getDomainTagByID(
	domainCache cache.DomainCache,
	domainID string,
) (metrics.Tag, error) {
	name, lookupErr := domainCache.GetDomainName(domainID)
	if lookupErr == nil {
		return metrics.DomainTag(name), nil
	}
	return metrics.DomainUnknownTag(), lookupErr
}