in service/history/execution/state_builder.go [85:519]
// ApplyEvents applies a non-empty batch of history events to b.mutableState,
// dispatching each event by its type to the matching Replicate* method.
//
// For every event, before the type-specific dispatch, it updates the current
// version on the mutable state, adds or updates the (event.ID, event.Version)
// item in the current version history, and stores event.TaskID as the
// execution info's LastEventTaskID.
//
// After all events are applied, the execution info's LastFirstEventID is set
// to the ID of the first event in the batch, NextEventID is set to the ID of
// the last event plus one, and the history builder is replaced by one built
// from history.
//
// Parameters:
//   - domainID: forwarded to the continued-as-new replication call and to the
//     nested ApplyEvents call made for the new run.
//   - requestID: forwarded to ReplicateWorkflowExecutionStartedEvent.
//   - workflowExecution: forwarded to ReplicateWorkflowExecutionStartedEvent;
//     its run ID is passed to SetHistoryTree and its WorkflowID is reused for
//     the new run on continue-as-new.
//   - history: the events to apply; must contain at least one event.
//   - newRunHistory: events of the run started by a continued-as-new event;
//     may be empty, in which case no new-run mutable state is built.
//
// Returns the mutable state built for the new run when a
// WorkflowExecutionContinuedAsNew event is applied and newRunHistory is
// non-empty; otherwise the returned MutableState is nil.
//
// Errors: an internal failure error when history is empty,
// ErrMissingVersionHistories when the mutable state has no version histories,
// a *types.BadRequestError for an unrecognized event type, and otherwise the
// error returned by the failing mutable state / version history / domain cache
// call, unchanged. On error the returned MutableState is nil; this function
// does not undo mutations already made to b.mutableState by earlier events.
func (b *stateBuilderImpl) ApplyEvents(
domainID string,
requestID string,
workflowExecution types.WorkflowExecution,
history []*types.HistoryEvent,
newRunHistory []*types.HistoryEvent,
) (MutableState, error) {
// An empty batch is rejected up front; history[0] and history[len-1] are
// indexed right below.
if len(history) == 0 {
return nil, errors.NewInternalFailureError(errMessageHistorySizeZero)
}
firstEvent := history[0]
lastEvent := history[len(history)-1]
// Stays nil unless a continued-as-new event is applied with a non-empty
// newRunHistory.
var newRunMutableStateBuilder MutableState
// need to clear the stickiness since workflow turned to passive
b.mutableState.ClearStickyness()
for _, event := range history {
// Per-event bookkeeping performed before the type-specific dispatch.
// NOTE: stateBuilder is also being used in the active side
if err := b.mutableState.UpdateCurrentVersion(event.Version, true); err != nil {
return nil, err
}
versionHistories := b.mutableState.GetVersionHistories()
if versionHistories == nil {
return nil, ErrMissingVersionHistories
}
versionHistory, err := versionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
}
// Record this event's ID/version pair in the current version history.
if err := versionHistory.AddOrUpdateItem(persistence.NewVersionHistoryItem(
event.ID,
event.Version,
)); err != nil {
return nil, err
}
// Track the task ID of the most recently applied event.
b.mutableState.GetExecutionInfo().LastEventTaskID = event.TaskID
// Type-specific dispatch. Several cases pass firstEvent.ID, i.e. the ID of
// the first event of this batch, alongside the event itself.
switch event.GetEventType() {
case types.EventTypeWorkflowExecutionStarted:
attributes := event.WorkflowExecutionStartedEventAttributes
// Remains nil when the event carries neither a parent domain ID nor a
// parent domain name.
var parentDomainID *string
// If ParentWorkflowDomainID is present use it, otherwise fallback to ParentWorkflowDomain
// as ParentWorkflowDomainID will not be present on older histories.
if attributes.ParentWorkflowDomainID != nil {
parentDomainID = attributes.ParentWorkflowDomainID
} else if attributes.GetParentWorkflowDomain() != "" {
// Resolve the parent domain name to its ID through the domain cache.
parentDomainEntry, err := b.domainCache.GetDomain(
attributes.GetParentWorkflowDomain(),
)
if err != nil {
return nil, err
}
parentDomainID = &parentDomainEntry.GetInfo().ID
}
if err := b.mutableState.ReplicateWorkflowExecutionStartedEvent(
parentDomainID,
workflowExecution,
requestID,
event,
true,
); err != nil {
return nil, err
}
if err := b.mutableState.SetHistoryTree(
workflowExecution.GetRunID(),
); err != nil {
return nil, err
}
case types.EventTypeDecisionTaskScheduled:
attributes := event.DecisionTaskScheduledEventAttributes
// use event.GetTimestamp() as DecisionOriginalScheduledTimestamp, because the heartbeat is not happening here.
// Only the error is checked; the first return value is discarded.
_, err := b.mutableState.ReplicateDecisionTaskScheduledEvent(
event.Version,
event.ID,
attributes.TaskList.GetName(),
attributes.GetStartToCloseTimeoutSeconds(),
attributes.GetAttempt(),
event.GetTimestamp(),
event.GetTimestamp(),
false,
)
if err != nil {
return nil, err
}
case types.EventTypeDecisionTaskStarted:
attributes := event.DecisionTaskStartedEventAttributes
// Only the error is checked; the first return value is discarded.
_, err := b.mutableState.ReplicateDecisionTaskStartedEvent(
nil,
event.Version,
attributes.GetScheduledEventID(),
event.ID,
attributes.GetRequestID(),
event.GetTimestamp(),
)
if err != nil {
return nil, err
}
case types.EventTypeDecisionTaskCompleted:
if err := b.mutableState.ReplicateDecisionTaskCompletedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeDecisionTaskTimedOut:
if err := b.mutableState.ReplicateDecisionTaskTimedOutEvent(
event.DecisionTaskTimedOutEventAttributes.GetTimeoutType(),
); err != nil {
return nil, err
}
// this is for transient decision
err := b.mutableState.ReplicateTransientDecisionTaskScheduled()
if err != nil {
return nil, err
}
case types.EventTypeDecisionTaskFailed:
if err := b.mutableState.ReplicateDecisionTaskFailedEvent(); err != nil {
return nil, err
}
// this is for transient decision
err := b.mutableState.ReplicateTransientDecisionTaskScheduled()
if err != nil {
return nil, err
}
case types.EventTypeActivityTaskScheduled:
if _, err := b.mutableState.ReplicateActivityTaskScheduledEvent(
firstEvent.ID,
event,
false,
); err != nil {
return nil, err
}
case types.EventTypeActivityTaskStarted:
if err := b.mutableState.ReplicateActivityTaskStartedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeActivityTaskCompleted:
if err := b.mutableState.ReplicateActivityTaskCompletedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeActivityTaskFailed:
if err := b.mutableState.ReplicateActivityTaskFailedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeActivityTaskTimedOut:
if err := b.mutableState.ReplicateActivityTaskTimedOutEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeActivityTaskCancelRequested:
if err := b.mutableState.ReplicateActivityTaskCancelRequestedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeActivityTaskCanceled:
if err := b.mutableState.ReplicateActivityTaskCanceledEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeRequestCancelActivityTaskFailed:
// No mutable state action is needed
case types.EventTypeTimerStarted:
if _, err := b.mutableState.ReplicateTimerStartedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeTimerFired:
if err := b.mutableState.ReplicateTimerFiredEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeTimerCanceled:
if err := b.mutableState.ReplicateTimerCanceledEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeCancelTimerFailed:
// no mutable state action is needed
case types.EventTypeStartChildWorkflowExecutionInitiated:
if _, err := b.mutableState.ReplicateStartChildWorkflowExecutionInitiatedEvent(
firstEvent.ID,
event,
// create a new request ID which is used by transfer queue processor
// if domain is failed over at this point
uuid.New(),
); err != nil {
return nil, err
}
case types.EventTypeStartChildWorkflowExecutionFailed:
if err := b.mutableState.ReplicateStartChildWorkflowExecutionFailedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeChildWorkflowExecutionStarted:
if err := b.mutableState.ReplicateChildWorkflowExecutionStartedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeChildWorkflowExecutionCompleted:
if err := b.mutableState.ReplicateChildWorkflowExecutionCompletedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeChildWorkflowExecutionFailed:
if err := b.mutableState.ReplicateChildWorkflowExecutionFailedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeChildWorkflowExecutionCanceled:
if err := b.mutableState.ReplicateChildWorkflowExecutionCanceledEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeChildWorkflowExecutionTimedOut:
if err := b.mutableState.ReplicateChildWorkflowExecutionTimedOutEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeChildWorkflowExecutionTerminated:
if err := b.mutableState.ReplicateChildWorkflowExecutionTerminatedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeRequestCancelExternalWorkflowExecutionInitiated:
if _, err := b.mutableState.ReplicateRequestCancelExternalWorkflowExecutionInitiatedEvent(
firstEvent.ID,
event,
// create a new request ID which is used by transfer queue processor
// if domain is failed over at this point
uuid.New(),
); err != nil {
return nil, err
}
case types.EventTypeRequestCancelExternalWorkflowExecutionFailed:
if err := b.mutableState.ReplicateRequestCancelExternalWorkflowExecutionFailedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeExternalWorkflowExecutionCancelRequested:
if err := b.mutableState.ReplicateExternalWorkflowExecutionCancelRequested(
event,
); err != nil {
return nil, err
}
case types.EventTypeSignalExternalWorkflowExecutionInitiated:
// Create a new request ID which is used by transfer queue processor if domain is failed over at this point
signalRequestID := uuid.New()
if _, err := b.mutableState.ReplicateSignalExternalWorkflowExecutionInitiatedEvent(
firstEvent.ID,
event,
signalRequestID,
); err != nil {
return nil, err
}
case types.EventTypeSignalExternalWorkflowExecutionFailed:
if err := b.mutableState.ReplicateSignalExternalWorkflowExecutionFailedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeExternalWorkflowExecutionSignaled:
if err := b.mutableState.ReplicateExternalWorkflowExecutionSignaled(
event,
); err != nil {
return nil, err
}
case types.EventTypeMarkerRecorded:
// No mutable state action is needed
case types.EventTypeWorkflowExecutionSignaled:
if err := b.mutableState.ReplicateWorkflowExecutionSignaled(
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionCancelRequested:
if err := b.mutableState.ReplicateWorkflowExecutionCancelRequestedEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeUpsertWorkflowSearchAttributes:
if err := b.mutableState.ReplicateUpsertWorkflowSearchAttributesEvent(
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionCompleted:
if err := b.mutableState.ReplicateWorkflowExecutionCompletedEvent(
firstEvent.ID,
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionFailed:
if err := b.mutableState.ReplicateWorkflowExecutionFailedEvent(
firstEvent.ID,
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionTimedOut:
if err := b.mutableState.ReplicateWorkflowExecutionTimedoutEvent(
firstEvent.ID,
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionCanceled:
if err := b.mutableState.ReplicateWorkflowExecutionCanceledEvent(
firstEvent.ID,
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionTerminated:
if err := b.mutableState.ReplicateWorkflowExecutionTerminatedEvent(
firstEvent.ID,
event,
); err != nil {
return nil, err
}
case types.EventTypeWorkflowExecutionContinuedAsNew:
// The length of newRunHistory can be zero in resend case
if len(newRunHistory) != 0 {
// Build a separate mutable state for the new run, sharing this
// builder's shard, logger and domain entry.
newRunMutableStateBuilder = NewMutableStateBuilderWithVersionHistories(
b.shard,
b.logger,
b.mutableState.GetDomainEntry(),
)
newRunStateBuilder := NewStateBuilder(b.shard, b.logger, newRunMutableStateBuilder)
// The new run keeps the workflow ID and takes the run ID named by the
// continued-as-new event.
newRunID := event.WorkflowExecutionContinuedAsNewEventAttributes.GetNewExecutionRunID()
newExecution := types.WorkflowExecution{
WorkflowID: workflowExecution.WorkflowID,
RunID: newRunID,
}
// Apply the new run's events to its own mutable state, using a freshly
// generated request ID and no further new-run history. Only the error
// is checked; the nested call's MutableState result is discarded.
_, err := newRunStateBuilder.ApplyEvents(
domainID,
uuid.New(),
newExecution,
newRunHistory,
nil,
)
if err != nil {
return nil, err
}
}
// Applied to the current run whether or not a new-run state was built.
err := b.mutableState.ReplicateWorkflowExecutionContinuedAsNewEvent(
firstEvent.ID,
domainID,
event,
)
if err != nil {
return nil, err
}
default:
// Any event type without a case above aborts the whole batch.
return nil, &types.BadRequestError{Message: "Unknown event type"}
}
}
// Batch-level bookkeeping: remember where this batch started, where the next
// event ID continues, and install a history builder built from the batch.
b.mutableState.GetExecutionInfo().SetLastFirstEventID(firstEvent.ID)
b.mutableState.GetExecutionInfo().SetNextEventID(lastEvent.ID + 1)
b.mutableState.SetHistoryBuilder(NewHistoryBuilderFromEvents(history))
return newRunMutableStateBuilder, nil
}