func()

in service/history/task/timer_active_task_executor.go [113:256]


func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(
	ctx context.Context,
	task *persistence.TimerTaskInfo,
) (retError error) {
	t.logger.Debug("Processing user timer",
		tag.WorkflowDomainID(task.DomainID),
		tag.WorkflowID(task.WorkflowID),
		tag.WorkflowRunID(task.RunID),
		tag.TaskID(task.TaskID),
	)
	wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
		task.DomainID,
		getWorkflowExecution(task),
		taskGetExecutionContextTimeout,
	)
	if err != nil {
		if err == context.DeadlineExceeded {
			return errWorkflowBusy
		}
		return err
	}
	defer func() { release(retError) }()

	mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
	if err != nil {
		return err
	}
	if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
		return nil
	}

	timerSequence := execution.NewTimerSequence(mutableState)
	referenceTime := t.shard.GetTimeSource().Now()
	resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
	updateMutableState := false
	debugLog := t.logger.Debug
	if t.config.EnableDebugMode && t.config.EnableTimerDebugLogByDomainID(task.DomainID) {
		debugLog = t.logger.Info
	}

	// initialized when a timer with delay >= resurrectionCheckMinDelay
	// is encountered, so that we don't need to scan history multiple times
	// where there're multiple timers with high delay
	var resurrectedTimer map[string]struct{}
	scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
	defer cancel()

	sortedUserTimers := timerSequence.LoadAndSortUserTimers()
	debugLog("Sorted user timers",
		tag.WorkflowDomainID(task.DomainID),
		tag.WorkflowID(task.WorkflowID),
		tag.WorkflowRunID(task.RunID),
		tag.Counter(len(sortedUserTimers)),
	)

Loop:
	for _, timerSequenceID := range sortedUserTimers {
		timerInfo, ok := mutableState.GetUserTimerInfoByEventID(timerSequenceID.EventID)
		if !ok {
			errString := fmt.Sprintf("failed to find in user timer event ID: %v", timerSequenceID.EventID)
			t.logger.Error(errString)
			return &types.InternalServiceError{Message: errString}
		}

		delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
		debugLog("Processing user timer sequence id",
			tag.WorkflowDomainID(task.DomainID),
			tag.WorkflowID(task.WorkflowID),
			tag.WorkflowRunID(task.RunID),
			tag.TaskType(task.TaskType),
			tag.TaskID(task.TaskID),
			tag.WorkflowTimerID(timerInfo.TimerID),
			tag.WorkflowScheduleID(timerInfo.StartedID),
			tag.Dynamic("timer-sequence-id", timerSequenceID),
			tag.Dynamic("timer-info", timerInfo),
			tag.Dynamic("delay", delay),
			tag.Dynamic("expired", expired),
		)

		if !expired {
			// timer sequence IDs are sorted, once there is one timer
			// sequence ID not expired, all after that wil not expired
			break Loop
		}

		if delay >= resurrectionCheckMinDelay || resurrectedTimer != nil {
			if resurrectedTimer == nil {
				// overwrite the context here as scan history may take a long time to complete
				// ctx will also be used by other operations like updateWorkflow
				ctx = scanWorkflowCtx
				resurrectedTimer, err = execution.GetResurrectedTimers(ctx, t.shard, mutableState)
				if err != nil {
					t.logger.Error("Timer resurrection check failed", tag.Error(err))
					return err
				}
			}

			if _, ok := resurrectedTimer[timerInfo.TimerID]; ok {
				// found timer resurrection
				domainName := mutableState.GetDomainEntry().GetInfo().Name
				t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.TimerResurrectionCounter)
				t.logger.Warn("Encounter resurrected timer, skip",
					tag.WorkflowDomainID(task.DomainID),
					tag.WorkflowID(task.WorkflowID),
					tag.WorkflowRunID(task.RunID),
					tag.TaskType(task.TaskType),
					tag.TaskID(task.TaskID),
					tag.WorkflowTimerID(timerInfo.TimerID),
					tag.WorkflowScheduleID(timerInfo.StartedID), // timerStartedEvent is basically scheduled event
				)

				// remove resurrected timer from mutable state
				if err := mutableState.DeleteUserTimer(timerInfo.TimerID); err != nil {
					return err
				}
				updateMutableState = true
				continue Loop
			}
		}

		if _, err := mutableState.AddTimerFiredEvent(timerInfo.TimerID); err != nil {
			return err
		}
		updateMutableState = true

		debugLog("User timer fired",
			tag.WorkflowDomainID(task.DomainID),
			tag.WorkflowID(task.WorkflowID),
			tag.WorkflowRunID(task.RunID),
			tag.TaskType(task.TaskType),
			tag.TaskID(task.TaskID),
			tag.WorkflowTimerID(timerInfo.TimerID),
			tag.WorkflowScheduleID(timerInfo.StartedID),
			tag.WorkflowNextEventID(mutableState.GetNextEventID()),
		)

	}

	if !updateMutableState {
		return nil
	}

	return t.updateWorkflowExecution(ctx, wfContext, mutableState, updateMutableState)
}