in service/history/execution/context.go [744:914]
// UpdateWorkflowExecutionWithNew closes the pending transaction on the current
// mutable state as a mutation, persists the history event batches it produced,
// and writes the mutation through updateWorkflowExecutionWithRetry.
//
// When newContext, newMutableState and newWorkflowTransactionPolicy are all
// non-nil, the new mutable state is additionally closed as a snapshot, its
// first event batch is persisted, and the snapshot is sent in the same update
// request.
//
// On success, history-event and task notifications are sent to the shard's
// engine and history/session/completion stats are emitted. If a non-nil error
// is returned, this context (and newContext, when it took part) is cleared.
func (c *contextImpl) UpdateWorkflowExecutionWithNew(
	ctx context.Context,
	now time.Time,
	updateMode persistence.UpdateWorkflowMode,
	newContext Context,
	newMutableState MutableState,
	currentWorkflowTransactionPolicy TransactionPolicy,
	newWorkflowTransactionPolicy *TransactionPolicy,
) (retError error) {
	// Clear this context whenever the function ends with an error.
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	mutation, mutationEventBatches, err := c.mutableState.CloseTransactionAsMutation(
		now,
		currentWorkflowTransactionPolicy,
	)
	if err != nil {
		return err
	}

	var blobs events.PersistedBlobs

	// Remember the size/count before this update so the delta can be reported
	// to emitLargeWorkflowShardIDStats at the end.
	sizeBefore := c.GetHistorySize()
	countBefore := c.mutableState.GetNextEventID() - 1
	sizeAfter, countAfter := sizeBefore, countBefore

	for i := range mutationEventBatches {
		batch := mutationEventBatches[i]
		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, batch)
		countAfter += int64(len(batch.Events))
		if err != nil {
			return err
		}
		sizeAfter += int64(len(blob.Data))
		blobs = append(blobs, blob)
	}

	c.SetHistorySize(sizeAfter)
	mutation.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: sizeAfter,
	}

	var (
		snapshot             *persistence.WorkflowSnapshot
		snapshotEventBatches []*persistence.WorkflowEvents
	)
	if newContext != nil && newMutableState != nil && newWorkflowTransactionPolicy != nil {
		// The new context is cleared on error as well.
		defer func() {
			if retError != nil {
				newContext.Clear()
			}
		}()

		snapshot, snapshotEventBatches, err = newMutableState.CloseTransactionAsSnapshot(
			now,
			*newWorkflowTransactionPolicy,
		)
		if err != nil {
			return err
		}

		newRunSize := newContext.GetHistorySize()
		// Only the first batch of the new workflow is persisted here.
		firstBatch := snapshotEventBatches[0]

		var firstBlob events.PersistedBlob
		if firstBatch.Events[0].ID == common.FirstEventID {
			firstBlob, err = c.PersistStartWorkflowBatchEvents(ctx, firstBatch)
		} else {
			// NOTE: This is the case for reset workflow, reset workflow already inserted a branch record
			firstBlob, err = c.PersistNonStartWorkflowBatchEvents(ctx, firstBatch)
		}
		if err != nil {
			return err
		}

		blobs = append(blobs, firstBlob)
		newRunSize += int64(len(firstBlob.Data))
		newContext.SetHistorySize(newRunSize)
		snapshot.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: newRunSize,
		}
	}

	if err := mergeContinueAsNewReplicationTasks(updateMode, mutation, snapshot); err != nil {
		return err
	}

	if err := c.updateWorkflowExecutionEventReapply(updateMode, mutationEventBatches, snapshotEventBatches); err != nil {
		return err
	}

	cachedDomainName, lookupErr := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if lookupErr != nil {
		return lookupErr
	}

	// RangeID and Encoding are left unset here; they are set by the shard context.
	request := &persistence.UpdateWorkflowExecutionRequest{
		Mode:                   updateMode,
		UpdateWorkflowMutation: *mutation,
		NewWorkflowSnapshot:    snapshot,
		DomainName:             cachedDomainName,
	}
	resp, err := c.updateWorkflowExecutionWithRetry(ctx, request)
	if err != nil {
		// The write may still have gone through, so notify about the tasks
		// anyway before reporting the error.
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), mutation, blobs, true)
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), snapshot, blobs, true)
		}
		return err
	}

	// TODO remove updateCondition in favor of condition in mutable state
	c.updateCondition = mutation.ExecutionInfo.NextEventID

	// Every change to the workflow produces a history event notification.
	branchToken, err := c.mutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}
	state, closeStatus := c.mutableState.GetWorkflowStateCloseStatus()
	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
		c.domainID,
		&c.workflowExecution,
		c.mutableState.GetLastFirstEventID(),
		c.mutableState.GetNextEventID(),
		c.mutableState.GetPreviousStartedEventID(),
		branchToken,
		state,
		closeStatus,
	))

	// Tasks of the current workflow first, then those of the new workflow.
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), mutation, blobs, false)
	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), snapshot, blobs, false)

	// Finally, emit stats for this update session.
	metricsDomain := c.GetDomainName()
	emitWorkflowHistoryStats(
		c.metricsClient,
		metricsDomain,
		int(c.stats.HistorySize),
		int(c.mutableState.GetNextEventID()-1),
	)
	emitSessionUpdateStats(
		c.metricsClient,
		metricsDomain,
		resp.MutableStateUpdateSessionStats,
	)
	c.emitLargeWorkflowShardIDStats(sizeAfter-sizeBefore, countBefore, sizeBefore, countAfter)

	// Completion stats are only emitted for a completed workflow, and only
	// when the completion event can be loaded; a load failure is not reported.
	if mutation.ExecutionInfo.State != persistence.WorkflowStateCompleted {
		return nil
	}
	completionEvent, completionErr := c.mutableState.GetCompletionEvent(ctx)
	if completionErr != nil {
		return nil
	}
	typeName := mutation.ExecutionInfo.WorkflowTypeName
	taskListName := mutation.ExecutionInfo.TaskList
	emitWorkflowCompletionStats(
		c.metricsClient,
		c.logger,
		metricsDomain,
		typeName,
		c.workflowExecution.GetWorkflowID(),
		c.workflowExecution.GetRunID(),
		taskListName,
		completionEvent,
	)
	return nil
}