in service/history/task/timer_active_task_executor.go [113:256]
func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
ctx context.Context,
task *persistence.TimerTaskInfo,
) (retError error) {
t.logger.Debug("Processing user timer",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskID(task.TaskID),
)
wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
task.DomainID,
getWorkflowExecution(task),
taskGetExecutionContextTimeout,
)
if err != nil {
if err == context.DeadlineExceeded {
return errWorkflowBusy
}
return err
}
defer func() { release(retError) }()
mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
if err != nil {
return err
}
if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
return nil
}
timerSequence := execution.NewTimerSequence(mutableState)
referenceTime := t.shard.GetTimeSource().Now()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
updateMutableState := false
debugLog := t.logger.Debug
if t.config.EnableDebugMode && t.config.EnableTimerDebugLogByDomainID(task.DomainID) {
debugLog = t.logger.Info
}
// initialized when a timer with delay >= resurrectionCheckMinDelay
// is encountered, so that we don't need to scan history multiple times
// where there're multiple timers with high delay
var resurrectedTimer map[string]struct{}
scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
defer cancel()
sortedUserTimers := timerSequence.LoadAndSortUserTimers()
debugLog("Sorted user timers",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.Counter(len(sortedUserTimers)),
)
Loop:
for _, timerSequenceID := range sortedUserTimers {
timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
if !ok {
errString := fmt.Sprintf("failed to find in user timer event ID: %v", timerSequenceID.EventID)
t.logger.Error(errString)
return &types.InternalServiceError{Message: errString}
}
delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
debugLog("Processing user timer sequence id",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowTimerID(timerInfo.TimerID),
tag.WorkflowScheduleID(timerInfo.StartedID),
tag.Dynamic("timer-sequence-id", timerSequenceID),
tag.Dynamic("timer-info", timerInfo),
tag.Dynamic("delay", delay),
tag.Dynamic("expired", expired),
)
if !expired {
// timer sequence IDs are sorted, once there is one timer
// sequence ID not expired, all after that wil not expired
break Loop
}
if delay >= resurrectionCheckMinDelay || resurrectedTimer != nil {
if resurrectedTimer == nil {
// overwrite the context here as scan history may take a long time to complete
// ctx will also be used by other operations like updateWorkflow
ctx = scanWorkflowCtx
resurrectedTimer, err = execution.GetResurrectedTimers(ctx, t.shard, mutableState)
if err != nil {
t.logger.Error("Timer resurrection check failed", tag.Error(err))
return err
}
}
if _, ok := resurrectedTimer[timerInfo.TimerID]; ok {
// found timer resurrection
domainName := mutableState.GetDomainEntry().GetInfo().Name
t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.TimerResurrectionCounter)
t.logger.Warn("Encounter resurrected timer, skip",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowTimerID(timerInfo.TimerID),
tag.WorkflowScheduleID(timerInfo.StartedID), // timerStartedEvent is basically scheduled event
)
// remove resurrected timer from mutable state
if err := mutableState.DeleteUserTimer(timerInfo.TimerID); err != nil {
return err
}
updateMutableState = true
continue Loop
}
}
if _, err := mutableState.AddTimerFiredEvent(timerInfo.TimerID); err != nil {
return err
}
updateMutableState = true
debugLog("User timer fired",
tag.WorkflowDomainID(task.DomainID),
tag.WorkflowID(task.WorkflowID),
tag.WorkflowRunID(task.RunID),
tag.TaskType(task.TaskType),
tag.TaskID(task.TaskID),
tag.WorkflowTimerID(timerInfo.TimerID),
tag.WorkflowScheduleID(timerInfo.StartedID),
tag.WorkflowNextEventID(mutableState.GetNextEventID()),
)
}
if !updateMutableState {
return nil
}
return t.updateWorkflowExecution(ctx, wfContext, mutableState, updateMutableState)
}