in common/persistence/nosql/nosql_execution_store.go [170:317]
// UpdateWorkflowExecution persists request.UpdateWorkflowMutation and, when
// request.NewWorkflowSnapshot is non-nil, the new run alongside it.
//
// The request is first checked with persistence.ValidateUpdateWorkflowModeState.
// request.Mode then selects the write issued for the current-workflow record:
//   - UpdateWorkflowModeIgnoreCurrent: a noop write.
//   - UpdateWorkflowModeBypassCurrent: the same noop write, but only after
//     assertNotCurrentExecution succeeds for the run being updated.
//   - UpdateWorkflowModeUpdateCurrent: an update write whose condition carries
//     the updated run's ID as CurrentRunID. The row is filled from the new
//     snapshot when one is supplied and from the mutation otherwise.
//
// Any other mode yields a *types.InternalServiceError. Everything is handed to
// d.db.UpdateWorkflowExecutionWithTasks in a single call, together with a shard
// condition built from d.shardID and request.RangeID, and that call's error is
// returned via d.processUpdateWorkflowResult.
func (d *nosqlExecutionStore) UpdateWorkflowExecution(
	ctx context.Context,
	request *persistence.InternalUpdateWorkflowExecutionRequest,
) error {
	mutation := request.UpdateWorkflowMutation
	snapshot := request.NewWorkflowSnapshot

	info := mutation.ExecutionInfo
	domainID, workflowID, runID := info.DomainID, info.WorkflowID, info.RunID

	if err := persistence.ValidateUpdateWorkflowModeState(request.Mode, mutation, snapshot); err != nil {
		return err
	}

	var currentReq *nosqlplugin.CurrentWorkflowWriteRequest
	switch request.Mode {
	case persistence.UpdateWorkflowModeIgnoreCurrent, persistence.UpdateWorkflowModeBypassCurrent:
		// Only the bypass mode has to prove the run is not the current one;
		// both modes then leave the current-workflow record untouched.
		if request.Mode == persistence.UpdateWorkflowModeBypassCurrent {
			if err := d.assertNotCurrentExecution(ctx, domainID, workflowID, runID); err != nil {
				return err
			}
		}
		currentReq = &nosqlplugin.CurrentWorkflowWriteRequest{
			WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
		}

	case persistence.UpdateWorkflowModeUpdateCurrent:
		// Pick the row contents first, then build the write request once.
		var row nosqlplugin.CurrentWorkflowRow
		if snapshot == nil {
			row = nosqlplugin.CurrentWorkflowRow{
				ShardID:          d.shardID,
				DomainID:         domainID,
				WorkflowID:       workflowID,
				RunID:            runID,
				State:            info.State,
				CloseStatus:      info.CloseStatus,
				CreateRequestID:  info.CreateRequestID,
				LastWriteVersion: mutation.LastWriteVersion,
			}
		} else {
			next := snapshot.ExecutionInfo
			if next.DomainID != domainID {
				return &types.InternalServiceError{
					Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
				}
			}
			row = nosqlplugin.CurrentWorkflowRow{
				ShardID:  d.shardID,
				DomainID: next.DomainID,
				// TODO: can the workflow ID ever differ from the updated run's?
				WorkflowID:       next.WorkflowID,
				RunID:            next.RunID,
				State:            next.State,
				CloseStatus:      next.CloseStatus,
				CreateRequestID:  next.CreateRequestID,
				LastWriteVersion: snapshot.LastWriteVersion,
			}
		}
		currentReq = &nosqlplugin.CurrentWorkflowWriteRequest{
			WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
			Row:       row,
			Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
				// The condition always names the run being updated, even when
				// the row itself describes the new run.
				CurrentRunID: &runID,
			},
		}

	default:
		return &types.InternalServiceError{
			Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
		}
	}

	var (
		mutateReq, insertReq *nosqlplugin.WorkflowExecutionRequest
		transferTasks        []*nosqlplugin.TransferTask
		crossClusterTasks    []*nosqlplugin.CrossClusterTask
		replicationTasks     []*nosqlplugin.ReplicationTask
		timerTasks           []*nosqlplugin.TimerTask
		err                  error
	)

	// Step 1: the run being updated.
	mutateReq, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&mutation)
	if err != nil {
		return err
	}
	transferTasks, crossClusterTasks, replicationTasks, timerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
		domainID,
		workflowID,
		mutation.ExecutionInfo.RunID,
		mutation.TransferTasks,
		mutation.CrossClusterTasks,
		mutation.ReplicationTasks,
		mutation.TimerTasks,
		transferTasks,
		crossClusterTasks,
		replicationTasks,
		timerTasks,
	)
	if err != nil {
		return err
	}

	// Step 2: the optional new run. The task slices from step 1 are passed
	// back in and the returned slices replace them. insertReq stays nil when
	// there is no new run.
	if snapshot != nil {
		insertReq, err = d.prepareCreateWorkflowExecutionRequestWithMaps(snapshot)
		if err != nil {
			return err
		}
		transferTasks, crossClusterTasks, replicationTasks, timerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
			domainID,
			workflowID,
			snapshot.ExecutionInfo.RunID,
			snapshot.TransferTasks,
			snapshot.CrossClusterTasks,
			snapshot.ReplicationTasks,
			snapshot.TimerTasks,
			transferTasks,
			crossClusterTasks,
			replicationTasks,
			timerTasks,
		)
		if err != nil {
			return err
		}
	}

	err = d.db.UpdateWorkflowExecutionWithTasks(
		ctx,
		currentReq,
		mutateReq,
		insertReq,
		nil, // no workflow to reset here
		transferTasks,
		crossClusterTasks,
		replicationTasks,
		timerTasks,
		&nosqlplugin.ShardCondition{
			ShardID: d.shardID,
			RangeID: request.RangeID,
		},
	)
	return d.processUpdateWorkflowResult(err, request.RangeID)
}