service/history/execution/mutable_state_builder.go (3,873 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies, Inc. // Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package execution import ( "context" "encoding/json" "fmt" "math/rand" "time" "github.com/pborman/uuid" "golang.org/x/exp/maps" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/checksum" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/query" "github.com/uber/cadence/service/history/shard" ) const ( mutableStateInvalidHistoryActionMsg = "invalid history builder state for action" mutableStateInvalidHistoryActionMsgTemplate = mutableStateInvalidHistoryActionMsg + ": %v" timerCancellationMsgTimerIDUnknown = "TIMER_ID_UNKNOWN" ) var ( // ErrWorkflowFinished indicates trying to mutate mutable state after workflow finished ErrWorkflowFinished = &types.InternalServiceError{Message: "invalid mutable state action: mutation after finish"} // ErrMissingTimerInfo indicates missing timer info ErrMissingTimerInfo = &types.InternalServiceError{Message: "unable to get timer info"} // ErrMissingActivityInfo indicates missing activity info ErrMissingActivityInfo = &types.InternalServiceError{Message: "unable to get activity info"} // ErrMissingChildWorkflowInfo indicates missing child workflow info ErrMissingChildWorkflowInfo = &types.InternalServiceError{Message: "unable to get child workflow info"} // ErrMissingWorkflowStartEvent indicates missing workflow start event ErrMissingWorkflowStartEvent = &types.InternalServiceError{Message: "unable to get workflow start event"} // ErrMissingWorkflowCompletionEvent indicates missing workflow completion event ErrMissingWorkflowCompletionEvent = &types.InternalServiceError{Message: "unable to get workflow completion event"} // ErrMissingActivityScheduledEvent indicates missing workflow activity scheduled event ErrMissingActivityScheduledEvent = &types.InternalServiceError{Message: "unable to get activity scheduled event"} // ErrMissingChildWorkflowInitiatedEvent indicates missing child workflow initiated event ErrMissingChildWorkflowInitiatedEvent = &types.InternalServiceError{Message: "unable to get child workflow initiated event"} // ErrEventsAfterWorkflowFinish is the error indicating server error trying to write events after workflow finish event ErrEventsAfterWorkflowFinish = &types.InternalServiceError{Message: "error validating last event being workflow finish event"} // ErrMissingVersionHistories is the error indicating cadence failed to process 2dc workflow type. ErrMissingVersionHistories = &types.BadRequestError{Message: "versionHistories is empty, which is required for NDC feature. It's probably from deprecated 2dc workflows"} // ErrTooManyPendingActivities is the error that currently there are too many pending activities in the workflow ErrTooManyPendingActivities = &types.InternalServiceError{Message: "Too many pending activities"} ) type ( mutableStateBuilder struct { pendingActivityInfoIDs map[int64]*persistence.ActivityInfo // Schedule Event ID -> Activity Info. pendingActivityIDToEventID map[string]int64 // Activity ID -> Schedule Event ID of the activity. updateActivityInfos map[int64]*persistence.ActivityInfo // Modified activities from last update. deleteActivityInfos map[int64]struct{} // Deleted activities from last update. syncActivityTasks map[int64]struct{} // Activity to be sync to remote pendingTimerInfoIDs map[string]*persistence.TimerInfo // User Timer ID -> Timer Info. pendingTimerEventIDToID map[int64]string // User Timer Start Event ID -> User Timer ID. updateTimerInfos map[string]*persistence.TimerInfo // Modified timers from last update. deleteTimerInfos map[string]struct{} // Deleted timers from last update. pendingChildExecutionInfoIDs map[int64]*persistence.ChildExecutionInfo // Initiated Event ID -> Child Execution Info updateChildExecutionInfos map[int64]*persistence.ChildExecutionInfo // Modified ChildExecution Infos since last update deleteChildExecutionInfos map[int64]struct{} // Deleted ChildExecution Infos since last update pendingRequestCancelInfoIDs map[int64]*persistence.RequestCancelInfo // Initiated Event ID -> RequestCancelInfo updateRequestCancelInfos map[int64]*persistence.RequestCancelInfo // Modified RequestCancel Infos since last update, for persistence update deleteRequestCancelInfos map[int64]struct{} // Deleted RequestCancel Infos since last update, for persistence update pendingSignalInfoIDs map[int64]*persistence.SignalInfo // Initiated Event ID -> SignalInfo updateSignalInfos map[int64]*persistence.SignalInfo // Modified SignalInfo since last update deleteSignalInfos map[int64]struct{} // Deleted SignalInfos since last update pendingSignalRequestedIDs map[string]struct{} // Set of signaled requestIds updateSignalRequestedIDs map[string]struct{} // Set of signaled requestIds since last update deleteSignalRequestedIDs map[string]struct{} // Deleted signaled requestIds bufferedEvents []*types.HistoryEvent // buffered history events that are already persisted updateBufferedEvents []*types.HistoryEvent // buffered history events that needs to be persisted clearBufferedEvents bool // delete buffered events from persistence // This section includes Workflow Execution Info parameters like StartTimestamp, // which are only visible after the workflow has begun. // However, there are other parameters such as LastEventTimestamp, // which are updated as the execution progresses through various cycles. // It's common to encounter null values for these timestamps initially, // as they are populated once the update cycles are initiated. executionInfo *persistence.WorkflowExecutionInfo // Workflow mutable state info. versionHistories *persistence.VersionHistories // TODO: remove this struct after all 2DC workflows complete replicationState *persistence.ReplicationState hBuilder *HistoryBuilder // in memory only attributes // indicate the current version currentVersion int64 // indicates whether there are buffered events in persistence hasBufferedEventsInDB bool // indicates the workflow state in DB, can be used to calculate // whether this workflow is pointed by current workflow record stateInDB int // indicates the next event ID in DB, for conditional update nextEventIDInDB int64 // domain entry contains a snapshot of domain // NOTE: do not use the failover version inside, use currentVersion above domainEntry *cache.DomainCacheEntry // record if a event has been applied to mutable state // TODO: persist this to db appliedEvents map[string]struct{} insertTransferTasks []persistence.Task insertCrossClusterTasks []persistence.Task insertReplicationTasks []persistence.Task insertTimerTasks []persistence.Task // do not rely on this, this is only updated on // Load() and closeTransactionXXX methods. So when // a transaction is in progress, this value will be // wrong. This exist primarily for visibility via CLI checksum checksum.Checksum taskGenerator MutableStateTaskGenerator decisionTaskManager mutableStateDecisionTaskManager queryRegistry query.Registry shard shard.Context clusterMetadata cluster.Metadata eventsCache events.Cache config *config.Config timeSource clock.TimeSource logger log.Logger metricsClient metrics.Client pendingActivityWarningSent bool executionStats *persistence.ExecutionStats } ) var _ MutableState = (*mutableStateBuilder)(nil) // NewMutableStateBuilder creates a new workflow mutable state builder func NewMutableStateBuilder( shard shard.Context, logger log.Logger, domainEntry *cache.DomainCacheEntry, ) MutableState { return newMutableStateBuilder(shard, logger, domainEntry) } func newMutableStateBuilder( shard shard.Context, logger log.Logger, domainEntry *cache.DomainCacheEntry, ) *mutableStateBuilder { s := &mutableStateBuilder{ updateActivityInfos: make(map[int64]*persistence.ActivityInfo), pendingActivityInfoIDs: make(map[int64]*persistence.ActivityInfo), pendingActivityIDToEventID: make(map[string]int64), deleteActivityInfos: make(map[int64]struct{}), syncActivityTasks: make(map[int64]struct{}), pendingTimerInfoIDs: make(map[string]*persistence.TimerInfo), pendingTimerEventIDToID: make(map[int64]string), updateTimerInfos: make(map[string]*persistence.TimerInfo), deleteTimerInfos: make(map[string]struct{}), updateChildExecutionInfos: make(map[int64]*persistence.ChildExecutionInfo), pendingChildExecutionInfoIDs: make(map[int64]*persistence.ChildExecutionInfo), deleteChildExecutionInfos: make(map[int64]struct{}), updateRequestCancelInfos: make(map[int64]*persistence.RequestCancelInfo), pendingRequestCancelInfoIDs: make(map[int64]*persistence.RequestCancelInfo), deleteRequestCancelInfos: make(map[int64]struct{}), updateSignalInfos: make(map[int64]*persistence.SignalInfo), pendingSignalInfoIDs: make(map[int64]*persistence.SignalInfo), deleteSignalInfos: make(map[int64]struct{}), updateSignalRequestedIDs: make(map[string]struct{}), pendingSignalRequestedIDs: make(map[string]struct{}), deleteSignalRequestedIDs: make(map[string]struct{}), currentVersion: domainEntry.GetFailoverVersion(), hasBufferedEventsInDB: false, stateInDB: persistence.WorkflowStateVoid, nextEventIDInDB: 0, domainEntry: domainEntry, appliedEvents: make(map[string]struct{}), queryRegistry: query.NewRegistry(), shard: shard, clusterMetadata: shard.GetClusterMetadata(), eventsCache: shard.GetEventsCache(), config: shard.GetConfig(), timeSource: shard.GetTimeSource(), logger: logger, metricsClient: shard.GetMetricsClient(), } s.executionInfo = &persistence.WorkflowExecutionInfo{ DecisionVersion: common.EmptyVersion, DecisionScheduleID: common.EmptyEventID, DecisionStartedID: common.EmptyEventID, DecisionRequestID: common.EmptyUUID, DecisionTimeout: 0, NextEventID: common.FirstEventID, State: persistence.WorkflowStateCreated, CloseStatus: persistence.WorkflowCloseStatusNone, LastProcessedEvent: common.EmptyEventID, } s.hBuilder = NewHistoryBuilder(s) s.taskGenerator = NewMutableStateTaskGenerator(shard.GetClusterMetadata(), shard.GetDomainCache(), s) s.decisionTaskManager = newMutableStateDecisionTaskManager(s) s.executionStats = &persistence.ExecutionStats{} return s } // NewMutableStateBuilderWithVersionHistories creates mutable state builder with version history initialized func NewMutableStateBuilderWithVersionHistories( shard shard.Context, logger log.Logger, domainEntry *cache.DomainCacheEntry, ) MutableState { s := newMutableStateBuilder(shard, logger, domainEntry) s.versionHistories = persistence.NewVersionHistories(&persistence.VersionHistory{}) return s } // NewMutableStateBuilderWithEventV2 is used only in test func NewMutableStateBuilderWithEventV2( shard shard.Context, logger log.Logger, runID string, domainEntry *cache.DomainCacheEntry, ) MutableState { msBuilder := NewMutableStateBuilder(shard, logger, domainEntry) _ = msBuilder.SetHistoryTree(runID) return msBuilder } // NewMutableStateBuilderWithVersionHistoriesWithEventV2 is used only in test func NewMutableStateBuilderWithVersionHistoriesWithEventV2( shard shard.Context, logger log.Logger, version int64, runID string, domainEntry *cache.DomainCacheEntry, ) MutableState { msBuilder := NewMutableStateBuilderWithVersionHistories(shard, logger, domainEntry) err := msBuilder.UpdateCurrentVersion(version, false) if err != nil { logger.Error("update current version error", tag.Error(err)) } _ = msBuilder.SetHistoryTree(runID) return msBuilder } // todo (david.porter) func (e *mutableStateBuilder) CopyToPersistence() *persistence.WorkflowMutableState { state := &persistence.WorkflowMutableState{} state.ActivityInfos = e.pendingActivityInfoIDs state.TimerInfos = e.pendingTimerInfoIDs state.ChildExecutionInfos = e.pendingChildExecutionInfoIDs state.RequestCancelInfos = e.pendingRequestCancelInfoIDs state.SignalInfos = e.pendingSignalInfoIDs state.SignalRequestedIDs = e.pendingSignalRequestedIDs state.ExecutionInfo = e.executionInfo state.BufferedEvents = e.bufferedEvents state.VersionHistories = e.versionHistories state.Checksum = e.checksum return state } func (e *mutableStateBuilder) Load( state *persistence.WorkflowMutableState, ) error { e.pendingActivityInfoIDs = state.ActivityInfos for _, activityInfo := range state.ActivityInfos { e.pendingActivityIDToEventID[activityInfo.ActivityID] = activityInfo.ScheduleID } e.pendingTimerInfoIDs = state.TimerInfos for _, timerInfo := range state.TimerInfos { e.pendingTimerEventIDToID[timerInfo.StartedID] = timerInfo.TimerID } e.pendingChildExecutionInfoIDs = state.ChildExecutionInfos e.pendingRequestCancelInfoIDs = state.RequestCancelInfos e.pendingSignalInfoIDs = state.SignalInfos e.pendingSignalRequestedIDs = state.SignalRequestedIDs e.executionInfo = state.ExecutionInfo e.bufferedEvents = state.BufferedEvents e.currentVersion = common.EmptyVersion e.hasBufferedEventsInDB = len(e.bufferedEvents) > 0 e.stateInDB = state.ExecutionInfo.State e.nextEventIDInDB = state.ExecutionInfo.NextEventID e.versionHistories = state.VersionHistories // TODO: remove this after all 2DC workflows complete e.replicationState = state.ReplicationState e.checksum = state.Checksum e.executionStats = state.ExecutionStats e.fillForBackwardsCompatibility() if len(state.Checksum.Value) > 0 { switch { case e.shouldInvalidateChecksum(): e.checksum = checksum.Checksum{} e.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.MutableStateChecksumInvalidated) case e.shouldVerifyChecksum(): if err := verifyMutableStateChecksum(e, state.Checksum); err != nil { // we ignore checksum verification errors for now until this // feature is tested and/or we have mechanisms in place to deal // with these types of errors e.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.MutableStateChecksumMismatch) e.logError("mutable state checksum mismatch", tag.WorkflowNextEventID(e.executionInfo.NextEventID), tag.WorkflowScheduleID(e.executionInfo.DecisionScheduleID), tag.WorkflowStartedID(e.executionInfo.DecisionStartedID), tag.Dynamic("timerIDs", maps.Keys(e.pendingTimerInfoIDs)), tag.Dynamic("activityIDs", maps.Keys(e.pendingActivityInfoIDs)), tag.Dynamic("childIDs", maps.Keys(e.pendingChildExecutionInfoIDs)), tag.Dynamic("signalIDs", maps.Keys(e.pendingSignalInfoIDs)), tag.Dynamic("cancelIDs", maps.Keys(e.pendingRequestCancelInfoIDs)), tag.Error(err)) if e.enableChecksumFailureRetry() { return err } } } } return nil } func (e *mutableStateBuilder) fillForBackwardsCompatibility() { // With https://github.com/uber/cadence/pull/4601 newly introduced DomainID may not be set for older workflows. // Here we will fill its value based on previously used domain name. for _, info := range e.pendingChildExecutionInfoIDs { if info.DomainID == "" && info.DomainNameDEPRECATED != "" { domainID, err := e.shard.GetDomainCache().GetDomainID(info.DomainNameDEPRECATED) if err != nil { e.logError("failed to fill domainId for pending child executions", tag.Error(err)) } info.DomainID = domainID } } } func (e *mutableStateBuilder) GetCurrentBranchToken() ([]byte, error) { if e.versionHistories != nil { currentVersionHistory, err := e.versionHistories.GetCurrentVersionHistory() if err != nil { return nil, err } return currentVersionHistory.GetBranchToken(), nil } return e.executionInfo.BranchToken, nil } func (e *mutableStateBuilder) GetVersionHistories() *persistence.VersionHistories { return e.versionHistories } // set treeID/historyBranches func (e *mutableStateBuilder) SetHistoryTree( treeID string, ) error { initialBranchToken, err := persistence.NewHistoryBranchToken(treeID) if err != nil { return err } return e.SetCurrentBranchToken(initialBranchToken) } func (e *mutableStateBuilder) SetCurrentBranchToken( branchToken []byte, ) error { exeInfo := e.GetExecutionInfo() if e.versionHistories == nil { exeInfo.BranchToken = branchToken return nil } currentVersionHistory, err := e.versionHistories.GetCurrentVersionHistory() if err != nil { return err } return currentVersionHistory.SetBranchToken(branchToken) } func (e *mutableStateBuilder) SetVersionHistories( versionHistories *persistence.VersionHistories, ) error { e.versionHistories = versionHistories return nil } func (e *mutableStateBuilder) GetHistoryBuilder() *HistoryBuilder { return e.hBuilder } func (e *mutableStateBuilder) SetHistoryBuilder(hBuilder *HistoryBuilder) { e.hBuilder = hBuilder } func (e *mutableStateBuilder) GetExecutionInfo() *persistence.WorkflowExecutionInfo { return e.executionInfo } func (e *mutableStateBuilder) FlushBufferedEvents() error { // put new events into 2 buckets: // 1) if the event was added while there was in-flight decision, then put it in buffered bucket // 2) otherwise, put it in committed bucket var newBufferedEvents []*types.HistoryEvent var newCommittedEvents []*types.HistoryEvent for _, event := range e.hBuilder.history { if event.ID == common.BufferedEventID { newBufferedEvents = append(newBufferedEvents, event) } else { newCommittedEvents = append(newCommittedEvents, event) } } // Sometimes we see buffered events are out of order when read back from database. This is mostly not an issue // except in the Activity case where ActivityStarted and ActivityCompleted gets out of order. The following code // is added to reorder buffered events to guarantee all activity completion events will always be processed at the end. var reorderedEvents []*types.HistoryEvent reorderFunc := func(bufferedEvents []*types.HistoryEvent) { for _, event := range bufferedEvents { switch event.GetEventType() { case types.EventTypeActivityTaskCompleted, types.EventTypeActivityTaskFailed, types.EventTypeActivityTaskCanceled, types.EventTypeActivityTaskTimedOut: reorderedEvents = append(reorderedEvents, event) case types.EventTypeChildWorkflowExecutionCompleted, types.EventTypeChildWorkflowExecutionFailed, types.EventTypeChildWorkflowExecutionCanceled, types.EventTypeChildWorkflowExecutionTimedOut, types.EventTypeChildWorkflowExecutionTerminated: reorderedEvents = append(reorderedEvents, event) default: newCommittedEvents = append(newCommittedEvents, event) } } } // no decision in-flight, flush all buffered events to committed bucket if !e.HasInFlightDecision() { // flush persisted buffered events if len(e.bufferedEvents) > 0 { reorderFunc(e.bufferedEvents) e.bufferedEvents = nil } if e.hasBufferedEventsInDB { e.clearBufferedEvents = true } // flush pending buffered events reorderFunc(e.updateBufferedEvents) // clear pending buffered events e.updateBufferedEvents = nil // Put back all the reordered buffer events at the end if len(reorderedEvents) > 0 { newCommittedEvents = append(newCommittedEvents, reorderedEvents...) } // flush new buffered events that were not saved to persistence yet newCommittedEvents = append(newCommittedEvents, newBufferedEvents...) newBufferedEvents = nil } newCommittedEvents = e.trimEventsAfterWorkflowClose(newCommittedEvents) e.hBuilder.history = newCommittedEvents // make sure all new committed events have correct EventID e.assignEventIDToBufferedEvents() if err := e.assignTaskIDToEvents(); err != nil { return err } // if decision is not closed yet, and there are new buffered events, then put those to the pending buffer if e.HasInFlightDecision() && len(newBufferedEvents) > 0 { e.updateBufferedEvents = newBufferedEvents } return nil } func (e *mutableStateBuilder) UpdateCurrentVersion( version int64, forceUpdate bool, ) error { if state, _ := e.GetWorkflowStateCloseStatus(); state == persistence.WorkflowStateCompleted { // always set current version to last write version when workflow is completed lastWriteVersion, err := e.GetLastWriteVersion() if err != nil { return err } e.currentVersion = lastWriteVersion return nil } if e.versionHistories != nil { versionHistory, err := e.versionHistories.GetCurrentVersionHistory() if err != nil { return err } if !versionHistory.IsEmpty() { // this make sure current version >= last write version versionHistoryItem, err := versionHistory.GetLastItem() if err != nil { return err } e.currentVersion = versionHistoryItem.Version } if version > e.currentVersion || forceUpdate { e.currentVersion = version } return nil } e.currentVersion = common.EmptyVersion return nil } func (e *mutableStateBuilder) GetCurrentVersion() int64 { // TODO: remove this after all 2DC workflows complete if e.replicationState != nil { return e.replicationState.CurrentVersion } if e.versionHistories != nil { return e.currentVersion } return common.EmptyVersion } func (e *mutableStateBuilder) GetStartVersion() (int64, error) { if e.versionHistories != nil { versionHistory, err := e.versionHistories.GetCurrentVersionHistory() if err != nil { return 0, err } firstItem, err := versionHistory.GetFirstItem() if err != nil { return 0, err } return firstItem.Version, nil } return common.EmptyVersion, nil } func (e *mutableStateBuilder) GetLastWriteVersion() (int64, error) { // TODO: remove this after all 2DC workflows complete if e.replicationState != nil { return e.replicationState.LastWriteVersion, nil } if e.versionHistories != nil { versionHistory, err := e.versionHistories.GetCurrentVersionHistory() if err != nil { return 0, err } lastItem, err := versionHistory.GetLastItem() if err != nil { return 0, err } return lastItem.Version, nil } return common.EmptyVersion, nil } func (e *mutableStateBuilder) checkAndClearTimerFiredEvent( timerID string, ) *types.HistoryEvent { var timerEvent *types.HistoryEvent e.bufferedEvents, timerEvent = checkAndClearTimerFiredEvent(e.bufferedEvents, timerID) if timerEvent != nil { return timerEvent } e.updateBufferedEvents, timerEvent = checkAndClearTimerFiredEvent(e.updateBufferedEvents, timerID) if timerEvent != nil { return timerEvent } e.hBuilder.history, timerEvent = checkAndClearTimerFiredEvent(e.hBuilder.history, timerID) return timerEvent } func checkAndClearTimerFiredEvent( events []*types.HistoryEvent, timerID string, ) ([]*types.HistoryEvent, *types.HistoryEvent) { // go over all history events. if we find a timer fired event for the given // timerID, clear it timerFiredIdx := -1 for idx, event := range events { if event.GetEventType() == types.EventTypeTimerFired && event.GetTimerFiredEventAttributes().GetTimerID() == timerID { timerFiredIdx = idx break } } if timerFiredIdx == -1 { return events, nil } timerEvent := events[timerFiredIdx] return append(events[:timerFiredIdx], events[timerFiredIdx+1:]...), timerEvent } func (e *mutableStateBuilder) trimEventsAfterWorkflowClose( input []*types.HistoryEvent, ) []*types.HistoryEvent { if len(input) == 0 { return input } nextIndex := 0 loop: for _, event := range input { nextIndex++ switch event.GetEventType() { case types.EventTypeWorkflowExecutionCompleted, types.EventTypeWorkflowExecutionFailed, types.EventTypeWorkflowExecutionTimedOut, types.EventTypeWorkflowExecutionTerminated, types.EventTypeWorkflowExecutionContinuedAsNew, types.EventTypeWorkflowExecutionCanceled: break loop } } return input[0:nextIndex] } func (e *mutableStateBuilder) assignEventIDToBufferedEvents() { newCommittedEvents := e.hBuilder.history scheduledIDToStartedID := make(map[int64]int64) for _, event := range newCommittedEvents { if event.ID != common.BufferedEventID { continue } eventID := e.executionInfo.NextEventID event.ID = eventID e.executionInfo.IncreaseNextEventID() switch event.GetEventType() { case types.EventTypeActivityTaskStarted: attributes := event.ActivityTaskStartedEventAttributes scheduledID := attributes.GetScheduledEventID() scheduledIDToStartedID[scheduledID] = eventID if ai, ok := e.GetActivityInfo(scheduledID); ok { ai.StartedID = eventID e.updateActivityInfos[ai.ScheduleID] = ai } case types.EventTypeChildWorkflowExecutionStarted: attributes := event.ChildWorkflowExecutionStartedEventAttributes initiatedID := attributes.GetInitiatedEventID() scheduledIDToStartedID[initiatedID] = eventID if ci, ok := e.GetChildExecutionInfo(initiatedID); ok { ci.StartedID = eventID e.updateChildExecutionInfos[ci.InitiatedID] = ci } case types.EventTypeActivityTaskCompleted: attributes := event.ActivityTaskCompletedEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeActivityTaskFailed: attributes := event.ActivityTaskFailedEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeActivityTaskTimedOut: attributes := event.ActivityTaskTimedOutEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeActivityTaskCanceled: attributes := event.ActivityTaskCanceledEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetScheduledEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeChildWorkflowExecutionCompleted: attributes := event.ChildWorkflowExecutionCompletedEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeChildWorkflowExecutionFailed: attributes := event.ChildWorkflowExecutionFailedEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeChildWorkflowExecutionTimedOut: attributes := event.ChildWorkflowExecutionTimedOutEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeChildWorkflowExecutionCanceled: attributes := event.ChildWorkflowExecutionCanceledEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok { attributes.StartedEventID = startedID } case types.EventTypeChildWorkflowExecutionTerminated: attributes := event.ChildWorkflowExecutionTerminatedEventAttributes if startedID, ok := scheduledIDToStartedID[attributes.GetInitiatedEventID()]; ok { attributes.StartedEventID = startedID } } } } func (e *mutableStateBuilder) assignTaskIDToEvents() error { // assign task IDs to all history events // first transient events numTaskIDs := len(e.hBuilder.transientHistory) if numTaskIDs > 0 { taskIDs, err := e.shard.GenerateTransferTaskIDs(numTaskIDs) if err != nil { return err } for index, event := range e.hBuilder.transientHistory { if event.TaskID == common.EmptyEventTaskID { taskID := taskIDs[index] event.TaskID = taskID e.executionInfo.LastEventTaskID = taskID } } } // then normal events numTaskIDs = len(e.hBuilder.history) if numTaskIDs > 0 { taskIDs, err := e.shard.GenerateTransferTaskIDs(numTaskIDs) if err != nil { return err } for index, event := range e.hBuilder.history { if event.TaskID == common.EmptyEventTaskID { taskID := taskIDs[index] event.TaskID = taskID e.executionInfo.LastEventTaskID = taskID } } } return nil } func (e *mutableStateBuilder) IsCurrentWorkflowGuaranteed() bool { // stateInDB is used like a bloom filter: // // 1. stateInDB being created / running meaning that this workflow must be the current // workflow (assuming there is no rebuild of mutable state). // 2. stateInDB being completed does not guarantee this workflow being the current workflow // 3. stateInDB being zombie guarantees this workflow not being the current workflow // 4. stateInDB cannot be void, void is only possible when mutable state is just initialized switch e.stateInDB { case persistence.WorkflowStateVoid: return false case persistence.WorkflowStateCreated: return true case persistence.WorkflowStateRunning: return true case persistence.WorkflowStateCompleted: return false case persistence.WorkflowStateZombie: return false case persistence.WorkflowStateCorrupted: return false default: panic(fmt.Sprintf("unknown workflow state: %v", e.executionInfo.State)) } } func (e *mutableStateBuilder) GetDomainEntry() *cache.DomainCacheEntry { return e.domainEntry } func (e *mutableStateBuilder) IsStickyTaskListEnabled() bool { if e.executionInfo.StickyTaskList == "" { return false } ttl := e.config.StickyTTL(e.GetDomainEntry().GetInfo().Name) return !e.timeSource.Now().After(e.executionInfo.LastUpdatedTimestamp.Add(ttl)) } func (e *mutableStateBuilder) CreateNewHistoryEvent( eventType types.EventType, ) *types.HistoryEvent { return e.CreateNewHistoryEventWithTimestamp(eventType, e.timeSource.Now().UnixNano()) } func (e *mutableStateBuilder) CreateNewHistoryEventWithTimestamp( eventType types.EventType, timestamp int64, ) *types.HistoryEvent { eventID := e.executionInfo.NextEventID if e.shouldBufferEvent(eventType) { eventID = common.BufferedEventID } else { // only increase NextEventID if event is not buffered e.executionInfo.IncreaseNextEventID() } ts := common.Int64Ptr(timestamp) historyEvent := &types.HistoryEvent{} historyEvent.ID = eventID historyEvent.Timestamp = ts historyEvent.EventType = &eventType historyEvent.Version = e.GetCurrentVersion() historyEvent.TaskID = common.EmptyEventTaskID return historyEvent } func (e *mutableStateBuilder) shouldBufferEvent( eventType types.EventType, ) bool { switch eventType { case // do not buffer for workflow state change types.EventTypeWorkflowExecutionStarted, types.EventTypeWorkflowExecutionCompleted, types.EventTypeWorkflowExecutionFailed, types.EventTypeWorkflowExecutionTimedOut, types.EventTypeWorkflowExecutionTerminated, types.EventTypeWorkflowExecutionContinuedAsNew, types.EventTypeWorkflowExecutionCanceled: return false case // decision event should not be buffered types.EventTypeDecisionTaskScheduled, types.EventTypeDecisionTaskStarted, types.EventTypeDecisionTaskCompleted, types.EventTypeDecisionTaskFailed, types.EventTypeDecisionTaskTimedOut: return false case // events generated directly from decisions should not be buffered // workflow complete, failed, cancelled and continue-as-new events are duplication of above // just put is here for reference // types.EventTypeWorkflowExecutionCompleted, // types.EventTypeWorkflowExecutionFailed, // types.EventTypeWorkflowExecutionCanceled, // types.EventTypeWorkflowExecutionContinuedAsNew, types.EventTypeActivityTaskScheduled, types.EventTypeActivityTaskCancelRequested, types.EventTypeTimerStarted, // DecisionTypeCancelTimer is an exception. This decision will be mapped // to either types.EventTypeTimerCanceled, or types.EventTypeCancelTimerFailed. // So both should not be buffered. Ref: historyEngine, search for "types.DecisionTypeCancelTimer" types.EventTypeTimerCanceled, types.EventTypeCancelTimerFailed, types.EventTypeRequestCancelExternalWorkflowExecutionInitiated, types.EventTypeMarkerRecorded, types.EventTypeStartChildWorkflowExecutionInitiated, types.EventTypeSignalExternalWorkflowExecutionInitiated, types.EventTypeUpsertWorkflowSearchAttributes: // do not buffer event if event is directly generated from a corresponding decision // sanity check there is no decision on the fly if e.HasInFlightDecision() { msg := fmt.Sprintf("history mutable state is processing event: %v while there is decision pending. "+ "domainID: %v, workflow ID: %v, run ID: %v.", eventType, e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID) panic(msg) } return false default: return true } } func (e *mutableStateBuilder) GetWorkflowType() *types.WorkflowType { wType := &types.WorkflowType{} wType.Name = e.executionInfo.WorkflowTypeName return wType } func (e *mutableStateBuilder) GetQueryRegistry() query.Registry { return e.queryRegistry } func (e *mutableStateBuilder) SetQueryRegistry(queryRegistry query.Registry) { e.queryRegistry = queryRegistry } func (e *mutableStateBuilder) GetActivityScheduledEvent( ctx context.Context, scheduleEventID int64, ) (*types.HistoryEvent, error) { ai, ok := e.pendingActivityInfoIDs[scheduleEventID] if !ok { return nil, ErrMissingActivityInfo } // Needed for backward compatibility reason if ai.ScheduledEvent != nil { return ai.ScheduledEvent, nil } currentBranchToken, err := e.GetCurrentBranchToken() if err != nil { return nil, err } scheduledEvent, err := e.eventsCache.GetEvent( ctx, e.shard.GetShardID(), e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, ai.ScheduledEventBatchID, ai.ScheduleID, currentBranchToken, ) if err != nil { // do not return the original error // since original error can be of type entity not exists // which can cause task processing side to fail silently // However, if the error is a persistence transient error, // we return the original error, because we fail to get // the event because of failure from database if persistence.IsTransientError(err) { return nil, err } return nil, ErrMissingActivityScheduledEvent } return scheduledEvent, nil } // GetActivityInfo gives details about an activity that is currently in progress. func (e *mutableStateBuilder) GetActivityInfo( scheduleEventID int64, ) (*persistence.ActivityInfo, bool) { ai, ok := e.pendingActivityInfoIDs[scheduleEventID] return ai, ok } // GetActivityByActivityID gives details about an activity that is currently in progress. func (e *mutableStateBuilder) GetActivityByActivityID( activityID string, ) (*persistence.ActivityInfo, bool) { eventID, ok := e.pendingActivityIDToEventID[activityID] if !ok { return nil, false } return e.GetActivityInfo(eventID) } // GetChildExecutionInfo gives details about a child execution that is currently in progress. func (e *mutableStateBuilder) GetChildExecutionInfo( initiatedEventID int64, ) (*persistence.ChildExecutionInfo, bool) { ci, ok := e.pendingChildExecutionInfoIDs[initiatedEventID] return ci, ok } // GetChildExecutionInitiatedEvent reads out the ChildExecutionInitiatedEvent from mutable state for in-progress child // executions func (e *mutableStateBuilder) GetChildExecutionInitiatedEvent( ctx context.Context, initiatedEventID int64, ) (*types.HistoryEvent, error) { ci, ok := e.pendingChildExecutionInfoIDs[initiatedEventID] if !ok { return nil, ErrMissingChildWorkflowInfo } // Needed for backward compatibility reason if ci.InitiatedEvent != nil { return ci.InitiatedEvent, nil } currentBranchToken, err := e.GetCurrentBranchToken() if err != nil { return nil, err } initiatedEvent, err := e.eventsCache.GetEvent( ctx, e.shard.GetShardID(), e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, ci.InitiatedEventBatchID, ci.InitiatedID, currentBranchToken, ) if err != nil { // do not return the original error // since original error can be of type entity not exists // which can cause task processing side to fail silently // However, if the error is a persistence transient error, // we return the original error, because we fail to get // the event because of failure from database if persistence.IsTransientError(err) { return nil, err } return nil, ErrMissingChildWorkflowInitiatedEvent } return initiatedEvent, nil } // GetRequestCancelInfo gives details about a request cancellation that is currently in progress. func (e *mutableStateBuilder) GetRequestCancelInfo( initiatedEventID int64, ) (*persistence.RequestCancelInfo, bool) { ri, ok := e.pendingRequestCancelInfoIDs[initiatedEventID] return ri, ok } func (e *mutableStateBuilder) GetRetryBackoffDuration( errReason string, ) time.Duration { info := e.executionInfo if !info.HasRetryPolicy { return backoff.NoBackoff } return getBackoffInterval( e.timeSource.Now(), info.ExpirationTime, info.Attempt, info.MaximumAttempts, info.InitialInterval, info.MaximumInterval, info.BackoffCoefficient, errReason, info.NonRetriableErrors, ) } func (e *mutableStateBuilder) GetCronBackoffDuration( ctx context.Context, ) (time.Duration, error) { info := e.executionInfo if len(info.CronSchedule) == 0 { return backoff.NoBackoff, nil } sched, err := backoff.ValidateSchedule(info.CronSchedule) if err != nil { return backoff.NoBackoff, err } // TODO: decide if we can add execution time in execution info. executionTime := e.executionInfo.StartTimestamp // This only call when doing ContinueAsNew. At this point, the workflow should have a start event workflowStartEvent, err := e.GetStartEvent(ctx) if err != nil { e.logError("unable to find workflow start event", tag.ErrorTypeInvalidHistoryAction) return backoff.NoBackoff, err } firstDecisionTaskBackoff := time.Duration(workflowStartEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstDecisionTaskBackoffSeconds()) * time.Second executionTime = executionTime.Add(firstDecisionTaskBackoff) jitterStartSeconds := workflowStartEvent.GetWorkflowExecutionStartedEventAttributes().GetJitterStartSeconds() return backoff.GetBackoffForNextSchedule(sched, executionTime, e.timeSource.Now(), jitterStartSeconds) } // GetSignalInfo get details about a signal request that is currently in progress. func (e *mutableStateBuilder) GetSignalInfo( initiatedEventID int64, ) (*persistence.SignalInfo, bool) { ri, ok := e.pendingSignalInfoIDs[initiatedEventID] return ri, ok } // GetCompletionEvent retrieves the workflow completion event from mutable state func (e *mutableStateBuilder) GetCompletionEvent( ctx context.Context, ) (*types.HistoryEvent, error) { if e.executionInfo.State != persistence.WorkflowStateCompleted { return nil, ErrMissingWorkflowCompletionEvent } // Needed for backward compatibility reason if e.executionInfo.CompletionEvent != nil { return e.executionInfo.CompletionEvent, nil } // Needed for backward compatibility reason if e.executionInfo.CompletionEventBatchID == common.EmptyEventID { return nil, ErrMissingWorkflowCompletionEvent } currentBranchToken, err := e.GetCurrentBranchToken() if err != nil { return nil, err } // Completion EventID is always one less than NextEventID after workflow is completed completionEventID := e.executionInfo.NextEventID - 1 firstEventID := e.executionInfo.CompletionEventBatchID completionEvent, err := e.eventsCache.GetEvent( ctx, e.shard.GetShardID(), e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, firstEventID, completionEventID, currentBranchToken, ) if err != nil { // do not return the original error // since original error can be of type entity not exists // which can cause task processing side to fail silently // However, if the error is a persistence transient error, // we return the original error, because we fail to get // the event because of failure from database if persistence.IsTransientError(err) { return nil, err } return nil, ErrMissingWorkflowCompletionEvent } return completionEvent, nil } // GetStartEvent retrieves the workflow start event from mutable state func (e *mutableStateBuilder) GetStartEvent( ctx context.Context, ) (*types.HistoryEvent, error) { currentBranchToken, err := e.GetCurrentBranchToken() if err != nil { return nil, err } startEvent, err := e.eventsCache.GetEvent( ctx, e.shard.GetShardID(), e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, common.FirstEventID, common.FirstEventID, currentBranchToken, ) if err != nil { // do not return the original error // since original error can be of type entity not exists // which can cause task processing side to fail silently // However, if the error is a persistence transient error, // we return the original error, because we fail to get // the event because of failure from database if persistence.IsTransientError(err) { return nil, err } return nil, ErrMissingWorkflowStartEvent } return startEvent, nil } // DeletePendingChildExecution deletes details about a ChildExecutionInfo. func (e *mutableStateBuilder) DeletePendingChildExecution( initiatedEventID int64, ) error { if _, ok := e.pendingChildExecutionInfoIDs[initiatedEventID]; ok { delete(e.pendingChildExecutionInfoIDs, initiatedEventID) } else { e.logError( fmt.Sprintf("unable to find child workflow event ID: %v in mutable state", initiatedEventID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } delete(e.updateChildExecutionInfos, initiatedEventID) e.deleteChildExecutionInfos[initiatedEventID] = struct{}{} return nil } // DeletePendingRequestCancel deletes details about a RequestCancelInfo. func (e *mutableStateBuilder) DeletePendingRequestCancel( initiatedEventID int64, ) error { if _, ok := e.pendingRequestCancelInfoIDs[initiatedEventID]; ok { delete(e.pendingRequestCancelInfoIDs, initiatedEventID) } else { e.logError( fmt.Sprintf("unable to find request cancel external workflow event ID: %v in mutable state", initiatedEventID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } delete(e.updateRequestCancelInfos, initiatedEventID) e.deleteRequestCancelInfos[initiatedEventID] = struct{}{} return nil } // DeletePendingSignal deletes details about a SignalInfo func (e *mutableStateBuilder) DeletePendingSignal( initiatedEventID int64, ) error { if _, ok := e.pendingSignalInfoIDs[initiatedEventID]; ok { delete(e.pendingSignalInfoIDs, initiatedEventID) } else { e.logError( fmt.Sprintf("unable to find signal external workflow event ID: %v in mutable state", initiatedEventID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } delete(e.updateSignalInfos, initiatedEventID) e.deleteSignalInfos[initiatedEventID] = struct{}{} return nil } func (e *mutableStateBuilder) writeEventToCache( event *types.HistoryEvent, ) { // For start event: store it within events cache so the recordWorkflowStarted transfer task doesn't need to // load it from database // For completion event: store it within events cache so we can communicate the result to parent execution // during the processing of DeleteTransferTask without loading this event from database e.eventsCache.PutEvent( e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, event.ID, event, ) } func (e *mutableStateBuilder) HasParentExecution() bool { return e.executionInfo.ParentDomainID != "" && e.executionInfo.ParentWorkflowID != "" } func (e *mutableStateBuilder) UpdateActivityProgress( ai *persistence.ActivityInfo, request *types.RecordActivityTaskHeartbeatRequest, ) { ai.Version = e.GetCurrentVersion() ai.Details = request.Details ai.LastHeartBeatUpdatedTime = e.timeSource.Now() e.updateActivityInfos[ai.ScheduleID] = ai e.syncActivityTasks[ai.ScheduleID] = struct{}{} } // ReplicateActivityInfo replicate the necessary activity information func (e *mutableStateBuilder) ReplicateActivityInfo( request *types.SyncActivityRequest, resetActivityTimerTaskStatus bool, ) error { ai, ok := e.pendingActivityInfoIDs[request.GetScheduledID()] if !ok { e.logError( fmt.Sprintf("unable to find activity event ID: %v in mutable state", request.GetScheduledID()), tag.ErrorTypeInvalidMutableStateAction, ) return ErrMissingActivityInfo } ai.Version = request.GetVersion() ai.ScheduledTime = time.Unix(0, request.GetScheduledTime()) ai.StartedID = request.GetStartedID() ai.LastHeartBeatUpdatedTime = time.Unix(0, request.GetLastHeartbeatTime()) if ai.StartedID == common.EmptyEventID { ai.StartedTime = time.Time{} } else { ai.StartedTime = time.Unix(0, request.GetStartedTime()) } ai.Details = request.GetDetails() ai.Attempt = request.GetAttempt() ai.LastFailureReason = request.GetLastFailureReason() ai.LastWorkerIdentity = request.GetLastWorkerIdentity() ai.LastFailureDetails = request.GetLastFailureDetails() if resetActivityTimerTaskStatus { ai.TimerTaskStatus = TimerTaskStatusNone } e.updateActivityInfos[ai.ScheduleID] = ai return nil } // UpdateActivity updates an activity func (e *mutableStateBuilder) UpdateActivity( ai *persistence.ActivityInfo, ) error { if _, ok := e.pendingActivityInfoIDs[ai.ScheduleID]; !ok { e.logError( fmt.Sprintf("unable to find activity ID: %v in mutable state", ai.ActivityID), tag.ErrorTypeInvalidMutableStateAction, ) return ErrMissingActivityInfo } e.pendingActivityInfoIDs[ai.ScheduleID] = ai e.updateActivityInfos[ai.ScheduleID] = ai return nil } // DeleteActivity deletes details about an activity. func (e *mutableStateBuilder) DeleteActivity( scheduleEventID int64, ) error { if activityInfo, ok := e.pendingActivityInfoIDs[scheduleEventID]; ok { delete(e.pendingActivityInfoIDs, scheduleEventID) if _, ok = e.pendingActivityIDToEventID[activityInfo.ActivityID]; ok { delete(e.pendingActivityIDToEventID, activityInfo.ActivityID) } else { e.logError( fmt.Sprintf("unable to find activity ID: %v in mutable state", activityInfo.ActivityID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } } else { e.logError( fmt.Sprintf("unable to find activity event id: %v in mutable state", scheduleEventID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } delete(e.updateActivityInfos, scheduleEventID) e.deleteActivityInfos[scheduleEventID] = struct{}{} return nil } // GetUserTimerInfo gives details about a user timer. func (e *mutableStateBuilder) GetUserTimerInfo( timerID string, ) (*persistence.TimerInfo, bool) { timerInfo, ok := e.pendingTimerInfoIDs[timerID] return timerInfo, ok } // GetUserTimerInfoByEventID gives details about a user timer. func (e *mutableStateBuilder) GetUserTimerInfoByEventID( startEventID int64, ) (*persistence.TimerInfo, bool) { timerID, ok := e.pendingTimerEventIDToID[startEventID] if !ok { return nil, false } return e.GetUserTimerInfo(timerID) } // UpdateUserTimer updates the user timer in progress. func (e *mutableStateBuilder) UpdateUserTimer( ti *persistence.TimerInfo, ) error { timerID, ok := e.pendingTimerEventIDToID[ti.StartedID] if !ok { e.logError( fmt.Sprintf("unable to find timer event ID: %v in mutable state", ti.StartedID), tag.ErrorTypeInvalidMutableStateAction, ) return ErrMissingTimerInfo } if _, ok := e.pendingTimerInfoIDs[timerID]; !ok { e.logError( fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID), tag.ErrorTypeInvalidMutableStateAction, ) return ErrMissingTimerInfo } e.pendingTimerInfoIDs[ti.TimerID] = ti e.updateTimerInfos[ti.TimerID] = ti return nil } // DeleteUserTimer deletes an user timer. func (e *mutableStateBuilder) DeleteUserTimer( timerID string, ) error { if timerInfo, ok := e.pendingTimerInfoIDs[timerID]; ok { delete(e.pendingTimerInfoIDs, timerID) if _, ok = e.pendingTimerEventIDToID[timerInfo.StartedID]; ok { delete(e.pendingTimerEventIDToID, timerInfo.StartedID) } else { e.logError( fmt.Sprintf("unable to find timer event ID: %v in mutable state", timerID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } } else { e.logError( fmt.Sprintf("unable to find timer ID: %v in mutable state", timerID), tag.ErrorTypeInvalidMutableStateAction, ) // log data inconsistency instead of returning an error e.logDataInconsistency() } delete(e.updateTimerInfos, timerID) e.deleteTimerInfos[timerID] = struct{}{} return nil } // GetDecisionInfo returns details about the in-progress decision task func (e *mutableStateBuilder) GetDecisionInfo( scheduleEventID int64, ) (*DecisionInfo, bool) { return e.decisionTaskManager.GetDecisionInfo(scheduleEventID) } func (e *mutableStateBuilder) GetDecisionScheduleToStartTimeout() time.Duration { return e.decisionTaskManager.GetDecisionScheduleToStartTimeout() } func (e *mutableStateBuilder) GetPendingActivityInfos() map[int64]*persistence.ActivityInfo { return e.pendingActivityInfoIDs } func (e *mutableStateBuilder) GetPendingTimerInfos() map[string]*persistence.TimerInfo { return e.pendingTimerInfoIDs } func (e *mutableStateBuilder) GetPendingChildExecutionInfos() map[int64]*persistence.ChildExecutionInfo { return e.pendingChildExecutionInfoIDs } func (e *mutableStateBuilder) GetPendingRequestCancelExternalInfos() map[int64]*persistence.RequestCancelInfo { return e.pendingRequestCancelInfoIDs } func (e *mutableStateBuilder) GetPendingSignalExternalInfos() map[int64]*persistence.SignalInfo { return e.pendingSignalInfoIDs } func (e *mutableStateBuilder) HasProcessedOrPendingDecision() bool { return e.decisionTaskManager.HasProcessedOrPendingDecision() } func (e *mutableStateBuilder) HasPendingDecision() bool { return e.decisionTaskManager.HasPendingDecision() } func (e *mutableStateBuilder) GetPendingDecision() (*DecisionInfo, bool) { return e.decisionTaskManager.GetPendingDecision() } func (e *mutableStateBuilder) HasInFlightDecision() bool { return e.decisionTaskManager.HasInFlightDecision() } func (e *mutableStateBuilder) GetInFlightDecision() (*DecisionInfo, bool) { return e.decisionTaskManager.GetInFlightDecision() } func (e *mutableStateBuilder) HasBufferedEvents() bool { if len(e.bufferedEvents) > 0 || len(e.updateBufferedEvents) > 0 { return true } for _, event := range e.hBuilder.history { if event.ID == common.BufferedEventID { return true } } return false } // UpdateDecision updates a decision task. func (e *mutableStateBuilder) UpdateDecision( decision *DecisionInfo, ) { e.decisionTaskManager.UpdateDecision(decision) } // DeleteDecision deletes a decision task. func (e *mutableStateBuilder) DeleteDecision() { e.decisionTaskManager.DeleteDecision() } func (e *mutableStateBuilder) FailDecision( incrementAttempt bool, ) { e.decisionTaskManager.FailDecision(incrementAttempt) } func (e *mutableStateBuilder) ClearStickyness() { e.executionInfo.StickyTaskList = "" e.executionInfo.StickyScheduleToStartTimeout = 0 e.executionInfo.ClientLibraryVersion = "" e.executionInfo.ClientFeatureVersion = "" e.executionInfo.ClientImpl = "" } // GetLastFirstEventID returns last first event ID // first event ID is the ID of a batch of events in a single history events record func (e *mutableStateBuilder) GetLastFirstEventID() int64 { return e.executionInfo.LastFirstEventID } // GetNextEventID returns next event ID func (e *mutableStateBuilder) GetNextEventID() int64 { return e.executionInfo.NextEventID } // GetPreviousStartedEventID returns last started decision task event ID func (e *mutableStateBuilder) GetPreviousStartedEventID() int64 { return e.executionInfo.LastProcessedEvent } func (e *mutableStateBuilder) IsWorkflowExecutionRunning() bool { return e.executionInfo.IsRunning() } func (e *mutableStateBuilder) IsWorkflowCompleted() bool { return e.executionInfo.State == persistence.WorkflowStateCompleted } func (e *mutableStateBuilder) IsCancelRequested() (bool, string) { if e.executionInfo.CancelRequested { return e.executionInfo.CancelRequested, e.executionInfo.CancelRequestID } return false, "" } func (e *mutableStateBuilder) IsSignalRequested( requestID string, ) bool { if _, ok := e.pendingSignalRequestedIDs[requestID]; ok { return true } return false } func (e *mutableStateBuilder) AddSignalRequested( requestID string, ) { if e.pendingSignalRequestedIDs == nil { e.pendingSignalRequestedIDs = make(map[string]struct{}) } if e.updateSignalRequestedIDs == nil { e.updateSignalRequestedIDs = make(map[string]struct{}) } e.pendingSignalRequestedIDs[requestID] = struct{}{} // add requestID to set e.updateSignalRequestedIDs[requestID] = struct{}{} } func (e *mutableStateBuilder) DeleteSignalRequested( requestID string, ) { delete(e.pendingSignalRequestedIDs, requestID) delete(e.updateSignalRequestedIDs, requestID) e.deleteSignalRequestedIDs[requestID] = struct{}{} } func (e *mutableStateBuilder) addWorkflowExecutionStartedEventForContinueAsNew( parentExecutionInfo *types.ParentExecutionInfo, execution types.WorkflowExecution, previousExecutionState MutableState, attributes *types.ContinueAsNewWorkflowExecutionDecisionAttributes, firstRunID string, firstScheduledTime time.Time, ) (*types.HistoryEvent, error) { previousExecutionInfo := previousExecutionState.GetExecutionInfo() taskList := previousExecutionInfo.TaskList if attributes.TaskList != nil { taskList = attributes.TaskList.GetName() } tl := &types.TaskList{} tl.Name = taskList workflowType := previousExecutionInfo.WorkflowTypeName if attributes.WorkflowType != nil { workflowType = attributes.WorkflowType.GetName() } wType := &types.WorkflowType{} wType.Name = workflowType decisionTimeout := previousExecutionInfo.DecisionStartToCloseTimeout if attributes.TaskStartToCloseTimeoutSeconds != nil { decisionTimeout = attributes.GetTaskStartToCloseTimeoutSeconds() } createRequest := &types.StartWorkflowExecutionRequest{ RequestID: uuid.New(), Domain: e.domainEntry.GetInfo().Name, WorkflowID: execution.WorkflowID, TaskList: tl, WorkflowType: wType, TaskStartToCloseTimeoutSeconds: common.Int32Ptr(decisionTimeout), ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds, Input: attributes.Input, Header: attributes.Header, RetryPolicy: attributes.RetryPolicy, CronSchedule: attributes.CronSchedule, Memo: attributes.Memo, SearchAttributes: attributes.SearchAttributes, JitterStartSeconds: attributes.JitterStartSeconds, } req := &types.HistoryStartWorkflowExecutionRequest{ DomainUUID: e.domainEntry.GetInfo().ID, StartRequest: createRequest, ParentExecutionInfo: parentExecutionInfo, LastCompletionResult: attributes.LastCompletionResult, ContinuedFailureReason: attributes.FailureReason, ContinuedFailureDetails: attributes.FailureDetails, ContinueAsNewInitiator: attributes.Initiator, FirstDecisionTaskBackoffSeconds: attributes.BackoffStartIntervalInSeconds, PartitionConfig: previousExecutionInfo.PartitionConfig, } // if ContinueAsNew as Cron or decider, recalculate the expiration timestamp and set attempts to 0 req.Attempt = 0 if attributes.RetryPolicy != nil && attributes.RetryPolicy.GetExpirationIntervalInSeconds() > 0 { // expirationTime calculates from first decision task schedule to the end of the workflow expirationInSeconds := attributes.RetryPolicy.GetExpirationIntervalInSeconds() + req.GetFirstDecisionTaskBackoffSeconds() expirationTime := e.timeSource.Now().Add(time.Second * time.Duration(expirationInSeconds)) req.ExpirationTimestamp = common.Int64Ptr(expirationTime.UnixNano()) } // if ContinueAsNew as retry use the same expiration timestamp and increment attempts from previous execution state if attributes.GetInitiator() == types.ContinueAsNewInitiatorRetryPolicy { req.Attempt = previousExecutionState.GetExecutionInfo().Attempt + 1 expirationTime := previousExecutionState.GetExecutionInfo().ExpirationTime if !expirationTime.IsZero() { req.ExpirationTimestamp = common.Int64Ptr(expirationTime.UnixNano()) } } // History event only has domainName so domainID has to be passed in explicitly to update the mutable state var parentDomainID *string if parentExecutionInfo != nil { parentDomainID = &parentExecutionInfo.DomainUUID } event := e.hBuilder.AddWorkflowExecutionStartedEvent(req, previousExecutionInfo, firstRunID, execution.GetRunID(), firstScheduledTime) if err := e.ReplicateWorkflowExecutionStartedEvent( parentDomainID, execution, createRequest.GetRequestID(), event, false, ); err != nil { return nil, err } if err := e.SetHistoryTree(e.GetExecutionInfo().RunID); err != nil { return nil, err } if err := e.AddFirstDecisionTaskScheduled( event, ); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) AddWorkflowExecutionStartedEvent( execution types.WorkflowExecution, startRequest *types.HistoryStartWorkflowExecutionRequest, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowStarted if err := e.checkMutability(opTag); err != nil { return nil, err } request := startRequest.StartRequest eventID := e.GetNextEventID() if eventID != common.FirstEventID { e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(eventID), tag.ErrorTypeInvalidHistoryAction) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddWorkflowExecutionStartedEvent(startRequest, nil, execution.GetRunID(), execution.GetRunID(), time.Now()) var parentDomainID *string if startRequest.ParentExecutionInfo != nil { parentDomainID = &startRequest.ParentExecutionInfo.DomainUUID } if err := e.ReplicateWorkflowExecutionStartedEvent( parentDomainID, execution, request.GetRequestID(), event, false); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionStartedEvent( parentDomainID *string, execution types.WorkflowExecution, requestID string, startEvent *types.HistoryEvent, generateDelayedDecisionTasks bool, ) error { event := startEvent.WorkflowExecutionStartedEventAttributes e.executionInfo.CreateRequestID = requestID e.executionInfo.DomainID = e.domainEntry.GetInfo().ID e.executionInfo.WorkflowID = execution.GetWorkflowID() e.executionInfo.RunID = execution.GetRunID() e.executionInfo.FirstExecutionRunID = event.GetFirstExecutionRunID() if e.executionInfo.FirstExecutionRunID == "" { e.executionInfo.FirstExecutionRunID = execution.GetRunID() } e.executionInfo.TaskList = event.TaskList.GetName() e.executionInfo.WorkflowTypeName = event.WorkflowType.GetName() e.executionInfo.WorkflowTimeout = event.GetExecutionStartToCloseTimeoutSeconds() e.executionInfo.DecisionStartToCloseTimeout = event.GetTaskStartToCloseTimeoutSeconds() e.executionInfo.StartTimestamp = e.unixNanoToTime(startEvent.GetTimestamp()) if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCreated, persistence.WorkflowCloseStatusNone, ); err != nil { return err } e.executionInfo.LastProcessedEvent = common.EmptyEventID e.executionInfo.LastFirstEventID = startEvent.ID e.executionInfo.DecisionVersion = common.EmptyVersion e.executionInfo.DecisionScheduleID = common.EmptyEventID e.executionInfo.DecisionStartedID = common.EmptyEventID e.executionInfo.DecisionRequestID = common.EmptyUUID e.executionInfo.DecisionTimeout = 0 e.executionInfo.CronSchedule = event.GetCronSchedule() if parentDomainID != nil { e.executionInfo.ParentDomainID = *parentDomainID } if event.ParentWorkflowExecution != nil { e.executionInfo.ParentWorkflowID = event.ParentWorkflowExecution.GetWorkflowID() e.executionInfo.ParentRunID = event.ParentWorkflowExecution.GetRunID() } if event.ParentInitiatedEventID != nil { e.executionInfo.InitiatedID = event.GetParentInitiatedEventID() } else { e.executionInfo.InitiatedID = common.EmptyEventID } e.executionInfo.Attempt = event.GetAttempt() if event.GetExpirationTimestamp() != 0 { e.executionInfo.ExpirationTime = time.Unix(0, event.GetExpirationTimestamp()) } if event.RetryPolicy != nil { e.executionInfo.HasRetryPolicy = true e.executionInfo.BackoffCoefficient = event.RetryPolicy.GetBackoffCoefficient() e.executionInfo.ExpirationSeconds = event.RetryPolicy.GetExpirationIntervalInSeconds() e.executionInfo.InitialInterval = event.RetryPolicy.GetInitialIntervalInSeconds() e.executionInfo.MaximumAttempts = event.RetryPolicy.GetMaximumAttempts() e.executionInfo.MaximumInterval = event.RetryPolicy.GetMaximumIntervalInSeconds() e.executionInfo.NonRetriableErrors = event.RetryPolicy.NonRetriableErrorReasons } e.executionInfo.AutoResetPoints = rolloverAutoResetPointsWithExpiringTime( event.GetPrevAutoResetPoints(), event.GetContinuedExecutionRunID(), startEvent.GetTimestamp(), e.domainEntry.GetRetentionDays(e.executionInfo.WorkflowID), ) if event.Memo != nil { e.executionInfo.Memo = event.Memo.GetFields() } if event.SearchAttributes != nil { e.executionInfo.SearchAttributes = event.SearchAttributes.GetIndexedFields() } e.executionInfo.PartitionConfig = event.PartitionConfig e.writeEventToCache(startEvent) if err := e.taskGenerator.GenerateWorkflowStartTasks(e.unixNanoToTime(startEvent.GetTimestamp()), startEvent); err != nil { return err } if err := e.taskGenerator.GenerateRecordWorkflowStartedTasks(startEvent); err != nil { return err } if generateDelayedDecisionTasks && event.GetFirstDecisionTaskBackoffSeconds() > 0 { if err := e.taskGenerator.GenerateDelayedDecisionTasks(startEvent); err != nil { return err } } return nil } func (e *mutableStateBuilder) AddFirstDecisionTaskScheduled( startEvent *types.HistoryEvent, ) error { opTag := tag.WorkflowActionDecisionTaskScheduled if err := e.checkMutability(opTag); err != nil { return err } return e.decisionTaskManager.AddFirstDecisionTaskScheduled(startEvent) } func (e *mutableStateBuilder) AddDecisionTaskScheduledEvent( bypassTaskGeneration bool, ) (*DecisionInfo, error) { opTag := tag.WorkflowActionDecisionTaskScheduled if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskScheduledEvent(bypassTaskGeneration) } // originalScheduledTimestamp is to record the first scheduled decision during decision heartbeat. func (e *mutableStateBuilder) AddDecisionTaskScheduledEventAsHeartbeat( bypassTaskGeneration bool, originalScheduledTimestamp int64, ) (*DecisionInfo, error) { opTag := tag.WorkflowActionDecisionTaskScheduled if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskScheduledEventAsHeartbeat(bypassTaskGeneration, originalScheduledTimestamp) } func (e *mutableStateBuilder) ReplicateTransientDecisionTaskScheduled() error { return e.decisionTaskManager.ReplicateTransientDecisionTaskScheduled() } func (e *mutableStateBuilder) ReplicateDecisionTaskScheduledEvent( version int64, scheduleID int64, taskList string, startToCloseTimeoutSeconds int32, attempt int64, scheduleTimestamp int64, originalScheduledTimestamp int64, bypassTaskGeneration bool, ) (*DecisionInfo, error) { return e.decisionTaskManager.ReplicateDecisionTaskScheduledEvent(version, scheduleID, taskList, startToCloseTimeoutSeconds, attempt, scheduleTimestamp, originalScheduledTimestamp, bypassTaskGeneration) } func (e *mutableStateBuilder) AddDecisionTaskStartedEvent( scheduleEventID int64, requestID string, request *types.PollForDecisionTaskRequest, ) (*types.HistoryEvent, *DecisionInfo, error) { opTag := tag.WorkflowActionDecisionTaskStarted if err := e.checkMutability(opTag); err != nil { return nil, nil, err } return e.decisionTaskManager.AddDecisionTaskStartedEvent(scheduleEventID, requestID, request) } func (e *mutableStateBuilder) ReplicateDecisionTaskStartedEvent( decision *DecisionInfo, version int64, scheduleID int64, startedID int64, requestID string, timestamp int64, ) (*DecisionInfo, error) { return e.decisionTaskManager.ReplicateDecisionTaskStartedEvent(decision, version, scheduleID, startedID, requestID, timestamp) } func (e *mutableStateBuilder) CreateTransientDecisionEvents( decision *DecisionInfo, identity string, ) (*types.HistoryEvent, *types.HistoryEvent) { return e.decisionTaskManager.CreateTransientDecisionEvents(decision, identity) } // add BinaryCheckSum for the first decisionTaskCompletedID for auto-reset func (e *mutableStateBuilder) addBinaryCheckSumIfNotExists( event *types.HistoryEvent, maxResetPoints int, ) error { binChecksum := event.GetDecisionTaskCompletedEventAttributes().GetBinaryChecksum() if len(binChecksum) == 0 { return nil } exeInfo := e.executionInfo var currResetPoints []*types.ResetPointInfo if exeInfo.AutoResetPoints != nil && exeInfo.AutoResetPoints.Points != nil { currResetPoints = e.executionInfo.AutoResetPoints.Points } else { currResetPoints = make([]*types.ResetPointInfo, 0, 1) } // List of all recent binary checksums associated with the types. var recentBinaryChecksums []string for _, rp := range currResetPoints { recentBinaryChecksums = append(recentBinaryChecksums, rp.GetBinaryChecksum()) if rp.GetBinaryChecksum() == binChecksum { // this checksum already exists return nil } } recentBinaryChecksums, currResetPoints = trimBinaryChecksums(recentBinaryChecksums, currResetPoints, maxResetPoints) // Adding current version of the binary checksum. recentBinaryChecksums = append(recentBinaryChecksums, binChecksum) resettable := true err := e.CheckResettable() if err != nil { resettable = false } info := &types.ResetPointInfo{ BinaryChecksum: binChecksum, RunID: exeInfo.RunID, FirstDecisionCompletedID: event.ID, CreatedTimeNano: common.Int64Ptr(e.timeSource.Now().UnixNano()), Resettable: resettable, } currResetPoints = append(currResetPoints, info) exeInfo.AutoResetPoints = &types.ResetPoints{ Points: currResetPoints, } bytes, err := json.Marshal(recentBinaryChecksums) if err != nil { return err } if exeInfo.SearchAttributes == nil { exeInfo.SearchAttributes = make(map[string][]byte) } exeInfo.SearchAttributes[definition.BinaryChecksums] = bytes if common.IsAdvancedVisibilityWritingEnabled(e.shard.GetConfig().AdvancedVisibilityWritingMode(), e.shard.GetConfig().IsAdvancedVisConfigExist) { return e.taskGenerator.GenerateWorkflowSearchAttrTasks() } return nil } // TODO: we will release the restriction when reset API allow those pending func (e *mutableStateBuilder) CheckResettable() error { if len(e.GetPendingChildExecutionInfos()) > 0 { return &types.BadRequestError{ Message: "it is not allowed resetting to a point that workflow has pending child types.", } } if len(e.GetPendingRequestCancelExternalInfos()) > 0 { return &types.BadRequestError{ Message: "it is not allowed resetting to a point that workflow has pending request cancel.", } } if len(e.GetPendingSignalExternalInfos()) > 0 { return &types.BadRequestError{ Message: "it is not allowed resetting to a point that workflow has pending signals to send.", } } return nil } func (e *mutableStateBuilder) AddDecisionTaskCompletedEvent( scheduleEventID int64, startedEventID int64, request *types.RespondDecisionTaskCompletedRequest, maxResetPoints int, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskCompleted if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskCompletedEvent(scheduleEventID, startedEventID, request, maxResetPoints) } func (e *mutableStateBuilder) ReplicateDecisionTaskCompletedEvent( event *types.HistoryEvent, ) error { return e.decisionTaskManager.ReplicateDecisionTaskCompletedEvent(event) } func (e *mutableStateBuilder) AddDecisionTaskTimedOutEvent( scheduleEventID int64, startedEventID int64, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskTimedOut if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskTimedOutEvent(scheduleEventID, startedEventID) } func (e *mutableStateBuilder) ReplicateDecisionTaskTimedOutEvent( timeoutType types.TimeoutType, ) error { return e.decisionTaskManager.ReplicateDecisionTaskTimedOutEvent(timeoutType) } func (e *mutableStateBuilder) AddDecisionTaskScheduleToStartTimeoutEvent( scheduleEventID int64, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskTimedOut if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskScheduleToStartTimeoutEvent(scheduleEventID) } func (e *mutableStateBuilder) AddDecisionTaskResetTimeoutEvent( scheduleEventID int64, baseRunID string, newRunID string, forkEventVersion int64, reason string, resetRequestID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskTimedOut if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskResetTimeoutEvent( scheduleEventID, baseRunID, newRunID, forkEventVersion, reason, resetRequestID, ) } func (e *mutableStateBuilder) AddDecisionTaskFailedEvent( scheduleEventID int64, startedEventID int64, cause types.DecisionTaskFailedCause, details []byte, identity string, reason string, binChecksum string, baseRunID string, newRunID string, forkEventVersion int64, resetRequestID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionDecisionTaskFailed if err := e.checkMutability(opTag); err != nil { return nil, err } return e.decisionTaskManager.AddDecisionTaskFailedEvent( scheduleEventID, startedEventID, cause, details, identity, reason, binChecksum, baseRunID, newRunID, forkEventVersion, resetRequestID, ) } func (e *mutableStateBuilder) ReplicateDecisionTaskFailedEvent() error { return e.decisionTaskManager.ReplicateDecisionTaskFailedEvent() } func (e *mutableStateBuilder) AddActivityTaskScheduledEvent( ctx context.Context, decisionCompletedEventID int64, attributes *types.ScheduleActivityTaskDecisionAttributes, dispatch bool, ) (*types.HistoryEvent, *persistence.ActivityInfo, *types.ActivityLocalDispatchInfo, bool, bool, error) { opTag := tag.WorkflowActionActivityTaskScheduled if err := e.checkMutability(opTag); err != nil { return nil, nil, nil, false, false, err } _, ok := e.GetActivityByActivityID(attributes.GetActivityID()) if ok { e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction) return nil, nil, nil, false, false, e.createCallerError(opTag) } pendingActivitiesCount := len(e.pendingActivityInfoIDs) if pendingActivitiesCount >= e.config.PendingActivitiesCountLimitError() { e.logger.Error("Pending activity count exceeds error limit", tag.WorkflowDomainName(e.GetDomainEntry().GetInfo().Name), tag.WorkflowID(e.executionInfo.WorkflowID), tag.WorkflowRunID(e.executionInfo.RunID), tag.Number(int64(pendingActivitiesCount))) if e.config.PendingActivityValidationEnabled() { return nil, nil, nil, false, false, ErrTooManyPendingActivities } } else if pendingActivitiesCount >= e.config.PendingActivitiesCountLimitWarn() && !e.pendingActivityWarningSent { e.logger.Warn("Pending activity count exceeds warn limit", tag.WorkflowDomainName(e.GetDomainEntry().GetInfo().Name), tag.WorkflowID(e.executionInfo.WorkflowID), tag.WorkflowRunID(e.executionInfo.RunID), tag.Number(int64(pendingActivitiesCount))) e.pendingActivityWarningSent = true } event := e.hBuilder.AddActivityTaskScheduledEvent(decisionCompletedEventID, attributes) // Write the event to cache only on active cluster for processing on activity started or retried e.eventsCache.PutEvent( e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, event.ID, event, ) ai, err := e.ReplicateActivityTaskScheduledEvent(decisionCompletedEventID, event, true) if err != nil { return nil, nil, nil, false, false, err } activityStartedScope := e.metricsClient.Scope(metrics.HistoryRecordActivityTaskStartedScope) if e.config.EnableActivityLocalDispatchByDomain(e.domainEntry.GetInfo().Name) && attributes.RequestLocalDispatch { activityStartedScope.IncCounter(metrics.CadenceRequests) return event, ai, &types.ActivityLocalDispatchInfo{ActivityID: ai.ActivityID}, false, false, nil } started := false if dispatch { started = e.tryDispatchActivityTask(ctx, event, ai) } if started { activityStartedScope.IncCounter(metrics.CadenceRequests) return event, ai, nil, true, true, nil } if err := e.taskGenerator.GenerateActivityTransferTasks(event); err != nil { return nil, nil, nil, dispatch, false, err } return event, ai, nil, dispatch, false, err } func (e *mutableStateBuilder) tryDispatchActivityTask( ctx context.Context, scheduledEvent *types.HistoryEvent, ai *persistence.ActivityInfo, ) bool { taggedScope := e.metricsClient.Scope(metrics.HistoryScheduleDecisionTaskScope).Tagged( metrics.DomainTag(e.domainEntry.GetInfo().Name), metrics.WorkflowTypeTag(e.GetWorkflowType().Name), metrics.TaskListTag(ai.TaskList)) taggedScope.IncCounter(metrics.DecisionTypeScheduleActivityDispatchCounter) err := e.shard.GetService().GetMatchingClient().AddActivityTask(ctx, &types.AddActivityTaskRequest{ DomainUUID: e.executionInfo.DomainID, SourceDomainUUID: e.domainEntry.GetInfo().ID, Execution: &types.WorkflowExecution{ WorkflowID: e.executionInfo.WorkflowID, RunID: e.executionInfo.RunID, }, TaskList: &types.TaskList{Name: ai.TaskList}, ScheduleID: scheduledEvent.ID, ScheduleToStartTimeoutSeconds: common.Int32Ptr(ai.ScheduleToStartTimeout), ActivityTaskDispatchInfo: &types.ActivityTaskDispatchInfo{ ScheduledEvent: scheduledEvent, StartedTimestamp: common.Int64Ptr(e.timeSource.Now().UnixNano()), WorkflowType: e.GetWorkflowType(), WorkflowDomain: e.GetDomainEntry().GetInfo().Name, ScheduledTimestampOfThisAttempt: common.Int64Ptr(ai.ScheduledTime.UnixNano()), }, PartitionConfig: e.executionInfo.PartitionConfig, }) if err == nil { taggedScope.IncCounter(metrics.DecisionTypeScheduleActivityDispatchSucceedCounter) return true } return false } func (e *mutableStateBuilder) ReplicateActivityTaskScheduledEvent( firstEventID int64, event *types.HistoryEvent, skipTaskGeneration bool, ) (*persistence.ActivityInfo, error) { attributes := event.ActivityTaskScheduledEventAttributes targetDomainID := e.executionInfo.DomainID if attributes.GetDomain() != "" { var err error targetDomainID, err = e.shard.GetDomainCache().GetDomainID(attributes.GetDomain()) if err != nil { return nil, err } } scheduleEventID := event.ID scheduleToCloseTimeout := attributes.GetScheduleToCloseTimeoutSeconds() ai := &persistence.ActivityInfo{ Version: event.Version, ScheduleID: scheduleEventID, ScheduledEventBatchID: firstEventID, ScheduledTime: time.Unix(0, event.GetTimestamp()), StartedID: common.EmptyEventID, StartedTime: time.Time{}, ActivityID: attributes.ActivityID, DomainID: targetDomainID, ScheduleToStartTimeout: attributes.GetScheduleToStartTimeoutSeconds(), ScheduleToCloseTimeout: scheduleToCloseTimeout, StartToCloseTimeout: attributes.GetStartToCloseTimeoutSeconds(), HeartbeatTimeout: attributes.GetHeartbeatTimeoutSeconds(), CancelRequested: false, CancelRequestID: common.EmptyEventID, LastHeartBeatUpdatedTime: time.Time{}, TimerTaskStatus: TimerTaskStatusNone, TaskList: attributes.TaskList.GetName(), HasRetryPolicy: attributes.RetryPolicy != nil, } if ai.HasRetryPolicy { ai.InitialInterval = attributes.RetryPolicy.GetInitialIntervalInSeconds() ai.BackoffCoefficient = attributes.RetryPolicy.GetBackoffCoefficient() ai.MaximumInterval = attributes.RetryPolicy.GetMaximumIntervalInSeconds() ai.MaximumAttempts = attributes.RetryPolicy.GetMaximumAttempts() ai.NonRetriableErrors = attributes.RetryPolicy.NonRetriableErrorReasons if attributes.RetryPolicy.GetExpirationIntervalInSeconds() != 0 { ai.ExpirationTime = ai.ScheduledTime.Add(time.Duration(attributes.RetryPolicy.GetExpirationIntervalInSeconds()) * time.Second) } } e.pendingActivityInfoIDs[scheduleEventID] = ai e.pendingActivityIDToEventID[ai.ActivityID] = scheduleEventID e.updateActivityInfos[ai.ScheduleID] = ai if !skipTaskGeneration { return ai, e.taskGenerator.GenerateActivityTransferTasks(event) } return ai, nil } func (e *mutableStateBuilder) addTransientActivityStartedEvent( scheduleEventID int64, ) error { ai, ok := e.GetActivityInfo(scheduleEventID) if !ok || ai.StartedID != common.TransientEventID { return nil } // activity task was started (as transient event), we need to add it now. event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, ai.Attempt, ai.RequestID, ai.StartedIdentity, ai.LastFailureReason, ai.LastFailureDetails) if !ai.StartedTime.IsZero() { // overwrite started event time to the one recorded in ActivityInfo event.Timestamp = common.Int64Ptr(ai.StartedTime.UnixNano()) } return e.ReplicateActivityTaskStartedEvent(event) } func (e *mutableStateBuilder) AddActivityTaskStartedEvent( ai *persistence.ActivityInfo, scheduleEventID int64, requestID string, identity string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskStarted if err := e.checkMutability(opTag); err != nil { return nil, err } if !ai.HasRetryPolicy { event := e.hBuilder.AddActivityTaskStartedEvent(scheduleEventID, ai.Attempt, requestID, identity, ai.LastFailureReason, ai.LastFailureDetails) if err := e.ReplicateActivityTaskStartedEvent(event); err != nil { return nil, err } return event, nil } // we might need to retry, so do not append started event just yet, // instead update mutable state and will record started event when activity task is closed ai.Version = e.GetCurrentVersion() ai.StartedID = common.TransientEventID ai.RequestID = requestID ai.StartedTime = e.timeSource.Now() ai.LastHeartBeatUpdatedTime = ai.StartedTime ai.StartedIdentity = identity if err := e.UpdateActivity(ai); err != nil { return nil, err } e.syncActivityTasks[ai.ScheduleID] = struct{}{} return nil, nil } func (e *mutableStateBuilder) ReplicateActivityTaskStartedEvent( event *types.HistoryEvent, ) error { attributes := event.ActivityTaskStartedEventAttributes scheduleID := attributes.GetScheduledEventID() ai, ok := e.GetActivityInfo(scheduleID) if !ok { e.logError( fmt.Sprintf("unable to find activity event id: %v in mutable state", scheduleID), tag.ErrorTypeInvalidMutableStateAction, ) return ErrMissingActivityInfo } ai.Version = event.Version ai.StartedID = event.ID ai.RequestID = attributes.GetRequestID() ai.StartedTime = time.Unix(0, event.GetTimestamp()) ai.LastHeartBeatUpdatedTime = ai.StartedTime e.updateActivityInfos[ai.ScheduleID] = ai return nil } func (e *mutableStateBuilder) AddActivityTaskCompletedEvent( scheduleEventID int64, startedEventID int64, request *types.RespondActivityTaskCompletedRequest, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskCompleted if err := e.checkMutability(opTag); err != nil { return nil, err } if ai, ok := e.GetActivityInfo(scheduleEventID); !ok || ai.StartedID != startedEventID { e.logger.Warn( mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowScheduleID(scheduleEventID), tag.WorkflowStartedID(startedEventID)) return nil, e.createInternalServerError(opTag) } if err := e.addTransientActivityStartedEvent(scheduleEventID); err != nil { return nil, err } event := e.hBuilder.AddActivityTaskCompletedEvent(scheduleEventID, startedEventID, request) if err := e.ReplicateActivityTaskCompletedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateActivityTaskCompletedEvent( event *types.HistoryEvent, ) error { attributes := event.ActivityTaskCompletedEventAttributes scheduleID := attributes.GetScheduledEventID() return e.DeleteActivity(scheduleID) } func (e *mutableStateBuilder) AddActivityTaskFailedEvent( scheduleEventID int64, startedEventID int64, request *types.RespondActivityTaskFailedRequest, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskFailed if err := e.checkMutability(opTag); err != nil { return nil, err } if ai, ok := e.GetActivityInfo(scheduleEventID); !ok || ai.StartedID != startedEventID { e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowScheduleID(scheduleEventID), tag.WorkflowStartedID(startedEventID)) return nil, e.createInternalServerError(opTag) } if err := e.addTransientActivityStartedEvent(scheduleEventID); err != nil { return nil, err } event := e.hBuilder.AddActivityTaskFailedEvent(scheduleEventID, startedEventID, request) if err := e.ReplicateActivityTaskFailedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateActivityTaskFailedEvent( event *types.HistoryEvent, ) error { attributes := event.ActivityTaskFailedEventAttributes scheduleID := attributes.GetScheduledEventID() return e.DeleteActivity(scheduleID) } func (e *mutableStateBuilder) AddActivityTaskTimedOutEvent( scheduleEventID int64, startedEventID int64, timeoutType types.TimeoutType, lastHeartBeatDetails []byte, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskTimedOut if err := e.checkMutability(opTag); err != nil { return nil, err } ai, ok := e.GetActivityInfo(scheduleEventID) if !ok || ai.StartedID != startedEventID || ((timeoutType == types.TimeoutTypeStartToClose || timeoutType == types.TimeoutTypeHeartbeat) && ai.StartedID == common.EmptyEventID) { e.logger.Warn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowScheduleID(ai.ScheduleID), tag.WorkflowStartedID(ai.StartedID), tag.WorkflowTimeoutType(int64(timeoutType))) return nil, e.createInternalServerError(opTag) } if err := e.addTransientActivityStartedEvent(scheduleEventID); err != nil { return nil, err } event := e.hBuilder.AddActivityTaskTimedOutEvent(scheduleEventID, startedEventID, timeoutType, lastHeartBeatDetails, ai.LastFailureReason, ai.LastFailureDetails) if err := e.ReplicateActivityTaskTimedOutEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateActivityTaskTimedOutEvent( event *types.HistoryEvent, ) error { attributes := event.ActivityTaskTimedOutEventAttributes scheduleID := attributes.GetScheduledEventID() return e.DeleteActivity(scheduleID) } func (e *mutableStateBuilder) AddActivityTaskCancelRequestedEvent( decisionCompletedEventID int64, activityID string, identity string, ) (*types.HistoryEvent, *persistence.ActivityInfo, error) { opTag := tag.WorkflowActionActivityTaskCancelRequested if err := e.checkMutability(opTag); err != nil { return nil, nil, err } // we need to add the cancel request event even if activity not in mutable state // if activity not in mutable state or already cancel requested, // we do not need to call the replication function actCancelReqEvent := e.hBuilder.AddActivityTaskCancelRequestedEvent(decisionCompletedEventID, activityID) ai, ok := e.GetActivityByActivityID(activityID) if !ok || ai.CancelRequested { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowActivityID(activityID)) return nil, nil, e.createCallerError(opTag) } if err := e.ReplicateActivityTaskCancelRequestedEvent(actCancelReqEvent); err != nil { return nil, nil, err } return actCancelReqEvent, ai, nil } func (e *mutableStateBuilder) ReplicateActivityTaskCancelRequestedEvent( event *types.HistoryEvent, ) error { attributes := event.ActivityTaskCancelRequestedEventAttributes activityID := attributes.GetActivityID() ai, ok := e.GetActivityByActivityID(activityID) if !ok { // On active side, if the ActivityTaskCancelRequested is invalid, it will created a RequestCancelActivityTaskFailed // Passive will rely on active side logic return nil } ai.Version = event.Version // - We have the activity dispatched to worker. // - The activity might not be heartbeating, but the activity can still call RecordActivityHeartBeat() // to see cancellation while reporting progress of the activity. ai.CancelRequested = true ai.CancelRequestID = event.ID e.updateActivityInfos[ai.ScheduleID] = ai return nil } func (e *mutableStateBuilder) AddRequestCancelActivityTaskFailedEvent( decisionCompletedEventID int64, activityID string, cause string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskCancelFailed if err := e.checkMutability(opTag); err != nil { return nil, err } return e.hBuilder.AddRequestCancelActivityTaskFailedEvent(decisionCompletedEventID, activityID, cause), nil } func (e *mutableStateBuilder) AddActivityTaskCanceledEvent( scheduleEventID int64, startedEventID int64, latestCancelRequestedEventID int64, details []byte, identity string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionActivityTaskCanceled if err := e.checkMutability(opTag); err != nil { return nil, err } ai, ok := e.GetActivityInfo(scheduleEventID) if !ok || ai.StartedID != startedEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID)) return nil, e.createInternalServerError(opTag) } // Verify cancel request as well. if !ai.CancelRequested { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowScheduleID(scheduleEventID), tag.WorkflowActivityID(ai.ActivityID), tag.WorkflowStartedID(ai.StartedID)) return nil, e.createInternalServerError(opTag) } if err := e.addTransientActivityStartedEvent(scheduleEventID); err != nil { return nil, err } event := e.hBuilder.AddActivityTaskCanceledEvent(scheduleEventID, startedEventID, latestCancelRequestedEventID, details, identity) if err := e.ReplicateActivityTaskCanceledEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateActivityTaskCanceledEvent( event *types.HistoryEvent, ) error { attributes := event.ActivityTaskCanceledEventAttributes scheduleID := attributes.GetScheduledEventID() return e.DeleteActivity(scheduleID) } func (e *mutableStateBuilder) AddCompletedWorkflowEvent( decisionCompletedEventID int64, attributes *types.CompleteWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowCompleted if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddCompletedWorkflowEvent(decisionCompletedEventID, attributes) if err := e.ReplicateWorkflowExecutionCompletedEvent(decisionCompletedEventID, event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionCompletedEvent( firstEventID int64, event *types.HistoryEvent, ) error { if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusCompleted, ); err != nil { return err } e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database e.ClearStickyness() e.writeEventToCache(event) return e.taskGenerator.GenerateWorkflowCloseTasks(event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name)) } func (e *mutableStateBuilder) AddFailWorkflowEvent( decisionCompletedEventID int64, attributes *types.FailWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowFailed if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddFailWorkflowEvent(decisionCompletedEventID, attributes) if err := e.ReplicateWorkflowExecutionFailedEvent(decisionCompletedEventID, event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionFailedEvent( firstEventID int64, event *types.HistoryEvent, ) error { if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusFailed, ); err != nil { return err } e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database e.ClearStickyness() e.writeEventToCache(event) return e.taskGenerator.GenerateWorkflowCloseTasks(event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name)) } func (e *mutableStateBuilder) AddTimeoutWorkflowEvent( firstEventID int64, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowTimeout if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddTimeoutWorkflowEvent() if err := e.ReplicateWorkflowExecutionTimedoutEvent(firstEventID, event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionTimedoutEvent( firstEventID int64, event *types.HistoryEvent, ) error { if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusTimedOut, ); err != nil { return err } e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database e.ClearStickyness() e.writeEventToCache(event) return e.taskGenerator.GenerateWorkflowCloseTasks(event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name)) } func (e *mutableStateBuilder) AddWorkflowExecutionCancelRequestedEvent( cause string, request *types.HistoryRequestCancelWorkflowExecutionRequest, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowCancelRequested if err := e.checkMutability(opTag); err != nil { return nil, err } if e.executionInfo.CancelRequested { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowState(e.executionInfo.State), tag.Bool(e.executionInfo.CancelRequested), tag.Key(e.executionInfo.CancelRequestID), ) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddWorkflowExecutionCancelRequestedEvent(cause, request) if err := e.ReplicateWorkflowExecutionCancelRequestedEvent(event); err != nil { return nil, err } // Set the CancelRequestID on the active cluster. This information is not part of the history event. e.executionInfo.CancelRequestID = request.CancelRequest.GetRequestID() return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionCancelRequestedEvent( event *types.HistoryEvent, ) error { e.executionInfo.CancelRequested = true return nil } func (e *mutableStateBuilder) AddWorkflowExecutionCanceledEvent( decisionTaskCompletedEventID int64, attributes *types.CancelWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowCanceled if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddWorkflowExecutionCanceledEvent(decisionTaskCompletedEventID, attributes) if err := e.ReplicateWorkflowExecutionCanceledEvent(decisionTaskCompletedEventID, event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionCanceledEvent( firstEventID int64, event *types.HistoryEvent, ) error { if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusCanceled, ); err != nil { return err } e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database e.ClearStickyness() e.writeEventToCache(event) return e.taskGenerator.GenerateWorkflowCloseTasks(event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name)) } func (e *mutableStateBuilder) AddRequestCancelExternalWorkflowExecutionInitiatedEvent( decisionCompletedEventID int64, cancelRequestID string, request *types.RequestCancelExternalWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, *persistence.RequestCancelInfo, error) { opTag := tag.WorkflowActionExternalWorkflowCancelInitiated if err := e.checkMutability(opTag); err != nil { return nil, nil, err } event := e.hBuilder.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID, request) rci, err := e.ReplicateRequestCancelExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID, event, cancelRequestID) if err != nil { return nil, nil, err } return event, rci, nil } func (e *mutableStateBuilder) ReplicateRequestCancelExternalWorkflowExecutionInitiatedEvent( firstEventID int64, event *types.HistoryEvent, cancelRequestID string, ) (*persistence.RequestCancelInfo, error) { // TODO: Evaluate if we need cancelRequestID also part of history event initiatedEventID := event.ID rci := &persistence.RequestCancelInfo{ Version: event.Version, InitiatedEventBatchID: firstEventID, InitiatedID: initiatedEventID, CancelRequestID: cancelRequestID, } e.pendingRequestCancelInfoIDs[rci.InitiatedID] = rci e.updateRequestCancelInfos[rci.InitiatedID] = rci return rci, e.taskGenerator.GenerateRequestCancelExternalTasks(event) } func (e *mutableStateBuilder) AddExternalWorkflowExecutionCancelRequested( initiatedID int64, domain string, workflowID string, runID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionExternalWorkflowCancelRequested if err := e.checkMutability(opTag); err != nil { return nil, err } _, ok := e.GetRequestCancelInfo(initiatedID) if !ok { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddExternalWorkflowExecutionCancelRequested(initiatedID, domain, workflowID, runID) if err := e.ReplicateExternalWorkflowExecutionCancelRequested(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateExternalWorkflowExecutionCancelRequested( event *types.HistoryEvent, ) error { initiatedID := event.ExternalWorkflowExecutionCancelRequestedEventAttributes.GetInitiatedEventID() return e.DeletePendingRequestCancel(initiatedID) } func (e *mutableStateBuilder) AddRequestCancelExternalWorkflowExecutionFailedEvent( decisionTaskCompletedEventID int64, initiatedID int64, domain string, workflowID string, runID string, cause types.CancelExternalWorkflowExecutionFailedCause, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionExternalWorkflowCancelFailed if err := e.checkMutability(opTag); err != nil { return nil, err } _, ok := e.GetRequestCancelInfo(initiatedID) if !ok { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddRequestCancelExternalWorkflowExecutionFailedEvent(decisionTaskCompletedEventID, initiatedID, domain, workflowID, runID, cause) if err := e.ReplicateRequestCancelExternalWorkflowExecutionFailedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateRequestCancelExternalWorkflowExecutionFailedEvent( event *types.HistoryEvent, ) error { initiatedID := event.RequestCancelExternalWorkflowExecutionFailedEventAttributes.GetInitiatedEventID() return e.DeletePendingRequestCancel(initiatedID) } func (e *mutableStateBuilder) AddSignalExternalWorkflowExecutionInitiatedEvent( decisionCompletedEventID int64, signalRequestID string, request *types.SignalExternalWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, *persistence.SignalInfo, error) { opTag := tag.WorkflowActionExternalWorkflowSignalInitiated if err := e.checkMutability(opTag); err != nil { return nil, nil, err } event := e.hBuilder.AddSignalExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID, request) si, err := e.ReplicateSignalExternalWorkflowExecutionInitiatedEvent(decisionCompletedEventID, event, signalRequestID) if err != nil { return nil, nil, err } return event, si, nil } func (e *mutableStateBuilder) ReplicateSignalExternalWorkflowExecutionInitiatedEvent( firstEventID int64, event *types.HistoryEvent, signalRequestID string, ) (*persistence.SignalInfo, error) { // TODO: Consider also writing signalRequestID to history event initiatedEventID := event.ID attributes := event.SignalExternalWorkflowExecutionInitiatedEventAttributes si := &persistence.SignalInfo{ Version: event.Version, InitiatedEventBatchID: firstEventID, InitiatedID: initiatedEventID, SignalRequestID: signalRequestID, SignalName: attributes.GetSignalName(), Input: attributes.Input, Control: attributes.Control, } e.pendingSignalInfoIDs[si.InitiatedID] = si e.updateSignalInfos[si.InitiatedID] = si return si, e.taskGenerator.GenerateSignalExternalTasks(event) } func (e *mutableStateBuilder) AddUpsertWorkflowSearchAttributesEvent( decisionCompletedEventID int64, request *types.UpsertWorkflowSearchAttributesDecisionAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionUpsertWorkflowSearchAttributes if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddUpsertWorkflowSearchAttributesEvent(decisionCompletedEventID, request) if err := e.ReplicateUpsertWorkflowSearchAttributesEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateUpsertWorkflowSearchAttributesEvent( event *types.HistoryEvent, ) error { upsertSearchAttr := event.UpsertWorkflowSearchAttributesEventAttributes.GetSearchAttributes().GetIndexedFields() currentSearchAttr := e.GetExecutionInfo().SearchAttributes e.executionInfo.SearchAttributes = mergeMapOfByteArray(currentSearchAttr, upsertSearchAttr) return e.taskGenerator.GenerateWorkflowSearchAttrTasks() } func mergeMapOfByteArray( current map[string][]byte, upsert map[string][]byte, ) map[string][]byte { if current == nil { current = make(map[string][]byte) } for k, v := range upsert { current[k] = v } return current } func (e *mutableStateBuilder) AddExternalWorkflowExecutionSignaled( initiatedID int64, domain string, workflowID string, runID string, control []byte, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionExternalWorkflowSignalRequested if err := e.checkMutability(opTag); err != nil { return nil, err } _, ok := e.GetSignalInfo(initiatedID) if !ok { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddExternalWorkflowExecutionSignaled(initiatedID, domain, workflowID, runID, control) if err := e.ReplicateExternalWorkflowExecutionSignaled(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateExternalWorkflowExecutionSignaled( event *types.HistoryEvent, ) error { initiatedID := event.ExternalWorkflowExecutionSignaledEventAttributes.GetInitiatedEventID() return e.DeletePendingSignal(initiatedID) } func (e *mutableStateBuilder) AddSignalExternalWorkflowExecutionFailedEvent( decisionTaskCompletedEventID int64, initiatedID int64, domain string, workflowID string, runID string, control []byte, cause types.SignalExternalWorkflowExecutionFailedCause, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionExternalWorkflowSignalFailed if err := e.checkMutability(opTag); err != nil { return nil, err } _, ok := e.GetSignalInfo(initiatedID) if !ok { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddSignalExternalWorkflowExecutionFailedEvent(decisionTaskCompletedEventID, initiatedID, domain, workflowID, runID, control, cause) if err := e.ReplicateSignalExternalWorkflowExecutionFailedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateSignalExternalWorkflowExecutionFailedEvent( event *types.HistoryEvent, ) error { initiatedID := event.SignalExternalWorkflowExecutionFailedEventAttributes.GetInitiatedEventID() return e.DeletePendingSignal(initiatedID) } func (e *mutableStateBuilder) AddTimerStartedEvent( decisionCompletedEventID int64, request *types.StartTimerDecisionAttributes, ) (*types.HistoryEvent, *persistence.TimerInfo, error) { opTag := tag.WorkflowActionTimerStarted if err := e.checkMutability(opTag); err != nil { return nil, nil, err } timerID := request.GetTimerID() _, ok := e.GetUserTimerInfo(timerID) if ok { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowTimerID(timerID)) return nil, nil, e.createCallerError(opTag) } event := e.hBuilder.AddTimerStartedEvent(decisionCompletedEventID, request) ti, err := e.ReplicateTimerStartedEvent(event) if err != nil { return nil, nil, err } return event, ti, err } func (e *mutableStateBuilder) ReplicateTimerStartedEvent( event *types.HistoryEvent, ) (*persistence.TimerInfo, error) { attributes := event.TimerStartedEventAttributes timerID := attributes.GetTimerID() startToFireTimeout := attributes.GetStartToFireTimeoutSeconds() fireTimeout := time.Duration(startToFireTimeout) * time.Second // TODO: Time skew need to be taken in to account. expiryTime := time.Unix(0, event.GetTimestamp()).Add(fireTimeout) // should use the event time, not now ti := &persistence.TimerInfo{ Version: event.Version, TimerID: timerID, ExpiryTime: expiryTime, StartedID: event.ID, TaskStatus: TimerTaskStatusNone, } e.pendingTimerInfoIDs[ti.TimerID] = ti e.pendingTimerEventIDToID[ti.StartedID] = ti.TimerID e.updateTimerInfos[ti.TimerID] = ti return ti, nil } func (e *mutableStateBuilder) AddTimerFiredEvent( timerID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionTimerFired if err := e.checkMutability(opTag); err != nil { return nil, err } timerInfo, ok := e.GetUserTimerInfo(timerID) if !ok { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowTimerID(timerID)) return nil, e.createInternalServerError(opTag) } // Timer is running. event := e.hBuilder.AddTimerFiredEvent(timerInfo.StartedID, timerID) if err := e.ReplicateTimerFiredEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateTimerFiredEvent( event *types.HistoryEvent, ) error { attributes := event.TimerFiredEventAttributes timerID := attributes.GetTimerID() return e.DeleteUserTimer(timerID) } func (e *mutableStateBuilder) AddTimerCanceledEvent( decisionCompletedEventID int64, attributes *types.CancelTimerDecisionAttributes, identity string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionTimerCanceled if err := e.checkMutability(opTag); err != nil { return nil, err } var timerStartedID int64 timerID := attributes.GetTimerID() ti, ok := e.GetUserTimerInfo(timerID) if !ok { // if timer is not running then check if it has fired in the mutable state. // If so clear the timer from the mutable state. We need to check both the // bufferedEvents and the history builder timerFiredEvent := e.checkAndClearTimerFiredEvent(timerID) if timerFiredEvent == nil { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowTimerID(timerID)) return nil, e.createCallerError(opTag) } timerStartedID = timerFiredEvent.TimerFiredEventAttributes.GetStartedEventID() } else { timerStartedID = ti.StartedID } // Timer is running. event := e.hBuilder.AddTimerCanceledEvent(timerStartedID, decisionCompletedEventID, timerID, identity) if ok { if err := e.ReplicateTimerCanceledEvent(event); err != nil { return nil, err } } return event, nil } func (e *mutableStateBuilder) ReplicateTimerCanceledEvent( event *types.HistoryEvent, ) error { attributes := event.TimerCanceledEventAttributes timerID := attributes.GetTimerID() return e.DeleteUserTimer(timerID) } func (e *mutableStateBuilder) AddCancelTimerFailedEvent( decisionCompletedEventID int64, attributes *types.CancelTimerDecisionAttributes, identity string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionTimerCancelFailed if err := e.checkMutability(opTag); err != nil { return nil, err } // No Operation: We couldn't cancel it probably TIMER_ID_UNKNOWN timerID := attributes.GetTimerID() return e.hBuilder.AddCancelTimerFailedEvent(timerID, decisionCompletedEventID, timerCancellationMsgTimerIDUnknown, identity), nil } func (e *mutableStateBuilder) AddRecordMarkerEvent( decisionCompletedEventID int64, attributes *types.RecordMarkerDecisionAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowRecordMarker if err := e.checkMutability(opTag); err != nil { return nil, err } return e.hBuilder.AddMarkerRecordedEvent(decisionCompletedEventID, attributes), nil } func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent( firstEventID int64, reason string, details []byte, identity string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowTerminated if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddWorkflowExecutionTerminatedEvent(reason, details, identity) if err := e.ReplicateWorkflowExecutionTerminatedEvent(firstEventID, event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionTerminatedEvent( firstEventID int64, event *types.HistoryEvent, ) error { if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusTerminated, ); err != nil { return err } e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database e.ClearStickyness() e.writeEventToCache(event) return e.taskGenerator.GenerateWorkflowCloseTasks(event, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name)) } func (e *mutableStateBuilder) AddWorkflowExecutionSignaled( signalName string, input []byte, identity string, requestID string, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionWorkflowSignaled if err := e.checkMutability(opTag); err != nil { return nil, err } event := e.hBuilder.AddWorkflowExecutionSignaledEvent(signalName, input, identity, requestID) if err := e.ReplicateWorkflowExecutionSignaled(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateWorkflowExecutionSignaled( event *types.HistoryEvent, ) error { // Increment signal count in mutable state for this workflow execution e.executionInfo.SignalCount++ return nil } func (e *mutableStateBuilder) AddContinueAsNewEvent( ctx context.Context, firstEventID int64, decisionCompletedEventID int64, parentDomainName string, attributes *types.ContinueAsNewWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, MutableState, error) { opTag := tag.WorkflowActionWorkflowContinueAsNew if err := e.checkMutability(opTag); err != nil { return nil, nil, err } var err error newRunID := uuid.New() newExecution := types.WorkflowExecution{ WorkflowID: e.executionInfo.WorkflowID, RunID: newRunID, } // Extract ParentExecutionInfo from current run so it can be passed down to the next var parentInfo *types.ParentExecutionInfo if e.HasParentExecution() { parentInfo = &types.ParentExecutionInfo{ DomainUUID: e.executionInfo.ParentDomainID, Domain: parentDomainName, Execution: &types.WorkflowExecution{ WorkflowID: e.executionInfo.ParentWorkflowID, RunID: e.executionInfo.ParentRunID, }, InitiatedID: e.executionInfo.InitiatedID, } } continueAsNewEvent := e.hBuilder.AddContinuedAsNewEvent(decisionCompletedEventID, newRunID, attributes) currentStartEvent, err := e.GetStartEvent(ctx) if err != nil { return nil, nil, err } firstRunID := e.executionInfo.FirstExecutionRunID // This is needed for backwards compatibility. Workflow execution create with Cadence release v0.25.0 or earlier // does not have FirstExecutionRunID stored as part of mutable state. If this is not set then load it from // workflow execution started event. if len(firstRunID) == 0 { firstRunID = currentStartEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstExecutionRunID() } firstScheduleTime := currentStartEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstScheduledTime() domainID := e.domainEntry.GetInfo().ID newStateBuilder := NewMutableStateBuilderWithVersionHistories( e.shard, e.logger, e.domainEntry, ).(*mutableStateBuilder) if _, err = newStateBuilder.addWorkflowExecutionStartedEventForContinueAsNew( parentInfo, newExecution, e, attributes, firstRunID, firstScheduleTime, ); err != nil { return nil, nil, &types.InternalServiceError{Message: "Failed to add workflow execution started event."} } if err = e.ReplicateWorkflowExecutionContinuedAsNewEvent( firstEventID, domainID, continueAsNewEvent, ); err != nil { return nil, nil, err } return continueAsNewEvent, newStateBuilder, nil } func rolloverAutoResetPointsWithExpiringTime( resetPoints *types.ResetPoints, prevRunID string, nowNano int64, domainRetentionDays int32, ) *types.ResetPoints { if resetPoints == nil || resetPoints.Points == nil { return resetPoints } newPoints := make([]*types.ResetPointInfo, 0, len(resetPoints.Points)) expiringTimeNano := nowNano + int64(time.Duration(domainRetentionDays)*time.Hour*24) for _, rp := range resetPoints.Points { if rp.GetRunID() == prevRunID { rp.ExpiringTimeNano = common.Int64Ptr(expiringTimeNano) } newPoints = append(newPoints, rp) } return &types.ResetPoints{ Points: newPoints, } } func (e *mutableStateBuilder) ReplicateWorkflowExecutionContinuedAsNewEvent( firstEventID int64, domainID string, continueAsNewEvent *types.HistoryEvent, ) error { if err := e.UpdateWorkflowStateCloseStatus( persistence.WorkflowStateCompleted, persistence.WorkflowCloseStatusContinuedAsNew, ); err != nil { return err } e.executionInfo.CompletionEventBatchID = firstEventID // Used when completion event needs to be loaded from database e.ClearStickyness() e.writeEventToCache(continueAsNewEvent) return e.taskGenerator.GenerateWorkflowCloseTasks(continueAsNewEvent, e.config.WorkflowDeletionJitterRange(e.domainEntry.GetInfo().Name)) } func (e *mutableStateBuilder) AddStartChildWorkflowExecutionInitiatedEvent( decisionCompletedEventID int64, createRequestID string, attributes *types.StartChildWorkflowExecutionDecisionAttributes, ) (*types.HistoryEvent, *persistence.ChildExecutionInfo, error) { opTag := tag.WorkflowActionChildWorkflowInitiated if err := e.checkMutability(opTag); err != nil { return nil, nil, err } event := e.hBuilder.AddStartChildWorkflowExecutionInitiatedEvent( decisionCompletedEventID, attributes, e.GetDomainEntry().GetInfo().Name, ) // Write the event to cache only on active cluster e.eventsCache.PutEvent(e.executionInfo.DomainID, e.executionInfo.WorkflowID, e.executionInfo.RunID, event.ID, event) ci, err := e.ReplicateStartChildWorkflowExecutionInitiatedEvent(decisionCompletedEventID, event, createRequestID) if err != nil { return nil, nil, err } return event, ci, nil } func (e *mutableStateBuilder) ReplicateStartChildWorkflowExecutionInitiatedEvent( firstEventID int64, event *types.HistoryEvent, createRequestID string, ) (*persistence.ChildExecutionInfo, error) { initiatedEventID := event.ID attributes := event.StartChildWorkflowExecutionInitiatedEventAttributes domainID := e.GetExecutionInfo().DomainID if domainName := attributes.GetDomain(); domainName != "" { // domainName may still be empty if two cadence clusters are running different versions var err error domainID, err = e.shard.GetDomainCache().GetDomainID(domainName) if err != nil { return nil, err } } ci := &persistence.ChildExecutionInfo{ Version: event.Version, InitiatedID: initiatedEventID, InitiatedEventBatchID: firstEventID, StartedID: common.EmptyEventID, StartedWorkflowID: attributes.GetWorkflowID(), CreateRequestID: createRequestID, DomainID: domainID, // DomainName field is being deprecated // DomainName: attributes.GetDomain(), WorkflowTypeName: attributes.GetWorkflowType().GetName(), ParentClosePolicy: attributes.GetParentClosePolicy(), } e.pendingChildExecutionInfoIDs[ci.InitiatedID] = ci e.updateChildExecutionInfos[ci.InitiatedID] = ci return ci, e.taskGenerator.GenerateChildWorkflowTasks(event) } func (e *mutableStateBuilder) AddChildWorkflowExecutionStartedEvent( domain string, execution *types.WorkflowExecution, workflowType *types.WorkflowType, initiatedID int64, header *types.Header, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowStarted if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID != common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddChildWorkflowExecutionStartedEvent(domain, execution, workflowType, initiatedID, header) if err := e.ReplicateChildWorkflowExecutionStartedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionStartedEvent( event *types.HistoryEvent, ) error { attributes := event.ChildWorkflowExecutionStartedEventAttributes initiatedID := attributes.GetInitiatedEventID() ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok { e.logError( "Unable to find child workflow", tag.ErrorTypeInvalidMutableStateAction, tag.WorkflowEventID(e.GetNextEventID()), tag.WorkflowInitiatedID(initiatedID), ) return ErrMissingChildWorkflowInfo } ci.StartedID = event.ID ci.StartedRunID = attributes.GetWorkflowExecution().GetRunID() e.updateChildExecutionInfos[ci.InitiatedID] = ci return nil } func (e *mutableStateBuilder) AddStartChildWorkflowExecutionFailedEvent( initiatedID int64, cause types.ChildWorkflowExecutionFailedCause, initiatedEventAttributes *types.StartChildWorkflowExecutionInitiatedEventAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowInitiationFailed if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID != common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } event := e.hBuilder.AddStartChildWorkflowExecutionFailedEvent(initiatedID, cause, initiatedEventAttributes) if err := e.ReplicateStartChildWorkflowExecutionFailedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateStartChildWorkflowExecutionFailedEvent( event *types.HistoryEvent, ) error { attributes := event.StartChildWorkflowExecutionFailedEventAttributes initiatedID := attributes.GetInitiatedEventID() return e.DeletePendingChildExecution(initiatedID) } func (e *mutableStateBuilder) AddChildWorkflowExecutionCompletedEvent( initiatedID int64, childExecution *types.WorkflowExecution, attributes *types.WorkflowExecutionCompletedEventAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowCompleted if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID == common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } childDomainName, err := GetChildExecutionDomainName(ci, e.shard.GetDomainCache(), e.GetDomainEntry()) if err != nil { return nil, err } workflowType := &types.WorkflowType{ Name: ci.WorkflowTypeName, } event := e.hBuilder.AddChildWorkflowExecutionCompletedEvent( childDomainName, childExecution, workflowType, ci.InitiatedID, ci.StartedID, attributes, ) if err := e.ReplicateChildWorkflowExecutionCompletedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionCompletedEvent( event *types.HistoryEvent, ) error { attributes := event.ChildWorkflowExecutionCompletedEventAttributes initiatedID := attributes.GetInitiatedEventID() return e.DeletePendingChildExecution(initiatedID) } func (e *mutableStateBuilder) AddChildWorkflowExecutionFailedEvent( initiatedID int64, childExecution *types.WorkflowExecution, attributes *types.WorkflowExecutionFailedEventAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowFailed if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID == common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(!ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } domainName, err := GetChildExecutionDomainName(ci, e.shard.GetDomainCache(), e.GetDomainEntry()) if err != nil { return nil, err } workflowType := &types.WorkflowType{ Name: ci.WorkflowTypeName, } event := e.hBuilder.AddChildWorkflowExecutionFailedEvent( domainName, childExecution, workflowType, ci.InitiatedID, ci.StartedID, attributes, ) if err := e.ReplicateChildWorkflowExecutionFailedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionFailedEvent( event *types.HistoryEvent, ) error { attributes := event.ChildWorkflowExecutionFailedEventAttributes initiatedID := attributes.GetInitiatedEventID() return e.DeletePendingChildExecution(initiatedID) } func (e *mutableStateBuilder) AddChildWorkflowExecutionCanceledEvent( initiatedID int64, childExecution *types.WorkflowExecution, attributes *types.WorkflowExecutionCanceledEventAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowCanceled if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID == common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } domainName, err := GetChildExecutionDomainName(ci, e.shard.GetDomainCache(), e.GetDomainEntry()) if err != nil { return nil, err } workflowType := &types.WorkflowType{ Name: ci.WorkflowTypeName, } event := e.hBuilder.AddChildWorkflowExecutionCanceledEvent( domainName, childExecution, workflowType, ci.InitiatedID, ci.StartedID, attributes, ) if err := e.ReplicateChildWorkflowExecutionCanceledEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionCanceledEvent( event *types.HistoryEvent, ) error { attributes := event.ChildWorkflowExecutionCanceledEventAttributes initiatedID := attributes.GetInitiatedEventID() return e.DeletePendingChildExecution(initiatedID) } func (e *mutableStateBuilder) AddChildWorkflowExecutionTerminatedEvent( initiatedID int64, childExecution *types.WorkflowExecution, attributes *types.WorkflowExecutionTerminatedEventAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowTerminated if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID == common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } domainName, err := GetChildExecutionDomainName(ci, e.shard.GetDomainCache(), e.GetDomainEntry()) if err != nil { return nil, err } workflowType := &types.WorkflowType{ Name: ci.WorkflowTypeName, } event := e.hBuilder.AddChildWorkflowExecutionTerminatedEvent( domainName, childExecution, workflowType, ci.InitiatedID, ci.StartedID, attributes, ) if err := e.ReplicateChildWorkflowExecutionTerminatedEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionTerminatedEvent( event *types.HistoryEvent, ) error { attributes := event.ChildWorkflowExecutionTerminatedEventAttributes initiatedID := attributes.GetInitiatedEventID() return e.DeletePendingChildExecution(initiatedID) } func (e *mutableStateBuilder) AddChildWorkflowExecutionTimedOutEvent( initiatedID int64, childExecution *types.WorkflowExecution, attributes *types.WorkflowExecutionTimedOutEventAttributes, ) (*types.HistoryEvent, error) { opTag := tag.WorkflowActionChildWorkflowTimedOut if err := e.checkMutability(opTag); err != nil { return nil, err } ci, ok := e.GetChildExecutionInfo(initiatedID) if !ok || ci.StartedID == common.EmptyEventID { e.logWarn(mutableStateInvalidHistoryActionMsg, opTag, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.Bool(ok), tag.WorkflowInitiatedID(initiatedID)) return nil, e.createInternalServerError(opTag) } domainName, err := GetChildExecutionDomainName(ci, e.shard.GetDomainCache(), e.GetDomainEntry()) if err != nil { return nil, err } workflowType := &types.WorkflowType{ Name: ci.WorkflowTypeName, } event := e.hBuilder.AddChildWorkflowExecutionTimedOutEvent( domainName, childExecution, workflowType, ci.InitiatedID, ci.StartedID, attributes, ) if err := e.ReplicateChildWorkflowExecutionTimedOutEvent(event); err != nil { return nil, err } return event, nil } func (e *mutableStateBuilder) ReplicateChildWorkflowExecutionTimedOutEvent( event *types.HistoryEvent, ) error { attributes := event.ChildWorkflowExecutionTimedOutEventAttributes initiatedID := attributes.GetInitiatedEventID() return e.DeletePendingChildExecution(initiatedID) } func (e *mutableStateBuilder) RetryActivity( ai *persistence.ActivityInfo, failureReason string, failureDetails []byte, ) (bool, error) { opTag := tag.WorkflowActionActivityTaskRetry if err := e.checkMutability(opTag); err != nil { return false, err } if !ai.HasRetryPolicy || ai.CancelRequested { return false, nil } now := e.timeSource.Now() backoffInterval := getBackoffInterval( now, ai.ExpirationTime, ai.Attempt, ai.MaximumAttempts, ai.InitialInterval, ai.MaximumInterval, ai.BackoffCoefficient, failureReason, ai.NonRetriableErrors, ) if backoffInterval == backoff.NoBackoff { return false, nil } // a retry is needed, update activity info for next retry ai.Version = e.GetCurrentVersion() ai.Attempt++ ai.ScheduledTime = now.Add(backoffInterval) // update to next schedule time ai.StartedID = common.EmptyEventID ai.RequestID = "" ai.StartedTime = time.Time{} ai.TimerTaskStatus = TimerTaskStatusNone ai.LastFailureReason = failureReason ai.LastWorkerIdentity = ai.StartedIdentity ai.LastFailureDetails = failureDetails if err := e.taskGenerator.GenerateActivityRetryTasks( ai.ScheduleID, ); err != nil { return false, err } e.updateActivityInfos[ai.ScheduleID] = ai e.syncActivityTasks[ai.ScheduleID] = struct{}{} return true, nil } // TODO mutable state should generate corresponding transfer / timer tasks according to // updates accumulated, while currently all transfer / timer tasks are managed manually // TODO convert AddTransferTasks to prepareTransferTasks func (e *mutableStateBuilder) AddTransferTasks( transferTasks ...persistence.Task, ) { e.insertTransferTasks = append(e.insertTransferTasks, transferTasks...) } func (e *mutableStateBuilder) AddCrossClusterTasks( crossClusterTasks ...persistence.Task, ) { e.insertCrossClusterTasks = append(e.insertCrossClusterTasks, crossClusterTasks...) } // TODO convert AddTimerTasks to prepareTimerTasks func (e *mutableStateBuilder) AddTimerTasks( timerTasks ...persistence.Task, ) { e.insertTimerTasks = append(e.insertTimerTasks, timerTasks...) } func (e *mutableStateBuilder) GetTransferTasks() []persistence.Task { return e.insertTransferTasks } func (e *mutableStateBuilder) GetCrossClusterTasks() []persistence.Task { return e.insertCrossClusterTasks } func (e *mutableStateBuilder) GetTimerTasks() []persistence.Task { return e.insertTimerTasks } func (e *mutableStateBuilder) DeleteTransferTasks() { e.insertTransferTasks = nil } func (e *mutableStateBuilder) DeleteCrossClusterTasks() { e.insertCrossClusterTasks = nil } func (e *mutableStateBuilder) DeleteTimerTasks() { e.insertTimerTasks = nil } func (e *mutableStateBuilder) SetUpdateCondition( nextEventIDInDB int64, ) { e.nextEventIDInDB = nextEventIDInDB } func (e *mutableStateBuilder) GetUpdateCondition() int64 { return e.nextEventIDInDB } func (e *mutableStateBuilder) GetWorkflowStateCloseStatus() (int, int) { executionInfo := e.executionInfo return executionInfo.State, executionInfo.CloseStatus } func (e *mutableStateBuilder) UpdateWorkflowStateCloseStatus( state int, closeStatus int, ) error { return e.executionInfo.UpdateWorkflowStateCloseStatus(state, closeStatus) } func (e *mutableStateBuilder) StartTransaction( domainEntry *cache.DomainCacheEntry, incomingTaskVersion int64, ) (bool, error) { e.domainEntry = domainEntry if err := e.UpdateCurrentVersion(domainEntry.GetFailoverVersion(), false); err != nil { return false, err } flushBeforeReady, err := e.startTransactionHandleDecisionFailover(incomingTaskVersion) if err != nil { return false, err } return flushBeforeReady, nil } func (e *mutableStateBuilder) CloseTransactionAsMutation( now time.Time, transactionPolicy TransactionPolicy, ) (*persistence.WorkflowMutation, []*persistence.WorkflowEvents, error) { if err := e.prepareCloseTransaction( transactionPolicy, ); err != nil { return nil, nil, err } workflowEventsSeq, err := e.prepareEventsAndReplicationTasks(transactionPolicy) if err != nil { return nil, nil, err } if len(workflowEventsSeq) > 0 { lastEvents := workflowEventsSeq[len(workflowEventsSeq)-1].Events firstEvent := lastEvents[0] lastEvent := lastEvents[len(lastEvents)-1] e.updateWithLastFirstEvent(firstEvent) if err := e.updateWithLastWriteEvent( lastEvent, transactionPolicy, ); err != nil { return nil, nil, err } } // update last update time e.executionInfo.LastUpdatedTimestamp = now // we generate checksum here based on the assumption that the returned // snapshot object is considered immutable. As of this writing, the only // code that modifies the returned object lives inside workflowExecutionContext.resetWorkflowExecution // currently, the updates done inside workflowExecutionContext.resetWorkflowExecution doesn't // impact the checksum calculation checksum := e.generateChecksum() workflowMutation := &persistence.WorkflowMutation{ ExecutionInfo: e.executionInfo, VersionHistories: e.versionHistories, UpsertActivityInfos: convertUpdateActivityInfos(e.updateActivityInfos), DeleteActivityInfos: convertInt64SetToSlice(e.deleteActivityInfos), UpsertTimerInfos: convertUpdateTimerInfos(e.updateTimerInfos), DeleteTimerInfos: convertStringSetToSlice(e.deleteTimerInfos), UpsertChildExecutionInfos: convertUpdateChildExecutionInfos(e.updateChildExecutionInfos), DeleteChildExecutionInfos: convertInt64SetToSlice(e.deleteChildExecutionInfos), UpsertRequestCancelInfos: convertUpdateRequestCancelInfos(e.updateRequestCancelInfos), DeleteRequestCancelInfos: convertInt64SetToSlice(e.deleteRequestCancelInfos), UpsertSignalInfos: convertUpdateSignalInfos(e.updateSignalInfos), DeleteSignalInfos: convertInt64SetToSlice(e.deleteSignalInfos), UpsertSignalRequestedIDs: convertStringSetToSlice(e.updateSignalRequestedIDs), DeleteSignalRequestedIDs: convertStringSetToSlice(e.deleteSignalRequestedIDs), NewBufferedEvents: e.updateBufferedEvents, ClearBufferedEvents: e.clearBufferedEvents, TransferTasks: e.insertTransferTasks, CrossClusterTasks: e.insertCrossClusterTasks, ReplicationTasks: e.insertReplicationTasks, TimerTasks: e.insertTimerTasks, Condition: e.nextEventIDInDB, Checksum: checksum, } e.checksum = checksum if err := e.cleanupTransaction(); err != nil { return nil, nil, err } return workflowMutation, workflowEventsSeq, nil } func (e *mutableStateBuilder) CloseTransactionAsSnapshot( now time.Time, transactionPolicy TransactionPolicy, ) (*persistence.WorkflowSnapshot, []*persistence.WorkflowEvents, error) { if err := e.prepareCloseTransaction( transactionPolicy, ); err != nil { return nil, nil, err } workflowEventsSeq, err := e.prepareEventsAndReplicationTasks(transactionPolicy) if err != nil { return nil, nil, err } if len(workflowEventsSeq) > 1 { return nil, nil, &types.InternalServiceError{ Message: "cannot generate workflow snapshot with transient events", } } if len(e.bufferedEvents) > 0 { // TODO do we need the functionality to generate snapshot with buffered events? return nil, nil, &types.InternalServiceError{ Message: "cannot generate workflow snapshot with buffered events", } } if len(workflowEventsSeq) > 0 { lastEvents := workflowEventsSeq[len(workflowEventsSeq)-1].Events firstEvent := lastEvents[0] lastEvent := lastEvents[len(lastEvents)-1] e.updateWithLastFirstEvent(firstEvent) if err := e.updateWithLastWriteEvent( lastEvent, transactionPolicy, ); err != nil { return nil, nil, err } } // update last update time e.executionInfo.LastUpdatedTimestamp = now // we generate checksum here based on the assumption that the returned // snapshot object is considered immutable. As of this writing, the only // code that modifies the returned object lives inside workflowExecutionContext.resetWorkflowExecution // currently, the updates done inside workflowExecutionContext.resetWorkflowExecution doesn't // impact the checksum calculation checksum := e.generateChecksum() workflowSnapshot := &persistence.WorkflowSnapshot{ ExecutionInfo: e.executionInfo, VersionHistories: e.versionHistories, ActivityInfos: convertPendingActivityInfos(e.pendingActivityInfoIDs), TimerInfos: convertPendingTimerInfos(e.pendingTimerInfoIDs), ChildExecutionInfos: convertPendingChildExecutionInfos(e.pendingChildExecutionInfoIDs), RequestCancelInfos: convertPendingRequestCancelInfos(e.pendingRequestCancelInfoIDs), SignalInfos: convertPendingSignalInfos(e.pendingSignalInfoIDs), SignalRequestedIDs: convertStringSetToSlice(e.pendingSignalRequestedIDs), TransferTasks: e.insertTransferTasks, CrossClusterTasks: e.insertCrossClusterTasks, ReplicationTasks: e.insertReplicationTasks, TimerTasks: e.insertTimerTasks, Condition: e.nextEventIDInDB, Checksum: checksum, } e.checksum = checksum if err := e.cleanupTransaction(); err != nil { return nil, nil, err } return workflowSnapshot, workflowEventsSeq, nil } func (e *mutableStateBuilder) IsResourceDuplicated( resourceDedupKey definition.DeduplicationID, ) bool { id := definition.GenerateDeduplicationKey(resourceDedupKey) _, duplicated := e.appliedEvents[id] return duplicated } func (e *mutableStateBuilder) UpdateDuplicatedResource( resourceDedupKey definition.DeduplicationID, ) { id := definition.GenerateDeduplicationKey(resourceDedupKey) e.appliedEvents[id] = struct{}{} } func (e *mutableStateBuilder) GetHistorySize() int64 { return e.executionStats.HistorySize } func (e *mutableStateBuilder) SetHistorySize(size int64) { e.executionStats.HistorySize = size } func (e *mutableStateBuilder) prepareCloseTransaction( transactionPolicy TransactionPolicy, ) error { if err := e.closeTransactionWithPolicyCheck( transactionPolicy, ); err != nil { return err } if err := e.closeTransactionHandleBufferedEventsLimit( transactionPolicy, ); err != nil { return err } if err := e.closeTransactionHandleWorkflowReset( transactionPolicy, ); err != nil { return err } // flushing buffered events should happen at very last if transactionPolicy == TransactionPolicyActive { if err := e.FlushBufferedEvents(); err != nil { return err } } // NOTE: this function must be the last call // since we only generate at most one activity & user timer, // regardless of how many activity & user timer created // so the calculation must be at the very end return e.closeTransactionHandleActivityUserTimerTasks() } func (e *mutableStateBuilder) cleanupTransaction() error { // Clear all updates to prepare for the next session e.hBuilder = NewHistoryBuilder(e) e.updateActivityInfos = make(map[int64]*persistence.ActivityInfo) e.deleteActivityInfos = make(map[int64]struct{}) e.syncActivityTasks = make(map[int64]struct{}) e.updateTimerInfos = make(map[string]*persistence.TimerInfo) e.deleteTimerInfos = make(map[string]struct{}) e.updateChildExecutionInfos = make(map[int64]*persistence.ChildExecutionInfo) e.deleteChildExecutionInfos = make(map[int64]struct{}) e.updateRequestCancelInfos = make(map[int64]*persistence.RequestCancelInfo) e.deleteRequestCancelInfos = make(map[int64]struct{}) e.updateSignalInfos = make(map[int64]*persistence.SignalInfo) e.deleteSignalInfos = make(map[int64]struct{}) e.updateSignalRequestedIDs = make(map[string]struct{}) e.deleteSignalRequestedIDs = make(map[string]struct{}) e.clearBufferedEvents = false if e.updateBufferedEvents != nil { e.bufferedEvents = append(e.bufferedEvents, e.updateBufferedEvents...) e.updateBufferedEvents = nil } e.hasBufferedEventsInDB = len(e.bufferedEvents) > 0 e.stateInDB = e.executionInfo.State e.nextEventIDInDB = e.GetNextEventID() e.insertTransferTasks = nil e.insertCrossClusterTasks = nil e.insertReplicationTasks = nil e.insertTimerTasks = nil return nil } func (e *mutableStateBuilder) prepareEventsAndReplicationTasks( transactionPolicy TransactionPolicy, ) ([]*persistence.WorkflowEvents, error) { currentBranchToken, err := e.GetCurrentBranchToken() if err != nil { return nil, err } var workflowEventsSeq []*persistence.WorkflowEvents if len(e.hBuilder.transientHistory) != 0 { workflowEventsSeq = append(workflowEventsSeq, &persistence.WorkflowEvents{ DomainID: e.executionInfo.DomainID, WorkflowID: e.executionInfo.WorkflowID, RunID: e.executionInfo.RunID, BranchToken: currentBranchToken, Events: e.hBuilder.transientHistory, }) } if len(e.hBuilder.history) != 0 { workflowEventsSeq = append(workflowEventsSeq, &persistence.WorkflowEvents{ DomainID: e.executionInfo.DomainID, WorkflowID: e.executionInfo.WorkflowID, RunID: e.executionInfo.RunID, BranchToken: currentBranchToken, Events: e.hBuilder.history, }) } if err := e.validateNoEventsAfterWorkflowFinish( transactionPolicy, e.hBuilder.history, ); err != nil { return nil, err } for _, workflowEvents := range workflowEventsSeq { replicationTasks, err := e.eventsToReplicationTask(transactionPolicy, workflowEvents.Events) if err != nil { return nil, err } e.insertReplicationTasks = append( e.insertReplicationTasks, replicationTasks..., ) } e.insertReplicationTasks = append( e.insertReplicationTasks, e.syncActivityToReplicationTask(transactionPolicy)..., ) if transactionPolicy == TransactionPolicyPassive && len(e.insertReplicationTasks) > 0 { return nil, &types.InternalServiceError{ Message: "should not generate replication task when close transaction as passive", } } return workflowEventsSeq, nil } func (e *mutableStateBuilder) eventsToReplicationTask( transactionPolicy TransactionPolicy, events []*types.HistoryEvent, ) ([]persistence.Task, error) { if transactionPolicy == TransactionPolicyPassive || !e.canReplicateEvents() || len(events) == 0 { return emptyTasks, nil } firstEvent := events[0] lastEvent := events[len(events)-1] version := firstEvent.Version sourceCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(version) if err != nil { return nil, err } currentCluster := e.clusterMetadata.GetCurrentClusterName() if currentCluster != sourceCluster { return nil, &types.InternalServiceError{ Message: "mutableStateBuilder encounter contradicting version & transaction policy", } } currentBranchToken, err := e.GetCurrentBranchToken() if err != nil { return nil, err } // the visibility timestamp will be set in shard context replicationTask := &persistence.HistoryReplicationTask{ TaskData: persistence.TaskData{ Version: firstEvent.Version, }, FirstEventID: firstEvent.ID, NextEventID: lastEvent.ID + 1, BranchToken: currentBranchToken, NewRunBranchToken: nil, } return []persistence.Task{replicationTask}, nil } func (e *mutableStateBuilder) syncActivityToReplicationTask( transactionPolicy TransactionPolicy, ) []persistence.Task { if transactionPolicy == TransactionPolicyPassive || !e.canReplicateEvents() { return emptyTasks } return convertSyncActivityInfos( e.pendingActivityInfoIDs, e.syncActivityTasks, ) } func (e *mutableStateBuilder) updateWithLastWriteEvent( lastEvent *types.HistoryEvent, transactionPolicy TransactionPolicy, ) error { if transactionPolicy == TransactionPolicyPassive { // already handled in state builder return nil } e.GetExecutionInfo().LastEventTaskID = lastEvent.TaskID if e.versionHistories != nil { currentVersionHistory, err := e.versionHistories.GetCurrentVersionHistory() if err != nil { return err } if err := currentVersionHistory.AddOrUpdateItem(persistence.NewVersionHistoryItem( lastEvent.ID, lastEvent.Version, )); err != nil { return err } } return nil } func (e *mutableStateBuilder) updateWithLastFirstEvent( lastFirstEvent *types.HistoryEvent, ) { e.GetExecutionInfo().SetLastFirstEventID(lastFirstEvent.ID) } func (e *mutableStateBuilder) canReplicateEvents() bool { if e.domainEntry.GetReplicationPolicy() == cache.ReplicationPolicyOneCluster { return false } // ReplicationPolicyMultiCluster domainID := e.domainEntry.GetInfo().ID workflowID := e.GetExecutionInfo().WorkflowID return e.shard.GetConfig().EnableReplicationTaskGeneration(domainID, workflowID) } // validateNoEventsAfterWorkflowFinish perform check on history event batch // NOTE: do not apply this check on every batch, since transient // decision && workflow finish will be broken (the first batch) func (e *mutableStateBuilder) validateNoEventsAfterWorkflowFinish( transactionPolicy TransactionPolicy, events []*types.HistoryEvent, ) error { if transactionPolicy == TransactionPolicyPassive || len(events) == 0 { return nil } // only do check if workflow is finished if e.GetExecutionInfo().State != persistence.WorkflowStateCompleted { return nil } // workflow close // this will perform check on the last event of last batch // NOTE: do not apply this check on every batch, since transient // decision && workflow finish will be broken (the first batch) lastEvent := events[len(events)-1] switch lastEvent.GetEventType() { case types.EventTypeWorkflowExecutionCompleted, types.EventTypeWorkflowExecutionFailed, types.EventTypeWorkflowExecutionTimedOut, types.EventTypeWorkflowExecutionTerminated, types.EventTypeWorkflowExecutionContinuedAsNew, types.EventTypeWorkflowExecutionCanceled: return nil default: executionInfo := e.GetExecutionInfo() e.logError( "encounter case where events appears after workflow finish.", tag.WorkflowDomainID(executionInfo.DomainID), tag.WorkflowID(executionInfo.WorkflowID), tag.WorkflowRunID(executionInfo.RunID), ) return ErrEventsAfterWorkflowFinish } } func (e *mutableStateBuilder) startTransactionHandleDecisionFailover( incomingTaskVersion int64, ) (bool, error) { if !e.IsWorkflowExecutionRunning() || !e.canReplicateEvents() { return false, nil } // NOTE: // the main idea here is to guarantee that once there is a decision task started // all events ending in the buffer should have the same version // Handling mutable state turn from standby to active, while having a decision on the fly decision, ok := e.GetInFlightDecision() if !ok || decision.Version >= e.GetCurrentVersion() { // no pending decision, no buffered events // or decision has higher / equal version return false, nil } currentVersion := e.GetCurrentVersion() lastWriteVersion, err := e.GetLastWriteVersion() if err != nil { return false, err } if lastWriteVersion != decision.Version { return false, &types.InternalServiceError{Message: fmt.Sprintf( "mutableStateBuilder encounter mismatch version, decision: %v, last write version %v", decision.Version, lastWriteVersion, )} } lastWriteSourceCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(lastWriteVersion) if err != nil { return false, err } currentVersionCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(currentVersion) if err != nil { return false, err } currentCluster := e.clusterMetadata.GetCurrentClusterName() // there are 4 cases for version changes (based on version from domain cache) // NOTE: domain cache version change may occur after seeing events with higher version // meaning that the flush buffer logic in NDC branch manager should be kept. // // 1. active -> passive => fail decision & flush buffer using last write version // 2. active -> active => fail decision & flush buffer using last write version // 3. passive -> active => fail decision using current version, no buffered events // 4. passive -> passive => no buffered events, since always passive, nothing to be done // 5. special case: current cluster is passive. Due to some reason, the history generated by the current cluster // is missing and the missing history replicate back from remote cluster via resending approach => nothing to do // handle case 5 incomingTaskSourceCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(incomingTaskVersion) if err != nil { return false, err } if incomingTaskVersion != common.EmptyVersion && currentVersionCluster != currentCluster && incomingTaskSourceCluster == currentCluster { return false, nil } // handle case 4 if lastWriteSourceCluster != currentCluster && currentVersionCluster != currentCluster { // do a sanity check on buffered events if e.HasBufferedEvents() { return false, &types.InternalServiceError{ Message: "mutableStateBuilder encounter previous passive workflow with buffered events", } } return false, nil } // handle case 1 & 2 var flushBufferVersion = lastWriteVersion // handle case 3 if lastWriteSourceCluster != currentCluster && currentVersionCluster == currentCluster { // do a sanity check on buffered events if e.HasBufferedEvents() { return false, &types.InternalServiceError{ Message: "mutableStateBuilder encounter previous passive workflow with buffered events", } } flushBufferVersion = currentVersion } // this workflow was previous active (whether it has buffered events or not), // the in flight decision must be failed to guarantee all events within same // event batch shard the same version if err := e.UpdateCurrentVersion(flushBufferVersion, true); err != nil { return false, err } // we have a decision with buffered events on the fly with a lower version, fail it if err := FailDecision( e, decision, types.DecisionTaskFailedCauseFailoverCloseDecision, ); err != nil { return false, err } err = ScheduleDecision(e) if err != nil { return false, err } return true, nil } func (e *mutableStateBuilder) closeTransactionWithPolicyCheck( transactionPolicy TransactionPolicy, ) error { if transactionPolicy == TransactionPolicyPassive || !e.canReplicateEvents() { return nil } activeCluster, err := e.clusterMetadata.ClusterNameForFailoverVersion(e.GetCurrentVersion()) if err != nil { return err } currentCluster := e.clusterMetadata.GetCurrentClusterName() if activeCluster != currentCluster { domainID := e.GetExecutionInfo().DomainID return errors.NewDomainNotActiveError(domainID, currentCluster, activeCluster) } return nil } func (e *mutableStateBuilder) closeTransactionHandleBufferedEventsLimit( transactionPolicy TransactionPolicy, ) error { if transactionPolicy == TransactionPolicyPassive || !e.IsWorkflowExecutionRunning() { return nil } if len(e.bufferedEvents) < e.config.MaximumBufferedEventsBatch() { return nil } // Handling buffered events size issue if decision, ok := e.GetInFlightDecision(); ok { // we have a decision on the fly with a lower version, fail it if err := FailDecision( e, decision, types.DecisionTaskFailedCauseForceCloseDecision, ); err != nil { return err } err := ScheduleDecision(e) if err != nil { return err } } return nil } func (e *mutableStateBuilder) closeTransactionHandleWorkflowReset( transactionPolicy TransactionPolicy, ) error { if transactionPolicy == TransactionPolicyPassive || !e.IsWorkflowExecutionRunning() { return nil } // compare with bad client binary checksum and schedule a reset task // only schedule reset task if current doesn't have childWFs. // TODO: This will be removed once our reset allows childWFs if len(e.GetPendingChildExecutionInfos()) != 0 { return nil } executionInfo := e.GetExecutionInfo() domainEntry, err := e.shard.GetDomainCache().GetDomainByID(executionInfo.DomainID) if err != nil { return err } if _, pt := FindAutoResetPoint( e.timeSource, &domainEntry.GetConfig().BadBinaries, e.GetExecutionInfo().AutoResetPoints, ); pt != nil { if err := e.taskGenerator.GenerateWorkflowResetTasks(); err != nil { return err } e.logInfo("Auto-Reset task is scheduled", tag.WorkflowDomainName(domainEntry.GetInfo().Name), tag.WorkflowID(executionInfo.WorkflowID), tag.WorkflowRunID(executionInfo.RunID), tag.WorkflowResetBaseRunID(pt.GetRunID()), tag.WorkflowEventID(pt.GetFirstDecisionCompletedID()), tag.WorkflowBinaryChecksum(pt.GetBinaryChecksum()), ) } return nil } func (e *mutableStateBuilder) closeTransactionHandleActivityUserTimerTasks() error { if !e.IsWorkflowExecutionRunning() { return nil } if err := e.taskGenerator.GenerateActivityTimerTasks(); err != nil { return err } return e.taskGenerator.GenerateUserTimerTasks() } func (e *mutableStateBuilder) checkMutability( actionTag tag.Tag, ) error { if !e.IsWorkflowExecutionRunning() { e.logWarn( mutableStateInvalidHistoryActionMsg, tag.WorkflowEventID(e.GetNextEventID()), tag.ErrorTypeInvalidHistoryAction, tag.WorkflowState(e.executionInfo.State), actionTag, ) return ErrWorkflowFinished } return nil } func (e *mutableStateBuilder) generateChecksum() checksum.Checksum { if !e.shouldGenerateChecksum() { return checksum.Checksum{} } csum, err := generateMutableStateChecksum(e) if err != nil { e.logWarn("error generating mutableState checksum", tag.Error(err)) return checksum.Checksum{} } return csum } func (e *mutableStateBuilder) shouldGenerateChecksum() bool { if e.domainEntry == nil { return false } return rand.Intn(100) < e.config.MutableStateChecksumGenProbability(e.domainEntry.GetInfo().Name) } func (e *mutableStateBuilder) shouldVerifyChecksum() bool { if e.domainEntry == nil { return false } return rand.Intn(100) < e.config.MutableStateChecksumVerifyProbability(e.domainEntry.GetInfo().Name) } func (e *mutableStateBuilder) enableChecksumFailureRetry() bool { if e.domainEntry == nil { return false } return e.config.EnableRetryForChecksumFailure(e.domainEntry.GetInfo().Name) } func (e *mutableStateBuilder) shouldInvalidateChecksum() bool { invalidateBeforeEpochSecs := int64(e.config.MutableStateChecksumInvalidateBefore()) if invalidateBeforeEpochSecs > 0 { invalidateBefore := time.Unix(invalidateBeforeEpochSecs, 0) return e.executionInfo.LastUpdatedTimestamp.Before(invalidateBefore) } return false } func (e *mutableStateBuilder) createInternalServerError( actionTag tag.Tag, ) error { return &types.InternalServiceError{Message: actionTag.Field().String + " operation failed"} } func (e *mutableStateBuilder) createCallerError( actionTag tag.Tag, ) error { return &types.BadRequestError{ Message: fmt.Sprintf(mutableStateInvalidHistoryActionMsgTemplate, actionTag.Field().String), } } func (e *mutableStateBuilder) unixNanoToTime( timestampNanos int64, ) time.Time { return time.Unix(0, timestampNanos) } func (e *mutableStateBuilder) logInfo(msg string, tags ...tag.Tag) { if e != nil { return } if e.executionInfo != nil { tags = append(tags, tag.WorkflowID(e.executionInfo.WorkflowID)) tags = append(tags, tag.WorkflowRunID(e.executionInfo.RunID)) tags = append(tags, tag.WorkflowDomainID(e.executionInfo.DomainID)) } e.logger.Info(msg, tags...) } func (e *mutableStateBuilder) logWarn(msg string, tags ...tag.Tag) { if e != nil { return } if e.executionInfo != nil { tags = append(tags, tag.WorkflowID(e.executionInfo.WorkflowID)) tags = append(tags, tag.WorkflowRunID(e.executionInfo.RunID)) tags = append(tags, tag.WorkflowDomainID(e.executionInfo.DomainID)) } e.logger.Warn(msg, tags...) } func (e *mutableStateBuilder) logError(msg string, tags ...tag.Tag) { if e != nil { return } if e.executionInfo != nil { tags = append(tags, tag.WorkflowID(e.executionInfo.WorkflowID)) tags = append(tags, tag.WorkflowRunID(e.executionInfo.RunID)) tags = append(tags, tag.WorkflowDomainID(e.executionInfo.DomainID)) } e.logger.Error(msg, tags...) } func (e *mutableStateBuilder) logDataInconsistency() { domainID := e.executionInfo.DomainID workflowID := e.executionInfo.WorkflowID runID := e.executionInfo.RunID e.metricsClient.Scope(metrics.WorkflowContextScope).IncCounter(metrics.DataInconsistentCounter) e.logger.Error("encounter mutable state data inconsistency", tag.WorkflowDomainID(domainID), tag.WorkflowID(workflowID), tag.WorkflowRunID(runID), ) }