in service/history/execution/mutable_state_builder.go [702:779]
func (e *mutableStateBuilder) assignEventIDToBufferedEvents() {
newCommittedEvents := e.hBuilder.history
scheduledIDToStartedID := make(map[int64]int64)
for _, event := range newCommittedEvents {
if event.ID != common.BufferedEventID {
continue
}
eventID := e.executionInfo.NextEventID
event.ID = eventID
e.executionInfo.IncreaseNextEventID()
switch event.GetEventType() {
case types.EventTypeActivityTaskStarted:
attributes := event.ActivityTaskStartedEventAttributes
scheduledID := attributes.GetScheduledEventID()
scheduledIDToStartedID[scheduledID] = eventID
if ai, ok := e.GetActivityInfo(scheduledID); ok {
ai.StartedID = eventID
e.updateActivityInfos[ai.ScheduleID] = ai
}
case types.EventTypeChildWorkflowExecutionStarted:
attributes := event.ChildWorkflowExecutionStartedEventAttributes
initiatedID := attributes.GetInitiatedEventID()
scheduledIDToStartedID[initiatedID] = eventID
if ci, ok := e.GetChildExecutionInfo(initiatedID); ok {
ci.StartedID = eventID
e.updateChildExecutionInfos[ci.InitiatedID] = ci
}
case types.EventTypeActivityTaskCompleted:
attributes := event.ActivityTaskCompletedEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeActivityTaskFailed:
attributes := event.ActivityTaskFailedEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeActivityTaskTimedOut:
attributes := event.ActivityTaskTimedOutEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeActivityTaskCanceled:
attributes := event.ActivityTaskCanceledEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeChildWorkflowExecutionCompleted:
attributes := event.ChildWorkflowExecutionCompletedEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeChildWorkflowExecutionFailed:
attributes := event.ChildWorkflowExecutionFailedEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeChildWorkflowExecutionTimedOut:
attributes := event.ChildWorkflowExecutionTimedOutEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeChildWorkflowExecutionCanceled:
attributes := event.ChildWorkflowExecutionCanceledEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok {
attributes.StartedEventID = startedID
}
case types.EventTypeChildWorkflowExecutionTerminated:
attributes := event.ChildWorkflowExecutionTerminatedEventAttributes
if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok {
attributes.StartedEventID = startedID
}
}
}
}