in service/history/task/timer_active_task_executor.go [258:439]
// executeActivityTimeoutTask processes an activity timer task for an active workflow.
//
// It loads the workflow's mutable state and walks all activity timers in sorted
// order, handling every expired timer until it reaches the first one that has not
// expired. For each expired timer whose activity is still current it either:
//   - deletes the activity if a history scan shows it was resurrected (the scan is
//     triggered by a timer late by at least ResurrectionCheckMinDelay and then reused),
//   - schedules a retry via RetryActivity, or
//   - records an ActivityTaskTimedOut event and requests a new decision.
//
// The mutable state is persisted only when something was changed.
//
// Returns errWorkflowBusy when acquiring the workflow execution fails with
// context.DeadlineExceeded, and nil without changes when the mutable state is nil
// or the workflow is no longer running.
func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
	ctx context.Context,
	task *persistence.TimerTaskInfo,
) (retError error) {
	wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
		task.DomainID,
		getWorkflowExecution(task),
		taskGetExecutionContextTimeout,
	)
	if err != nil {
		if err == context.DeadlineExceeded {
			return errWorkflowBusy
		}
		return err
	}
	// release is handed the method's final error via the named result retError.
	defer func() { release(retError) }()

	domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
	if err != nil {
		// %w keeps the underlying domain cache error inspectable with errors.Is/As;
		// the rendered message is the same as with %v.
		return fmt.Errorf("unable to find domainID: %v, err: %w", task.DomainID, err)
	}

	mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
	if err != nil {
		return err
	}
	if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
		return nil
	}

	wfType := mutableState.GetWorkflowType()
	if wfType == nil {
		return fmt.Errorf("unable to find workflow type, task %s", task)
	}

	timerSequence := execution.NewTimerSequence(mutableState)
	referenceTime := t.shard.GetTimeSource().Now()
	// Domain name taken from the mutable state's domain entry; used for the
	// resurrection dynamic config and the resurrection metric below.
	mutableStateDomainName := mutableState.GetDomainEntry().GetInfo().Name
	resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableStateDomainName)
	updateMutableState := false
	scheduleDecision := false

	// resurrectedActivity is initialized when an activity timer with
	// delay >= resurrectionCheckMinDelay is encountered, so that history is scanned
	// at most once even when there are multiple timers with high delay. Once it is
	// non-nil, every later expired timer is checked against it as well.
	var resurrectedActivity map[int64]struct{}
	// The history scan may take a long time, so it runs under its own timeout that
	// is detached from ctx. Note that this deadline starts counting here.
	scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
	defer cancel()

	// need to clear activity heartbeat timer task mask for new activity timer task creation
	// NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible
	// one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
	// for updating workflow execution. In that case, only one new heartbeat timeout task should be
	// created.
	isHeartBeatTask := task.TimeoutType == int(types.TimeoutTypeHeartbeat)
	activityInfo, ok := mutableState.GetActivityInfo(task.EventID)
	if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibilityInSeconds <= task.VisibilityTimestamp.Unix() {
		activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ execution.TimerTaskStatusCreatedHeartbeat
		if err := mutableState.UpdateActivity(activityInfo); err != nil {
			return err
		}
		updateMutableState = true
	}

