// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

import (
	"context"
	"fmt"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/shard"
	"github.com/uber/cadence/service/worker/archiver"
)

const (
	scanWorkflowTimeout = 30 * time.Second
)

var (
	normalDecisionTypeTag = metrics.DecisionTypeTag("normal")
	stickyDecisionTypeTag = metrics.DecisionTypeTag("sticky")
)

type (
	timerActiveTaskExecutor struct {
		*timerTaskExecutorBase
	}
)

// NewTimerActiveTaskExecutor creates a new task executor for active timer tasks
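//
// The returned Executor is typically wired into the active timer queue
// processor. A minimal construction sketch (illustrative only; the variables
// passed in are assumed to exist in the caller and are not defined in this file):
//
//	executor := NewTimerActiveTaskExecutor(
//		shardCtx,
//		archiverClient,
//		executionCache,
//		logger,
//		metricsClient,
//		shardCtx.GetConfig(),
//	)
//	// the processor then calls executor.Execute(task, shouldProcessTask)
//	// for every active timer task it polls from the queue.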
func NewTimerActiveTaskExecutor(
shard shard.Context,
archiverClient archiver.Client,
executionCache *execution.Cache,
logger log.Logger,
metricsClient metrics.Client,
config *config.Config,
) Executor {
return &timerActiveTaskExecutor{
timerTaskExecutorBase: newTimerTaskExecutorBase(
shard,
archiverClient,
executionCache,
logger,
metricsClient,
config,
),
}
}
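
// Execute routes an active timer task to the handler for its task type.
// If shouldProcessTask is false, the task is acknowledged without doing any work.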
func (t *timerActiveTaskExecutor) Execute(
task Task,
shouldProcessTask bool,
) error {
timerTask, ok := task.GetInfo().(*persistence.TimerTaskInfo)
if !ok {
return errUnexpectedTask
}
if !shouldProcessTask {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), taskDefaultTimeout)
defer cancel()
switch timerTask.TaskType {
case persistence.TaskTypeUserTimer:
return t.executeUserTimerTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityTimeout:
return t.executeActivityTimeoutTask(ctx, timerTask)
case persistence.TaskTypeDecisionTimeout:
return t.executeDecisionTimeoutTask(ctx, timerTask)
case persistence.TaskTypeWorkflowTimeout:
return t.executeWorkflowTimeoutTask(ctx, timerTask)
case persistence.TaskTypeActivityRetryTimer:
return t.executeActivityRetryTimerTask(ctx, timerTask)
case persistence.TaskTypeWorkflowBackoffTimer:
return t.executeWorkflowBackoffTimerTask(ctx, timerTask)
case persistence.TaskTypeDeleteHistoryEvent:
return t.executeDeleteHistoryEventTask(ctx, timerTask)
default:
return errUnknownTimerTask
}
}
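
// executeUserTimerTimeoutTask fires every user timer that has expired as of the
// shard's current time. Timers detected as resurrected (already completed
// according to history) are deleted from mutable state instead of being fired.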
func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
t.logger.Debug("Processing user timer",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskID(task.TaskID),
)
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
timerSequence := execution.NewTimerSequence(mutableState)
referenceTime := t.shard.GetTimeSource().Now()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
updateMutableState := false
debugLog := t.logger.Debug
if t.config.EnableDebugMode && t.config.EnableTimerDebugLogByDomainID(task.DomainID) {
debugLog = t.logger.Info
}
	// initialized lazily when a timer with delay >= resurrectionCheckMinDelay
	// is encountered, so that history is scanned at most once even when
	// there are multiple timers with long delays
var resurrectedTimer map[string]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
defer cancel()
sortedUserTimers := timerSequence.LoadAndSortUserTimers()
debugLog("Sorted user timers",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.Counter(len(sortedUserTimers)),
)
Loop:
for _, timerSequenceID := range sortedUserTimers {
timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
if !ok {
			errString := fmt.Sprintf("failed to find user timer for event ID: %v", timerSequenceID.EventID)
t.logger.Error(errString)
return &types.InternalServiceError{Message: errString}
}
delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
debugLog("Processing user timer sequence id",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowTimerID(timerInfo.TimerID),
tag.WorkflowScheduleID(timerInfo.StartedID),
tag.Dynamic("timer-sequence-id", timerSequenceID),
tag.Dynamic("timer-info", timerInfo),
tag.Dynamic("delay", delay),
tag.Dynamic("expired", expired),
)
if !expired {
			// timer sequence IDs are sorted; once one timer sequence ID has not
			// expired, none of the later ones have expired either
break Loop
}
if delay >= resurrectionCheckMinDelay || resurrectedTimer != nil {
if resurrectedTimer == nil {
				// overwrite the context here as scanning history may take a long time to complete;
				// ctx will also be used by later operations like updateWorkflowExecution
ctx = scanWorkflowCtx
resurrectedTimer, err = execution.GetResurrectedTimers(ctx, t.shard, mutableState)
if err != nil {
t.logger.Error("Timer resurrection check failed", tag.Error(err))
return err
}
}
if _, ok := resurrectedTimer[timerInfo.TimerID]; ok {
// found timer resurrection
domainName := mutableState.GetDomainEntry().GetInfo().Name
t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.TimerResurrectionCounter)
t.logger.Warn("Encounter resurrected timer, skip",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowTimerID(timerInfo.TimerID),
					tag.WorkflowScheduleID(timerInfo.StartedID), // for user timers, the TimerStarted event ID serves as the schedule ID
)
// remove resurrected timer from mutable state
if err := mutableState.DeleteUserTimer(timerInfo.TimerID); err != nil {
return err
}
updateMutableState = true
continue Loop
}
}
if _, err := mutableState.AddTimerFiredEvent(timerInfo.TimerID); err != nil {
return err
}
updateMutableState = true
debugLog("User timer fired",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowTimerID(timerInfo.TimerID),
tag.WorkflowScheduleID(timerInfo.StartedID),
tag.WorkflowNextEventID(mutableState.GetNextEventID()),
)
}
if !updateMutableState {
return nil
}
return t.updateWorkflowExecution(ctx, wfContext, mutableState, updateMutableState)
}
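
// executeActivityTimeoutTask processes all expired activity timers: resurrected
// activities are deleted from mutable state, activities with remaining retry
// attempts are retried, and the rest get an ActivityTaskTimedOut event plus a
// newly scheduled decision task.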
func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
if err != nil {
return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
}
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
wfType := mutableState.GetWorkflowType()
if wfType == nil {
return fmt.Errorf("unable to find workflow type, task %s", task)
}
timerSequence := execution.NewTimerSequence(mutableState)
referenceTime := t.shard.GetTimeSource().Now()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
updateMutableState := false
scheduleDecision := false
	// initialized lazily when an activity timer with delay >= resurrectionCheckMinDelay
	// is encountered, so that history is scanned at most once even when
	// there are multiple timers with long delays
var resurrectedActivity map[int64]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
defer cancel()
// need to clear activity heartbeat timer task mask for new activity timer task creation
// NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible
// one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
// for updating workflow execution. In that case, only one new heartbeat timeout task should be
// created.
isHeartBeatTask := task.TimeoutType == int(types.TimeoutTypeHeartbeat)
activityInfo, ok := mutableState.GetActivityInfo(task.EventID)
if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibilityInSeconds <= task.VisibilityTimestamp.Unix() {
activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ execution.TimerTaskStatusCreatedHeartbeat
if err := mutableState.UpdateActivity(activityInfo); err != nil {
return err
}
updateMutableState = true
}
Loop:
for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
activityInfo, ok := mutableState.GetActivityInfo(timerSequenceID.EventID)
if !ok || timerSequenceID.Attempt < activityInfo.Attempt {
			// handle 3 cases:
			// 1. !ok
			//    this can happen since each activity can have up to 4 timers
			//    and one of those timers may have already fired in this loop
			// 2. timerSequenceID.Attempt < activityInfo.Attempt
			//    a retry may have updated the activity attempt; the new attempt
			//    should not be timed out here
			// 3. the activity was resurrected and has already been deleted in this loop
			continue Loop
}
delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
if !expired {
			// timer sequence IDs are sorted; once one timer sequence ID has not
			// expired, none of the later ones have expired either
break Loop
}
if delay >= resurrectionCheckMinDelay || resurrectedActivity != nil {
if resurrectedActivity == nil {
				// overwrite the context here as scanning history may take a long time to complete;
				// ctx will also be used by later operations like updateWorkflowExecution
ctx = scanWorkflowCtx
resurrectedActivity, err = execution.GetResurrectedActivities(ctx, t.shard, mutableState)
if err != nil {
t.logger.Error("Activity resurrection check failed", tag.Error(err))
return err
}
}
if _, ok := resurrectedActivity[activityInfo.ScheduleID]; ok {
// found activity resurrection
domainName := mutableState.GetDomainEntry().GetInfo().Name
t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityResurrectionCounter)
t.logger.Warn("Encounter resurrected activity, skip",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowActivityID(activityInfo.ActivityID),
tag.WorkflowScheduleID(activityInfo.ScheduleID),
)
// remove resurrected activity from mutable state
if err := mutableState.DeleteActivity(activityInfo.ScheduleID); err != nil {
return err
}
updateMutableState = true
continue Loop
}
}
		// check whether the timeout may be caused by the activity task being lost
if timerSequenceID.TimerType == execution.TimerTypeScheduleToStart {
domainName, err := t.shard.GetDomainCache().GetDomainName(mutableState.GetExecutionInfo().DomainID)
if err == nil && activityInfo.ScheduleToStartTimeout >= int32(t.config.ActivityMaxScheduleToStartTimeoutForRetry(domainName).Seconds()) {
				// note that we ignore the race condition on dynamic config value changes here as this is only for metrics and logging.
				// theoretically the check only applies to activities with a retry policy,
				// but for activities without a retry policy we also want to detect potentially lost tasks and emit the metric,
				// so we reuse the same config value as the threshold; the metric is only emitted when the activity has not started after a long time.
t.metricsClient.Scope(metrics.TimerActiveTaskActivityTimeoutScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityLostCounter)
t.logger.Warn("Potentially activity task lost",
tag.WorkflowDomainName(domainName),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.WorkflowScheduleID(activityInfo.ScheduleID),
)
}
}
if ok, err := mutableState.RetryActivity(
activityInfo,
execution.TimerTypeToReason(timerSequenceID.TimerType),
nil,
); err != nil {
return err
} else if ok {
updateMutableState = true
continue Loop
}
t.emitTimeoutMetricScopeWithDomainTag(
mutableState.GetExecutionInfo().DomainID,
metrics.TimerActiveTaskActivityTimeoutScope,
timerSequenceID.TimerType,
metrics.WorkflowTypeTag(wfType.GetName()),
)
t.logger.Info("Activity timed out",
tag.WorkflowDomainName(domainName),
tag.WorkflowDomainID(task.GetDomainID()),
tag.WorkflowID(task.GetWorkflowID()),
tag.WorkflowRunID(task.GetRunID()),
tag.ScheduleAttempt(task.ScheduleAttempt),
tag.FailoverVersion(task.GetVersion()),
)
if _, err := mutableState.AddActivityTaskTimedOutEvent(
activityInfo.ScheduleID,
activityInfo.StartedID,
execution.TimerTypeToInternal(timerSequenceID.TimerType),
activityInfo.Details,
); err != nil {
return err
}
updateMutableState = true
scheduleDecision = true
}
if !updateMutableState {
return nil
}
return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}
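
// executeDecisionTimeoutTask handles decision start-to-close and
// schedule-to-start timeouts by recording the corresponding timeout event and
// scheduling a new decision task.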
func (t *timerActiveTaskExecutor) executeDecisionTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
if err != nil {
return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
}
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
wfType := mutableState.GetWorkflowType()
if wfType == nil {
return fmt.Errorf("unable to find workflow type, task %s", task)
}
scheduleID := task.EventID
decision, ok := mutableState.GetDecisionInfo(scheduleID)
if !ok {
t.logger.Debug("Potentially duplicate", tag.TaskID(task.TaskID), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TaskTypeDecisionTimeout))
return nil
}
ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, decision.Version, task.Version, task)
if err != nil || !ok {
return err
}
if decision.Attempt != task.ScheduleAttempt {
return nil
}
scheduleDecision := false
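	// a non-empty sticky task list means the decision was dispatched to the sticky task list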
isStickyDecision := mutableState.GetExecutionInfo().StickyTaskList != ""
decisionTypeTag := normalDecisionTypeTag
if isStickyDecision {
decisionTypeTag = stickyDecisionTypeTag
}
tags := []metrics.Tag{metrics.WorkflowTypeTag(wfType.GetName()), decisionTypeTag}
switch execution.TimerTypeFromInternal(types.TimeoutType(task.TimeoutType)) {
case execution.TimerTypeStartToClose:
t.emitTimeoutMetricScopeWithDomainTag(
mutableState.GetExecutionInfo().DomainID,
metrics.TimerActiveTaskDecisionTimeoutScope,
execution.TimerTypeStartToClose,
tags...,
)
if _, err := mutableState.AddDecisionTaskTimedOutEvent(
decision.ScheduleID,
decision.StartedID,
); err != nil {
return err
}
scheduleDecision = true
case execution.TimerTypeScheduleToStart:
if decision.StartedID != common.EmptyEventID {
// decision has already started
return nil
}
if !isStickyDecision {
t.logger.Warn("Potential lost normal decision task",
tag.WorkflowDomainName(domainName),
tag.WorkflowDomainID(task.GetDomainID()),
tag.WorkflowID(task.GetWorkflowID()),
tag.WorkflowRunID(task.GetRunID()),
tag.WorkflowScheduleID(scheduleID),
tag.ScheduleAttempt(task.ScheduleAttempt),
tag.FailoverVersion(task.GetVersion()),
)
}
t.emitTimeoutMetricScopeWithDomainTag(
mutableState.GetExecutionInfo().DomainID,
metrics.TimerActiveTaskDecisionTimeoutScope,
execution.TimerTypeScheduleToStart,
tags...,
)
_, err := mutableState.AddDecisionTaskScheduleToStartTimeoutEvent(scheduleID)
if err != nil {
return err
}
scheduleDecision = true
}
return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}
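
// executeWorkflowBackoffTimerTask schedules the first decision task once a
// workflow's retry or cron backoff period has elapsed, unless a decision has
// already been processed or is pending.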
func (t *timerActiveTaskExecutor) executeWorkflowBackoffTimerTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
if task.TimeoutType == persistence.WorkflowBackoffTimeoutTypeRetry {
t.metricsClient.IncCounter(metrics.TimerActiveTaskWorkflowBackoffTimerScope, metrics.WorkflowRetryBackoffTimerCount)
} else {
t.metricsClient.IncCounter(metrics.TimerActiveTaskWorkflowBackoffTimerScope, metrics.WorkflowCronBackoffTimerCount)
}
if mutableState.HasProcessedOrPendingDecision() {
// already has decision task
return nil
}
// schedule first decision task
return t.updateWorkflowExecution(ctx, wfContext, mutableState, true)
}
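
// executeActivityRetryTimerTask re-dispatches an activity task to the matching
// service when its retry backoff timer fires; stale or duplicate retry timers
// are dropped.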
func (t *timerActiveTaskExecutor) executeActivityRetryTimerTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
// generate activity task
scheduledID := task.EventID
activityInfo, ok := mutableState.GetActivityInfo(scheduledID)
if !ok || task.ScheduleAttempt < int64(activityInfo.Attempt) || activityInfo.StartedID != common.EmptyEventID {
if ok {
t.logger.Info("Duplicate activity retry timer task",
tag.WorkflowID(mutableState.GetExecutionInfo().WorkflowID),
tag.WorkflowRunID(mutableState.GetExecutionInfo().RunID),
tag.WorkflowDomainID(mutableState.GetExecutionInfo().DomainID),
tag.WorkflowScheduleID(activityInfo.ScheduleID),
tag.Attempt(activityInfo.Attempt),
tag.FailoverVersion(activityInfo.Version),
tag.TimerTaskStatus(activityInfo.TimerTaskStatus),
tag.ScheduleAttempt(task.ScheduleAttempt))
}
return nil
}
ok, err = verifyTaskVersion(t.shard, t.logger, task.DomainID, activityInfo.Version, task.Version, task)
if err != nil || !ok {
return err
}
domainID := task.DomainID
targetDomainID := domainID
if activityInfo.DomainID != "" {
targetDomainID = activityInfo.DomainID
} else {
		// TODO: remove this block after March 1st, 2020
		// previously the DomainID in activity info was not set, so we need to fetch
		// the scheduled event from the DB to check which domain the activity
		// to be scheduled belongs to
scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduledID)
if err != nil {
return err
}
if scheduledEvent.ActivityTaskScheduledEventAttributes.GetDomain() != "" {
domainEntry, err := t.shard.GetDomainCache().GetDomain(scheduledEvent.ActivityTaskScheduledEventAttributes.GetDomain())
if err != nil {
return &types.InternalServiceError{Message: "unable to re-schedule activity across domain."}
}
targetDomainID = domainEntry.GetInfo().ID
}
}
execution := types.WorkflowExecution{
WorkflowID: task.WorkflowID,
RunID: task.RunID}
taskList := &types.TaskList{
Name: activityInfo.TaskList,
}
scheduleToStartTimeout := activityInfo.ScheduleToStartTimeout
release(nil) // release earlier as we don't need the lock anymore
return t.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &types.AddActivityTaskRequest{
DomainUUID: targetDomainID,
SourceDomainUUID: domainID,
Execution: &execution,
TaskList: taskList,
ScheduleID: scheduledID,
ScheduleToStartTimeoutSeconds: common.Int32Ptr(scheduleToStartTimeout),
PartitionConfig: mutableState.GetExecutionInfo().PartitionConfig,
})
}
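
// executeWorkflowTimeoutTask times out the workflow execution. If the workflow
// is not cancelled and a retry or cron backoff applies, the execution is
// continued as new instead of simply being closed.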
func (t *timerActiveTaskExecutor) executeWorkflowTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
startVersion, err := mutableState.GetStartVersion()
if err != nil {
return err
}
ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, startVersion, task.Version, task)
if err != nil || !ok {
return err
}
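	// any events appended below start a new batch, so the current next event ID
	// becomes the first event ID of that batch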
eventBatchFirstEventID := mutableState.GetNextEventID()
timeoutReason := execution.TimerTypeToReason(execution.TimerTypeStartToClose)
backoffInterval := mutableState.GetRetryBackoffDuration(timeoutReason)
continueAsNewInitiator := types.ContinueAsNewInitiatorRetryPolicy
if backoffInterval == backoff.NoBackoff {
// check if a cron backoff is needed
backoffInterval, err = mutableState.GetCronBackoffDuration(ctx)
if err != nil {
return err
}
continueAsNewInitiator = types.ContinueAsNewInitiatorCronSchedule
}
	// the second return value is not needed here
isCanceled, _ := mutableState.IsCancelRequested()
if isCanceled || backoffInterval == backoff.NoBackoff {
if err := timeoutWorkflow(mutableState, eventBatchFirstEventID); err != nil {
return err
}
		// We apply the update to the execution using optimistic concurrency. If it fails due to a conflict, then reload
		// the history and retry the operation.
return t.updateWorkflowExecution(ctx, wfContext, mutableState, false)
}
	// the workflow timed out, but a retry or cron run is needed, so continue the workflow as new
startEvent, err := mutableState.GetStartEvent(ctx)
if err != nil {
return err
}
startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
continueAsNewAttributes := &types.ContinueAsNewWorkflowExecutionDecisionAttributes{
WorkflowType: startAttributes.WorkflowType,
TaskList: startAttributes.TaskList,
Input: startAttributes.Input,
ExecutionStartToCloseTimeoutSeconds: startAttributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: startAttributes.TaskStartToCloseTimeoutSeconds,
BackoffStartIntervalInSeconds: common.Int32Ptr(int32(backoffInterval.Seconds())),
RetryPolicy: startAttributes.RetryPolicy,
Initiator: continueAsNewInitiator.Ptr(),
FailureReason: common.StringPtr(timeoutReason),
CronSchedule: mutableState.GetExecutionInfo().CronSchedule,
Header: startAttributes.Header,
Memo: startAttributes.Memo,
SearchAttributes: startAttributes.SearchAttributes,
JitterStartSeconds: startAttributes.JitterStartSeconds,
}
newMutableState, err := retryWorkflow(
ctx,
mutableState,
eventBatchFirstEventID,
startAttributes.GetParentWorkflowDomain(),
continueAsNewAttributes,
)
if err != nil {
return err
}
newExecutionInfo := newMutableState.GetExecutionInfo()
return wfContext.UpdateWorkflowExecutionWithNewAsActive(
ctx,
t.shard.GetTimeSource().Now(),
execution.NewContext(
newExecutionInfo.DomainID,
types.WorkflowExecution{
WorkflowID: newExecutionInfo.WorkflowID,
RunID: newExecutionInfo.RunID,
},
t.shard,
t.shard.GetExecutionManager(),
t.logger,
),
newMutableState,
)
}
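
// updateWorkflowExecution persists the accumulated mutable state changes as the
// active cluster, optionally scheduling a new decision task first.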
func (t *timerActiveTaskExecutor) updateWorkflowExecution(
ctx context.Context,
wfContext execution.Context,
mutableState execution.MutableState,
scheduleNewDecision bool,
) error {
var err error
if scheduleNewDecision {
// Schedule a new decision.
err = execution.ScheduleDecision(mutableState)
if err != nil {
return err
}
}
now := t.shard.GetTimeSource().Now()
err = wfContext.UpdateWorkflowExecutionAsActive(ctx, now)
if err != nil {
		// if this is a shard ownership error, the shard context will stop the entire history engine,
		// so we don't need to explicitly stop the queue processor here
return err
}
return nil
}
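
// emitTimeoutMetricScopeWithDomainTag emits a per-domain timeout counter for
// the given timer type; it is a no-op if the domain name cannot be resolved.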
func (t *timerActiveTaskExecutor) emitTimeoutMetricScopeWithDomainTag(
domainID string,
scope int,
timerType execution.TimerType,
tags ...metrics.Tag,
) {
domainTag, err := getDomainTagByID(t.shard.GetDomainCache(), domainID)
if err != nil {
return
}
tags = append(tags, domainTag)
metricsScope := t.metricsClient.Scope(scope, tags...)
switch timerType {
case execution.TimerTypeScheduleToStart:
metricsScope.IncCounter(metrics.ScheduleToStartTimeoutCounter)
case execution.TimerTypeScheduleToClose:
metricsScope.IncCounter(metrics.ScheduleToCloseTimeoutCounter)
case execution.TimerTypeStartToClose:
metricsScope.IncCounter(metrics.StartToCloseTimeoutCounter)
case execution.TimerTypeHeartbeat:
metricsScope.IncCounter(metrics.HeartbeatTimeoutCounter)
}
}