in common/persistence/nosql/nosql_execution_store.go [319:456]
// ConflictResolveWorkflowExecution writes the result of a conflict resolution
// for one workflow ID. The reset run's snapshot is always written; a mutation
// of the current run and a snapshot of a new run are written as well when the
// request carries them. Everything is handed to the database plugin in a
// single UpdateWorkflowExecutionWithTasks call guarded by the shard's RangeID.
//
// request.Mode decides how the current-execution record is handled:
//   - ConflictResolveWorkflowModeBypassCurrent: assertNotCurrentExecution must
//     succeed for the reset run, and the current-execution record is written
//     with the Noop write mode.
//   - ConflictResolveWorkflowModeUpdateCurrent: the current-execution record is
//     updated with a write condition on CurrentRunID.
//
// Any other mode returns a *types.InternalServiceError. Errors from validation
// and from the prepare helpers are returned unchanged; the error from the
// database call is passed through processUpdateWorkflowResult.
func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
	ctx context.Context,
	request *persistence.InternalConflictResolveWorkflowExecutionRequest,
) error {
	resetSnapshot := request.ResetWorkflowSnapshot
	newSnapshot := request.NewWorkflowSnapshot
	currentMutation := request.CurrentWorkflowMutation

	// Domain and workflow ID are taken from the reset run and reused for
	// every run touched below.
	domainID := resetSnapshot.ExecutionInfo.DomainID
	workflowID := resetSnapshot.ExecutionInfo.WorkflowID

	if err := persistence.ValidateConflictResolveWorkflowModeState(request.Mode, resetSnapshot, newSnapshot, currentMutation); err != nil {
		return err
	}

	var currentWrite *nosqlplugin.CurrentWorkflowWriteRequest
	switch request.Mode {
	case persistence.ConflictResolveWorkflowModeUpdateCurrent:
		// The new run's info and version are used when a new run is supplied;
		// otherwise the reset run's are used.
		latestInfo, latestVersion := resetSnapshot.ExecutionInfo, resetSnapshot.LastWriteVersion
		if newSnapshot != nil {
			latestInfo, latestVersion = newSnapshot.ExecutionInfo, newSnapshot.LastWriteVersion
		}

		// Without a current-run mutation, the reset workflow is current.
		expectedRunID := resetSnapshot.ExecutionInfo.RunID
		if currentMutation != nil {
			expectedRunID = currentMutation.ExecutionInfo.RunID
		}

		currentWrite = &nosqlplugin.CurrentWorkflowWriteRequest{
			WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
			Row: nosqlplugin.CurrentWorkflowRow{
				ShardID:          d.shardID,
				DomainID:         domainID,
				WorkflowID:       workflowID,
				RunID:            latestInfo.RunID,
				State:            latestInfo.State,
				CloseStatus:      latestInfo.CloseStatus,
				CreateRequestID:  latestInfo.CreateRequestID,
				LastWriteVersion: latestVersion,
			},
			Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
				CurrentRunID: &expectedRunID,
			},
		}

	case persistence.ConflictResolveWorkflowModeBypassCurrent:
		if err := d.assertNotCurrentExecution(ctx, domainID, workflowID, resetSnapshot.ExecutionInfo.RunID); err != nil {
			return err
		}
		currentWrite = &nosqlplugin.CurrentWorkflowWriteRequest{
			WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
		}

	default:
		return &types.InternalServiceError{
			Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
		}
	}

	var (
		mutateReq, insertReq, resetReq *nosqlplugin.WorkflowExecutionRequest

		// Task slices are threaded through every prepareNoSQLTasksForWorkflowTxn
		// call so that tasks from all runs end up in the same batches.
		transferTasks     []*nosqlplugin.TransferTask
		crossClusterTasks []*nosqlplugin.CrossClusterTask
		replicationTasks  []*nosqlplugin.ReplicationTask
		timerTasks        []*nosqlplugin.TimerTask

		err error
	)

	// Step 1: the current run, mutated in place (optional).
	if currentMutation != nil {
		if mutateReq, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(currentMutation); err != nil {
			return err
		}
		transferTasks, crossClusterTasks, replicationTasks, timerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
			domainID, workflowID, currentMutation.ExecutionInfo.RunID,
			currentMutation.TransferTasks, currentMutation.CrossClusterTasks, currentMutation.ReplicationTasks, currentMutation.TimerTasks,
			transferTasks, crossClusterTasks, replicationTasks, timerTasks,
		)
		if err != nil {
			return err
		}
	}

	// Step 2: the reset run (always present).
	if resetReq, err = d.prepareResetWorkflowExecutionRequestWithMapsAndEventBuffer(&resetSnapshot); err != nil {
		return err
	}
	transferTasks, crossClusterTasks, replicationTasks, timerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
		domainID, workflowID, resetSnapshot.ExecutionInfo.RunID,
		resetSnapshot.TransferTasks, resetSnapshot.CrossClusterTasks, resetSnapshot.ReplicationTasks, resetSnapshot.TimerTasks,
		transferTasks, crossClusterTasks, replicationTasks, timerTasks,
	)
	if err != nil {
		return err
	}

	// Step 3: the new run, inserted (optional).
	if newSnapshot != nil {
		if insertReq, err = d.prepareCreateWorkflowExecutionRequestWithMaps(newSnapshot); err != nil {
			return err
		}
		transferTasks, crossClusterTasks, replicationTasks, timerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
			domainID, workflowID, newSnapshot.ExecutionInfo.RunID,
			newSnapshot.TransferTasks, newSnapshot.CrossClusterTasks, newSnapshot.ReplicationTasks, newSnapshot.TimerTasks,
			transferTasks, crossClusterTasks, replicationTasks, timerTasks,
		)
		if err != nil {
			return err
		}
	}

	err = d.db.UpdateWorkflowExecutionWithTasks(
		ctx,
		currentWrite,
		mutateReq, insertReq, resetReq,
		transferTasks, crossClusterTasks, replicationTasks, timerTasks,
		&nosqlplugin.ShardCondition{
			ShardID: d.shardID,
			RangeID: request.RangeID,
		},
	)
	return d.processUpdateWorkflowResult(err, request.RangeID)
}