in common/persistence/sql/sql_execution_store_util.go [203:405]
// applyWorkflowSnapshotTxAsReset writes workflowSnapshot for the given shard
// through tx. For every per-workflow collection it first calls the delete*
// helper and then the matching update* helper with the snapshot's values.
//
// The steps run strictly in the order below; the first error returned by any
// helper is returned unchanged and the remaining steps are skipped:
//  1. lockAndCheckNextEventID, using workflowSnapshot.Condition
//  2. updateExecution, using the snapshot's execution info, version
//     histories, checksum data, start version and last write version
//  3. applyTasks, using the transfer, cross-cluster, replication and timer
//     tasks
//  4. delete*/update* pairs for activity infos, timer infos, child execution
//     infos, request-cancel infos, signal infos and the signals-requested set
//  5. deleteBufferedEvents
//
// The domain and run IDs are converted with serialization.MustParseUUID
// before any statement is issued.
// NOTE(review): given the Must prefix, malformed IDs presumably panic rather
// than return an error - confirm against the serialization package.
func applyWorkflowSnapshotTxAsReset(
	ctx context.Context,
	tx sqlplugin.Tx,
	shardID int,
	workflowSnapshot *p.InternalWorkflowSnapshot,
	parser serialization.Parser,
) error {
	info := workflowSnapshot.ExecutionInfo
	domainID := serialization.MustParseUUID(info.DomainID)
	workflowID := info.WorkflowID
	runID := serialization.MustParseUUID(info.RunID)

	// Order matters: the lock/condition check comes first, and each
	// collection is deleted before the snapshot's rows are written.
	// NOTE(review): the nil passed to each update* helper looks like an empty
	// "rows to delete" argument - verify against the helper signatures.
	steps := []func() error{
		// TODO Is there a way to modify the various map tables without fear of
		// other people adding rows after we delete, without locking the
		// executions row?
		func() error {
			return lockAndCheckNextEventID(ctx, tx, shardID, domainID, workflowID, runID, workflowSnapshot.Condition)
		},
		func() error {
			return updateExecution(
				ctx,
				tx,
				info,
				workflowSnapshot.VersionHistories,
				workflowSnapshot.ChecksumData,
				workflowSnapshot.StartVersion,
				workflowSnapshot.LastWriteVersion,
				shardID,
				parser,
			)
		},
		func() error {
			return applyTasks(
				ctx,
				tx,
				shardID,
				domainID,
				workflowID,
				runID,
				workflowSnapshot.TransferTasks,
				workflowSnapshot.CrossClusterTasks,
				workflowSnapshot.ReplicationTasks,
				workflowSnapshot.TimerTasks,
				parser,
			)
		},

		// Activity infos.
		func() error {
			return deleteActivityInfoMap(ctx, tx, shardID, domainID, workflowID, runID)
		},
		func() error {
			return updateActivityInfos(ctx, tx, workflowSnapshot.ActivityInfos, nil, shardID, domainID, workflowID, runID, parser)
		},

		// Timer infos.
		func() error {
			return deleteTimerInfoMap(ctx, tx, shardID, domainID, workflowID, runID)
		},
		func() error {
			return updateTimerInfos(ctx, tx, workflowSnapshot.TimerInfos, nil, shardID, domainID, workflowID, runID, parser)
		},

		// Child execution infos.
		func() error {
			return deleteChildExecutionInfoMap(ctx, tx, shardID, domainID, workflowID, runID)
		},
		func() error {
			return updateChildExecutionInfos(ctx, tx, workflowSnapshot.ChildExecutionInfos, nil, shardID, domainID, workflowID, runID, parser)
		},

		// Request-cancel infos.
		func() error {
			return deleteRequestCancelInfoMap(ctx, tx, shardID, domainID, workflowID, runID)
		},
		func() error {
			return updateRequestCancelInfos(ctx, tx, workflowSnapshot.RequestCancelInfos, nil, shardID, domainID, workflowID, runID, parser)
		},

		// Signal infos.
		func() error {
			return deleteSignalInfoMap(ctx, tx, shardID, domainID, workflowID, runID)
		},
		func() error {
			return updateSignalInfos(ctx, tx, workflowSnapshot.SignalInfos, nil, shardID, domainID, workflowID, runID, parser)
		},

		// Signals-requested set (this update helper takes no parser).
		func() error {
			return deleteSignalsRequestedSet(ctx, tx, shardID, domainID, workflowID, runID)
		},
		func() error {
			return updateSignalsRequested(ctx, tx, workflowSnapshot.SignalRequestedIDs, nil, shardID, domainID, workflowID, runID)
		},
	}

	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}

	// Last step: its result, error or nil, is the function's result.
	return deleteBufferedEvents(ctx, tx, shardID, domainID, workflowID, runID)
}