func()

in service/history/task/timer_active_task_executor.go [258:439]


// executeActivityTimeoutTask processes an activity timeout timer task.
//
// It acquires the workflow execution context, loads the mutable state, and
// then walks the activity timers in sorted order. For every expired timer it
// does one of the following:
//   - removes the activity from mutable state if it is found in the set of
//     resurrected activities,
//   - lets mutableState.RetryActivity handle it, or
//   - records an activity-timed-out event.
//
// The workflow execution is updated only if mutable state was changed. The
// scheduleDecision flag passed to updateWorkflowExecution is set only when at
// least one timed-out event was added.
//
// It returns errWorkflowBusy when acquiring the execution context fails with
// context.DeadlineExceeded, nil when there is no mutable state, the workflow
// is no longer running, or nothing needed to change, and otherwise the error
// of whichever step failed.
//
// retError is a named result so that the deferred release call receives the
// error this function finally returns.
func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(
	ctx context.Context,
	task *persistence.TimerTaskInfo,
) (retError error) {

	// Acquire the workflow execution context, waiting at most
	// taskGetExecutionContextTimeout.
	wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
		task.DomainID,
		getWorkflowExecution(task),
		taskGetExecutionContextTimeout,
	)
	if err != nil {
		// a deadline-exceeded error here is reported to the caller as errWorkflowBusy
		if err == context.DeadlineExceeded {
			return errWorkflowBusy
		}
		return err
	}
	// release is handed the final return value of this function
	defer func() { release(retError) }()

	// domainName resolved here is used by the "Activity timed out" log below;
	// the resurrection and task-lost branches in the loop resolve their own
	// (shadowing) copy.
	domainName, err := t.shard.GetDomainCache().GetDomainName(task.DomainID)
	if err != nil {
		return fmt.Errorf("unable to find domainID: %v, err: %v", task.DomainID, err)
	}

	mutableState, err := loadMutableStateForTimerTask(ctx, wfContext, task, t.metricsClient, t.logger)
	if err != nil {
		return err
	}
	// nothing to do when there is no mutable state or the workflow is not running
	if mutableState == nil || !mutableState.IsWorkflowExecutionRunning() {
		return nil
	}

	// wfType is only used as a metric tag when an activity times out
	wfType := mutableState.GetWorkflowType()
	if wfType == nil {
		return fmt.Errorf("unable to find workflow type, task %s", task)
	}

	timerSequence := execution.NewTimerSequence(mutableState)
	// a single "now" snapshot is used for every expiry check in the loop below
	referenceTime := t.shard.GetTimeSource().Now()
	resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
	// updateMutableState: mutable state was modified and must be persisted.
	// scheduleDecision: at least one timed-out event was added.
	updateMutableState := false
	scheduleDecision := false

	// initialized when an activity timer with delay >= resurrectionCheckMinDelay
	// is encountered, so that we don't need to scan history multiple times
	// when there are multiple timers with high delay
	var resurrectedActivity map[int64]struct{}
	// scanWorkflowCtx is derived from context.Background(), not from ctx, so its
	// only deadline is scanWorkflowTimeout.
	scanWorkflowCtx, cancel := context.WithTimeout(context.Background(), scanWorkflowTimeout)
	defer cancel()

	// need to clear activity heartbeat timer task mask for new activity timer task creation
	// NOTE: LastHeartbeatTimeoutVisibilityInSeconds is for deduping heartbeat timer creation as it's possible
	// one heartbeat task was persisted multiple times with different taskIDs due to the retry logic
	// for updating workflow execution. In that case, only one new heartbeat timeout task should be
	// created.
	isHeartBeatTask := task.TimeoutType == int(types.TimeoutTypeHeartbeat)
	activityInfo, ok := mutableState.GetActivityInfo(task.EventID)
	if isHeartBeatTask && ok && activityInfo.LastHeartbeatTimeoutVisibilityInSeconds <= task.VisibilityTimestamp.Unix() {
		// &^ clears the TimerTaskStatusCreatedHeartbeat bit and leaves the other bits untouched
		activityInfo.TimerTaskStatus = activityInfo.TimerTaskStatus &^ execution.TimerTaskStatusCreatedHeartbeat
		if err := mutableState.UpdateActivity(activityInfo); err != nil {
			return err
		}
		updateMutableState = true
	}

