func()

in service/history/execution/state_builder.go [85:519]


func (b *stateBuilderImpl) ApplyEvents(
	domainID string,
	requestID string,
	workflowExecution types.WorkflowExecution,
	history []*types.HistoryEvent,
	newRunHistory []*types.HistoryEvent,
) (MutableState, error) {

	if len(history) == 0 {
		return nil, errors.NewInternalFailureError(errMessageHistorySizeZero)
	}
	firstEvent := history[0]
	lastEvent := history[len(history)-1]
	var newRunMutableStateBuilder MutableState

	// need to clear the stickiness since workflow turned to passive
	b.mutableState.ClearStickyness()

	for _, event := range history {
		// NOTE: stateBuilder is also being used in the active side
		if err := b.mutableState.UpdateCurrentVersion(event.Version, true); err != nil {
			return nil, err
		}
		versionHistories := b.mutableState.GetVersionHistories()
		if versionHistories == nil {
			return nil, ErrMissingVersionHistories
		}
		versionHistory, err := versionHistories.GetCurrentVersionHistory()
		if err != nil {
			return nil, err
		}
		if err := versionHistory.AddOrUpdateItem(persistence.NewVersionHistoryItem(
			event.ID,
			event.Version,
		)); err != nil {
			return nil, err
		}
		b.mutableState.GetExecutionInfo().LastEventTaskID = event.TaskID

		switch event.GetEventType() {
		case types.EventTypeWorkflowExecutionStarted:
			attributes := event.WorkflowExecutionStartedEventAttributes
			var parentDomainID *string
			// If ParentWorkflowDomainID is present use it, otherwise fallback to ParentWorkflowDomain
			// as ParentWorkflowDomainID will not be present on older histories.
			if attributes.ParentWorkflowDomainID != nil {
				parentDomainID = attributes.ParentWorkflowDomainID
			} else if attributes.GetParentWorkflowDomain() != "" {
				parentDomainEntry, err := b.domainCache.GetDomain(
					attributes.GetParentWorkflowDomain(),
				)
				if err != nil {
					return nil, err
				}
				parentDomainID = &parentDomainEntry.GetInfo().ID
			}

			if err := b.mutableState.ReplicateWorkflowExecutionStartedEvent(
				parentDomainID,
				workflowExecution,
				requestID,
				event,
				true,
			); err != nil {
				return nil, err
			}

			if err := b.mutableState.SetHistoryTree(
				workflowExecution.GetRunID(),
			); err != nil {
				return nil, err
			}

		case types.EventTypeDecisionTaskScheduled:
			attributes := event.DecisionTaskScheduledEventAttributes
			// use event.GetTimestamp() as DecisionOriginalScheduledTimestamp, because the heartbeat is not happening here.
			_, err := b.mutableState.ReplicateDecisionTaskScheduledEvent(
				event.Version,
				event.ID,
				attributes.TaskList.GetName(),
				attributes.GetStartToCloseTimeoutSeconds(),
				attributes.GetAttempt(),
				event.GetTimestamp(),
				event.GetTimestamp(),
				false,
			)
			if err != nil {
				return nil, err
			}

		case types.EventTypeDecisionTaskStarted:
			attributes := event.DecisionTaskStartedEventAttributes
			_, err := b.mutableState.ReplicateDecisionTaskStartedEvent(
				nil,
				event.Version,
				attributes.GetScheduledEventID(),
				event.ID,
				attributes.GetRequestID(),
				event.GetTimestamp(),
			)
			if err != nil {
				return nil, err
			}

		case types.EventTypeDecisionTaskCompleted:
			if err := b.mutableState.ReplicateDecisionTaskCompletedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeDecisionTaskTimedOut:
			if err := b.mutableState.ReplicateDecisionTaskTimedOutEvent(
				event.DecisionTaskTimedOutEventAttributes.GetTimeoutType(),
			); err != nil {
				return nil, err
			}

			// this is for transient decision
			err := b.mutableState.ReplicateTransientDecisionTaskScheduled()
			if err != nil {
				return nil, err
			}

		case types.EventTypeDecisionTaskFailed:
			if err := b.mutableState.ReplicateDecisionTaskFailedEvent(); err != nil {
				return nil, err
			}

			// this is for transient decision
			err := b.mutableState.ReplicateTransientDecisionTaskScheduled()
			if err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskScheduled:
			if _, err := b.mutableState.ReplicateActivityTaskScheduledEvent(
				firstEvent.ID,
				event,
				false,
			); err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskStarted:
			if err := b.mutableState.ReplicateActivityTaskStartedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskCompleted:
			if err := b.mutableState.ReplicateActivityTaskCompletedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskFailed:
			if err := b.mutableState.ReplicateActivityTaskFailedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskTimedOut:
			if err := b.mutableState.ReplicateActivityTaskTimedOutEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskCancelRequested:
			if err := b.mutableState.ReplicateActivityTaskCancelRequestedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeActivityTaskCanceled:
			if err := b.mutableState.ReplicateActivityTaskCanceledEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeRequestCancelActivityTaskFailed:
			// No mutable state action is needed

		case types.EventTypeTimerStarted:
			if _, err := b.mutableState.ReplicateTimerStartedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeTimerFired:
			if err := b.mutableState.ReplicateTimerFiredEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeTimerCanceled:
			if err := b.mutableState.ReplicateTimerCanceledEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeCancelTimerFailed:
			// no mutable state action is needed

		case types.EventTypeStartChildWorkflowExecutionInitiated:
			if _, err := b.mutableState.ReplicateStartChildWorkflowExecutionInitiatedEvent(
				firstEvent.ID,
				event,
				// create a new request ID which is used by transfer queue processor
				// if domain is failed over at this point
				uuid.New(),
			); err != nil {
				return nil, err
			}

		case types.EventTypeStartChildWorkflowExecutionFailed:
			if err := b.mutableState.ReplicateStartChildWorkflowExecutionFailedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeChildWorkflowExecutionStarted:
			if err := b.mutableState.ReplicateChildWorkflowExecutionStartedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeChildWorkflowExecutionCompleted:
			if err := b.mutableState.ReplicateChildWorkflowExecutionCompletedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeChildWorkflowExecutionFailed:
			if err := b.mutableState.ReplicateChildWorkflowExecutionFailedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeChildWorkflowExecutionCanceled:
			if err := b.mutableState.ReplicateChildWorkflowExecutionCanceledEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeChildWorkflowExecutionTimedOut:
			if err := b.mutableState.ReplicateChildWorkflowExecutionTimedOutEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeChildWorkflowExecutionTerminated:
			if err := b.mutableState.ReplicateChildWorkflowExecutionTerminatedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeRequestCancelExternalWorkflowExecutionInitiated:
			if _, err := b.mutableState.ReplicateRequestCancelExternalWorkflowExecutionInitiatedEvent(
				firstEvent.ID,
				event,
				// create a new request ID which is used by transfer queue processor
				// if domain is failed over at this point
				uuid.New(),
			); err != nil {
				return nil, err
			}

		case types.EventTypeRequestCancelExternalWorkflowExecutionFailed:
			if err := b.mutableState.ReplicateRequestCancelExternalWorkflowExecutionFailedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeExternalWorkflowExecutionCancelRequested:
			if err := b.mutableState.ReplicateExternalWorkflowExecutionCancelRequested(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeSignalExternalWorkflowExecutionInitiated:
			// Create a new request ID which is used by transfer queue processor if domain is failed over at this point
			signalRequestID := uuid.New()
			if _, err := b.mutableState.ReplicateSignalExternalWorkflowExecutionInitiatedEvent(
				firstEvent.ID,
				event,
				signalRequestID,
			); err != nil {
				return nil, err
			}

		case types.EventTypeSignalExternalWorkflowExecutionFailed:
			if err := b.mutableState.ReplicateSignalExternalWorkflowExecutionFailedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeExternalWorkflowExecutionSignaled:
			if err := b.mutableState.ReplicateExternalWorkflowExecutionSignaled(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeMarkerRecorded:
			// No mutable state action is needed

		case types.EventTypeWorkflowExecutionSignaled:
			if err := b.mutableState.ReplicateWorkflowExecutionSignaled(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionCancelRequested:
			if err := b.mutableState.ReplicateWorkflowExecutionCancelRequestedEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeUpsertWorkflowSearchAttributes:
			if err := b.mutableState.ReplicateUpsertWorkflowSearchAttributesEvent(
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionCompleted:
			if err := b.mutableState.ReplicateWorkflowExecutionCompletedEvent(
				firstEvent.ID,
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionFailed:
			if err := b.mutableState.ReplicateWorkflowExecutionFailedEvent(
				firstEvent.ID,
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionTimedOut:
			if err := b.mutableState.ReplicateWorkflowExecutionTimedoutEvent(
				firstEvent.ID,
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionCanceled:
			if err := b.mutableState.ReplicateWorkflowExecutionCanceledEvent(
				firstEvent.ID,
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionTerminated:
			if err := b.mutableState.ReplicateWorkflowExecutionTerminatedEvent(
				firstEvent.ID,
				event,
			); err != nil {
				return nil, err
			}

		case types.EventTypeWorkflowExecutionContinuedAsNew:

			// The length of newRunHistory can be zero in resend case
			if len(newRunHistory) != 0 {
				newRunMutableStateBuilder = NewMutableStateBuilderWithVersionHistories(
					b.shard,
					b.logger,
					b.mutableState.GetDomainEntry(),
				)
				newRunStateBuilder := NewStateBuilder(b.shard, b.logger, newRunMutableStateBuilder)
				newRunID := event.WorkflowExecutionContinuedAsNewEventAttributes.GetNewExecutionRunID()
				newExecution := types.WorkflowExecution{
					WorkflowID: workflowExecution.WorkflowID,
					RunID:      newRunID,
				}
				_, err := newRunStateBuilder.ApplyEvents(
					domainID,
					uuid.New(),
					newExecution,
					newRunHistory,
					nil,
				)
				if err != nil {
					return nil, err
				}
			}

			err := b.mutableState.ReplicateWorkflowExecutionContinuedAsNewEvent(
				firstEvent.ID,
				domainID,
				event,
			)
			if err != nil {
				return nil, err
			}

		default:
			return nil, &types.BadRequestError{Message: "Unknown event type"}
		}
	}

	b.mutableState.GetExecutionInfo().SetLastFirstEventID(firstEvent.ID)
	b.mutableState.GetExecutionInfo().SetNextEventID(lastEvent.ID + 1)

	b.mutableState.SetHistoryBuilder(NewHistoryBuilderFromEvents(history))

	return newRunMutableStateBuilder, nil
}