in service/history/execution/context.go [433:614]
// ConflictResolveWorkflowExecution closes the pending transaction of the reset
// workflow as a snapshot and, when supplied, of a new workflow (snapshot) and
// of the current workflow (mutation), persists their history event batches,
// and submits everything to the shard in a single
// ConflictResolveWorkflowExecution request.
//
// The new workflow is processed only when both newContext and newMutableState
// are non-nil. The current workflow is processed only when currentContext,
// currentMutableState and currentTransactionPolicy are all non-nil.
//
// On success the engine is notified about the new history event and the
// generated tasks, and history / session / completion stats are emitted.
//
// On any returned error the receiver context is cleared; newContext and
// currentContext are cleared as well if their processing had already started.
// An error is returned (instead of panicking) when the new workflow snapshot
// yields no event batch to persist as its start events.
func (c *contextImpl) ConflictResolveWorkflowExecution(
	ctx context.Context,
	now time.Time,
	conflictResolveMode persistence.ConflictResolveWorkflowMode,
	resetMutableState MutableState,
	newContext Context,
	newMutableState MutableState,
	currentContext Context,
	currentMutableState MutableState,
	currentTransactionPolicy *TransactionPolicy,
) (retError error) {

	// Drop the cached state of this context whenever the call fails.
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(
		now,
		TransactionPolicyPassive,
	)
	if err != nil {
		return err
	}

	// Blobs of every persisted event batch, handed to the task notifications below.
	var persistedBlobs events.PersistedBlobs
	resetHistorySize := c.GetHistorySize()
	for _, workflowEvents := range resetWorkflowEventsSeq {
		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
		if err != nil {
			return err
		}
		resetHistorySize += int64(len(blob.Data))
		persistedBlobs = append(persistedBlobs, blob)
	}
	c.SetHistorySize(resetHistorySize)
	resetWorkflow.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: resetHistorySize,
	}

	var newWorkflow *persistence.WorkflowSnapshot
	var newWorkflowEventsSeq []*persistence.WorkflowEvents
	if newContext != nil && newMutableState != nil {

		defer func() {
			if retError != nil {
				newContext.Clear()
			}
		}()

		newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
			now,
			TransactionPolicyPassive,
		)
		if err != nil {
			return err
		}
		// Only the first batch is persisted below; indexing an empty sequence
		// would panic and skip the deferred cleanup, so fail with an error.
		if len(newWorkflowEventsSeq) == 0 {
			return missingNewWorkflowStartEventsError{}
		}

		newWorkflowHistorySize := newContext.GetHistorySize()
		startEvents := newWorkflowEventsSeq[0]
		blob, err := c.PersistStartWorkflowBatchEvents(ctx, startEvents)
		if err != nil {
			return err
		}
		newWorkflowHistorySize += int64(len(blob.Data))
		newContext.SetHistorySize(newWorkflowHistorySize)
		newWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: newWorkflowHistorySize,
		}
		persistedBlobs = append(persistedBlobs, blob)
	}

	var currentWorkflow *persistence.WorkflowMutation
	var currentWorkflowEventsSeq []*persistence.WorkflowEvents
	if currentContext != nil && currentMutableState != nil && currentTransactionPolicy != nil {

		defer func() {
			if retError != nil {
				currentContext.Clear()
			}
		}()

		currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(
			now,
			*currentTransactionPolicy,
		)
		if err != nil {
			return err
		}

		currentWorkflowSize := currentContext.GetHistorySize()
		for _, workflowEvents := range currentWorkflowEventsSeq {
			blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
			if err != nil {
				return err
			}
			currentWorkflowSize += int64(len(blob.Data))
			persistedBlobs = append(persistedBlobs, blob)
		}
		currentContext.SetHistorySize(currentWorkflowSize)
		currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: currentWorkflowSize,
		}
	}

	if err := c.conflictResolveEventReapply(
		conflictResolveMode,
		resetWorkflowEventsSeq,
		newWorkflowEventsSeq,
		// current workflow events will not participate in the events reapplication
	); err != nil {
		return err
	}

	domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if errorDomainName != nil {
		return errorDomainName
	}

	resp, err := c.shard.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{
		// RangeID , this is set by shard context
		Mode:                    conflictResolveMode,
		ResetWorkflowSnapshot:   *resetWorkflow,
		NewWorkflowSnapshot:     newWorkflow,
		CurrentWorkflowMutation: currentWorkflow,
		// Encoding, this is set by shard context
		DomainName: domain,
	})
	if err != nil {
		// When the error is classified as "possibly successful", still send
		// the task notifications before returning the error.
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true)
		}
		return err
	}

	currentBranchToken, err := resetMutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}

	workflowState, workflowCloseState := resetMutableState.GetWorkflowStateCloseStatus()
	// Current branch changed and notify the watchers
	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
		c.domainID,
		&c.workflowExecution,
		resetMutableState.GetLastFirstEventID(),
		resetMutableState.GetNextEventID(),
		resetMutableState.GetPreviousStartedEventID(),
		currentBranchToken,
		workflowState,
		workflowCloseState,
	))

	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, false)
	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false)
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false)

	// finally emit session stats
	domainName := c.GetDomainName()
	emitWorkflowHistoryStats(
		c.metricsClient,
		domainName,
		int(c.stats.HistorySize),
		// reported count is the reset workflow's next event ID minus one
		int(resetMutableState.GetNextEventID()-1),
	)
	emitSessionUpdateStats(
		c.metricsClient,
		domainName,
		resp.MutableStateUpdateSessionStats,
	)

	// emit workflow completion stats if any
	if resetWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
		// Best effort: completion stats are skipped when the completion event
		// cannot be loaded; the error is intentionally not propagated.
		if event, err := resetMutableState.GetCompletionEvent(ctx); err == nil {
			workflowType := resetWorkflow.ExecutionInfo.WorkflowTypeName
			taskList := resetWorkflow.ExecutionInfo.TaskList
			emitWorkflowCompletionStats(c.metricsClient, c.logger,
				domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(),
				taskList, event)
		}
	}

	return nil
}

// missingNewWorkflowStartEventsError is returned by
// ConflictResolveWorkflowExecution when closing the new workflow's transaction
// produced no event batch that could be persisted as its start events.
type missingNewWorkflowStartEventsError struct{}

// Error implements the error interface.
func (missingNewWorkflowStartEventsError) Error() string {
	return "conflict resolve workflow execution: new workflow snapshot has no start events to persist"
}