Loop:
	for _, timerSequenceID := range timerSequence.LoadAndSortActivityTimers() {
		// NOTE: activityInfo and ok declared here shadow the ones declared above the loop
		activityInfo, ok := mutableState.GetActivityInfo(timerSequenceID.EventID)
		if !ok || timerSequenceID.Attempt < activityInfo.Attempt {
			// handle 3 cases:
			// 1. !ok
			//  this case can happen since each activity can have 4 timers
			//  and one of those 4 timers may have fired in this loop
			// 2. timerSequenceID.attempt < activityInfo.Attempt
			//  retry could update activity attempt, should not time out the new attempt
			// 3. it's a resurrected activity and has already been deleted in this loop
			//  (presumably this also shows up as !ok after DeleteActivity below)
			continue Loop
		}

		delay, expired := timerSequence.IsExpired(referenceTime, timerSequenceID)
		if !expired {
			// timer sequence IDs are sorted, once there is one timer
			// sequence ID not expired, all after that will not have expired either
			break Loop
		}

		// The resurrection check is triggered by the first timer whose delay reaches
		// resurrectionCheckMinDelay. Once resurrectedActivity has been loaded, every
		// later expired timer is checked against it as well, regardless of its delay.
		if delay >= resurrectionCheckMinDelay || resurrectedActivity != nil {
			if resurrectedActivity == nil {
				// overwrite the context here as scan history may take a long time to complete
				// ctx will also be used by other operations like updateWorkflow
				// NOTE: from this point on ctx is scanWorkflowCtx, including for the
				// updateWorkflowExecution call at the end of this function.
				ctx = scanWorkflowCtx
				resurrectedActivity, err = execution.GetResurrectedActivities(ctx, t.shard, mutableState)
				if err != nil {
					t.logger.Error("Activity resurrection check failed", tag.Error(err))
					return err
				}
			}

			if _, ok := resurrectedActivity[activityInfo.ScheduleID]; ok {
				// found activity resurrection
				// NOTE: this domainName shadows the one resolved from the domain cache above
				domainName := mutableState.GetDomainEntry().GetInfo().Name
				t.metricsClient.Scope(metrics.TimerQueueProcessorScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityResurrectionCounter)
				t.logger.Warn("Encounter resurrected activity, skip",
					tag.WorkflowDomainID(task.DomainID),
					tag.WorkflowID(task.WorkflowID),
					tag.WorkflowRunID(task.RunID),
					tag.TaskType(task.TaskType),
					tag.TaskID(task.TaskID),
					tag.WorkflowActivityID(activityInfo.ActivityID),
					tag.WorkflowScheduleID(activityInfo.ScheduleID),
				)

				// remove resurrected activity from mutable state
				if err := mutableState.DeleteActivity(activityInfo.ScheduleID); err != nil {
					return err
				}
				// no retry and no timed-out event for a resurrected activity
				updateMutableState = true
				continue Loop
			}
		}

		// check if it's possible that the timeout is due to activity task lost
		if timerSequenceID.TimerType == execution.TimerTypeScheduleToStart {
			// NOTE: domainName and err are shadowed here. A failed lookup is not
			// returned; it only causes the metric and log below to be skipped.
			domainName, err := t.shard.GetDomainCache().GetDomainName(mutableState.GetExecutionInfo().DomainID)
			if err == nil && activityInfo.ScheduleToStartTimeout >= int32(t.config.ActivityMaxScheduleToStartTimeoutForRetry(domainName).Seconds()) {
				// note that we ignore the race condition for the dynamic config value change here as it's only for metric and logging purpose.
				// theoretically the check only applies to activities with retry policy
				// however for activities without retry policy, we also want to check the potential task lost and emit the metric
				// so reuse the same config value as a threshold, so that the metric is only emitted for activities
				// whose schedule-to-start timeout is at least that threshold.
				t.metricsClient.Scope(metrics.TimerActiveTaskActivityTimeoutScope, metrics.DomainTag(domainName)).IncCounter(metrics.ActivityLostCounter)
				t.logger.Warn("Potentially activity task lost",
					tag.WorkflowDomainName(domainName),
					tag.WorkflowID(task.WorkflowID),
					tag.WorkflowRunID(task.RunID),
					tag.WorkflowScheduleID(activityInfo.ScheduleID),
				)
			}
		}

		// Give the activity a chance to retry first. When RetryActivity reports true,
		// mutable state has to be persisted and no timed-out event is recorded for
		// this timer.
		if ok, err := mutableState.RetryActivity(
			activityInfo,
			execution.TimerTypeToReason(timerSequenceID.TimerType),
			nil,
		); err != nil {
			return err
		} else if ok {
			updateMutableState = true
			continue Loop
		}

		// No retry: the activity is timed out. Emit the metric, log, and add the event.
		t.emitTimeoutMetricScopeWithDomainTag(
			mutableState.GetExecutionInfo().DomainID,
			metrics.TimerActiveTaskActivityTimeoutScope,
			timerSequenceID.TimerType,
			metrics.WorkflowTypeTag(wfType.GetName()),
		)

		// NOTE: the attempt and version logged here are read from the timer task being
		// executed, not from the activityInfo of the timer handled in this iteration.
		t.logger.Info("Activity timed out",
			tag.WorkflowDomainName(domainName),
			tag.WorkflowDomainID(task.GetDomainID()),
			tag.WorkflowID(task.GetWorkflowID()),
			tag.WorkflowRunID(task.GetRunID()),
			tag.ScheduleAttempt(task.ScheduleAttempt),
			tag.FailoverVersion(task.GetVersion()),
		)

		if _, err := mutableState.AddActivityTaskTimedOutEvent(
			activityInfo.ScheduleID,
			activityInfo.StartedID,
			execution.TimerTypeToInternal(timerSequenceID.TimerType),
			activityInfo.Details,
		); err != nil {
			return err
		}
		// a timed-out event was added: persist the change and request a decision
		updateMutableState = true
		scheduleDecision = true
	}

	// skip the update entirely when nothing in mutable state changed
	if !updateMutableState {
		return nil
	}
	return t.updateWorkflowExecution(ctx, wfContext, mutableState, scheduleDecision)
}