Loop:
	for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
		activityInfo, ok := mutableState.GetActivityInfo(timerSequenceID.EventID)
		if !ok || timerSequenceID.Attempt < activityInfo.Attempt {
			// handle 3 cases:
			// 1. !ok
			//    this case can happen since each activity can have 4 timers
			//    and one of those 4 timers may have fired in this loop
			// 2. timerSequenceID.attempt < activityInfo.Attempt
			//    retry could update activity attempt, should not time out the new attempt
			// 3. it's a resurrected activity and has already been deleted in this loop
			//    (also surfaces as !ok)
			continue Loop
		}

		delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
		if !expired {
			// timer sequence IDs are sorted, so once one timer sequence ID has not
			// expired, none of the ones after it have expired either
			break Loop
		}

		if delay >= resurrectionCheckMinDelay || resurrectedActivity != nil {
			if resurrectedActivity == nil {
				// overwrite the context here as scan history may take a long time to complete
				// ctx will also be used by other operations like updateWorkflow
				ctx = scanWorkflowCtx
				resurrectedActivity, err = execution.GetResurrectedActivities(ctx, t.shard, mutableState)
				if err != nil {
					t.logger.Error("Activity resurrection check failed", tag.Error(err))
					return err
				}
			}

			if _, resurrected := resurrectedActivity[activityInfo.ScheduleID]; resurrected {
				// found activity resurrection
				t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(mutableStateDomainName)).IncCounter(metrics.ActivityResurrectionCounter)
				t.logger.Warn("Encounter resurrected activity, skip",
					tag.WorkflowDomainID(task.DomainID),
					tag.WorkflowID(task.WorkflowID),
					tag.WorkflowRunID(task.RunID),
					tag.TaskType(task.TaskType),
					tag.TaskID(task.TaskID),
					tag.WorkflowActivityID(activityInfo.ActivityID),
					tag.WorkflowScheduleID(activityInfo.ScheduleID),
				)
				// remove resurrected activity from mutable state
				if err := mutableState.DeleteActivity(activityInfo.ScheduleID); err != nil {
					return err
				}
				updateMutableState = true
				continue Loop
			}
		}

		// check if it's possible that the timeout is due to activity task lost
		if timerSequenceID.TimerType == execution.TimerTypeScheduleToStart {
			// A failed domain name lookup only skips this metric/log; it does not fail the task.
			lostCheckDomainName, lookupErr := t.shard.GetDomainCache().GetDomainName(mutableState.GetExecutionInfo().DomainID)
			if lookupErr == nil && activityInfo.ScheduleToStartTimeout >= int32(t.config.ActivityMaxScheduleToStartTimeoutForRetry(lostCheckDomainName).Seconds()) {
				// note that we ignore the race condition for the dynamic config value change here as it's only for metric and logging purpose.
				// theoretically the check only applies to activities with retry policy
				// however for activities without retry policy, we also want to check the potential task lost and emit the metric
				// so reuse the same config value as a threshold so that the metric only got emitted if the activity has been started after a long time.
				t.metricsClient.Scope(metrics.TimerActiveTaskActivityTimeoutScope, metrics.DomainTag(lostCheckDomainName)).IncCounter(metrics.ActivityLostCounter)
				t.logger.Warn("Potentially activity task lost",
					tag.WorkflowDomainName(lostCheckDomainName),
					tag.WorkflowID(task.WorkflowID),
					tag.WorkflowRunID(task.RunID),
					tag.WorkflowScheduleID(activityInfo.ScheduleID),
				)
			}
		}

		// If the activity can be retried, the retry replaces the timeout event.
		retried, retryErr := mutableState.RetryActivity(
			activityInfo,
			execution.TimerTypeToReason(timerSequenceID.TimerType),
			nil,
		)
		if retryErr != nil {
			return retryErr
		}
		if retried {
			updateMutableState = true
			continue Loop
		}

		t.emitTimeoutMetricScopeWithDomainTag(
			mutableState.GetExecutionInfo().DomainID,
			metrics.TimerActiveTaskActivityTimeoutScope,
			timerSequenceID.TimerType,
			metrics.WorkflowTypeTag(wfType.GetName()),
		)
		t.logger.Info("Activity timed out",
			tag.WorkflowDomainName(domainName),
			tag.WorkflowDomainID(task.GetDomainID()),
			tag.WorkflowID(task.GetWorkflowID()),
			tag.WorkflowRunID(task.GetRunID()),
			// a single timer task can time out several activities, so identify which one
			tag.WorkflowActivityID(activityInfo.ActivityID),
			tag.WorkflowScheduleID(activityInfo.ScheduleID),
			tag.ScheduleAttempt(task.ScheduleAttempt),
			tag.FailoverVersion(task.GetVersion()),
		)
		if _, err := mutableState.AddActivityTaskTimedOutEvent(
			activityInfo.ScheduleID,
			activityInfo.StartedID,
			execution.TimerTypeToInternal(timerSequenceID.TimerType),
			activityInfo.Details,
		); err != nil {
			return err
		}
		updateMutableState = true
		scheduleDecision = true
	}

	if !updateMutableState {
		return nil
	}
	// ctx may have been replaced by scanWorkflowCtx above if a history scan ran.
	return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}