in common/persistence/sql/sql_execution_store_util.go [38:201]
// applyWorkflowMutationTx writes every part of workflowMutation for a single
// workflow run through the transaction tx.
//
// The work is expressed as an ordered list of steps. Steps run one after the
// other; the first step that reports an error ends the sequence and that error
// is returned unchanged. In order, the steps are:
//
//  1. lockAndCheckNextEventID, checked against workflowMutation.Condition
//  2. updateExecution
//  3. applyTasks (transfer, cross-cluster, replication and timer tasks)
//  4. updateActivityInfos
//  5. updateTimerInfos
//  6. updateChildExecutionInfos
//  7. updateRequestCancelInfos
//  8. updateSignalInfos
//  9. updateSignalsRequested
//  10. deleteBufferedEvents, only when workflowMutation.ClearBufferedEvents is set
//  11. updateBufferedEvents with workflowMutation.NewBufferedEvents
//
// The domain ID and run ID taken from workflowMutation.ExecutionInfo are
// converted with serialization.MustParseUUID before any step runs.
// NOTE(review): the "Must" prefix suggests a panic on a malformed UUID --
// confirm against the serialization package.
//
// This function itself does not call commit or rollback on tx.
func applyWorkflowMutationTx(
	ctx context.Context,
	tx sqlplugin.Tx,
	shardID int,
	workflowMutation *p.InternalWorkflowMutation,
	parser serialization.Parser,
) error {
	mutation := workflowMutation
	info := mutation.ExecutionInfo

	// Resolve the identifiers shared by every step up front, before tx is used.
	domainUUID := serialization.MustParseUUID(info.DomainID)
	wfID := info.WorkflowID
	runUUID := serialization.MustParseUUID(info.RunID)

	// The order of this table is the order of execution; the lock/condition
	// check has to stay first so nothing is written before it succeeds.
	steps := []func() error{
		// TODO Remove me if UPDATE holds the lock to the end of a transaction
		func() error {
			return lockAndCheckNextEventID(ctx, tx, shardID, domainUUID, wfID, runUUID, mutation.Condition)
		},
		func() error {
			return updateExecution(
				ctx,
				tx,
				info,
				mutation.VersionHistories,
				mutation.ChecksumData,
				mutation.StartVersion,
				mutation.LastWriteVersion,
				shardID,
				parser,
			)
		},
		func() error {
			return applyTasks(
				ctx,
				tx,
				shardID,
				domainUUID,
				wfID,
				runUUID,
				mutation.TransferTasks,
				mutation.CrossClusterTasks,
				mutation.ReplicationTasks,
				mutation.TimerTasks,
				parser,
			)
		},
		func() error {
			return updateActivityInfos(
				ctx, tx,
				mutation.UpsertActivityInfos, mutation.DeleteActivityInfos,
				shardID, domainUUID, wfID, runUUID,
				parser,
			)
		},
		func() error {
			return updateTimerInfos(
				ctx, tx,
				mutation.UpsertTimerInfos, mutation.DeleteTimerInfos,
				shardID, domainUUID, wfID, runUUID,
				parser,
			)
		},
		func() error {
			return updateChildExecutionInfos(
				ctx, tx,
				mutation.UpsertChildExecutionInfos, mutation.DeleteChildExecutionInfos,
				shardID, domainUUID, wfID, runUUID,
				parser,
			)
		},
		func() error {
			return updateRequestCancelInfos(
				ctx, tx,
				mutation.UpsertRequestCancelInfos, mutation.DeleteRequestCancelInfos,
				shardID, domainUUID, wfID, runUUID,
				parser,
			)
		},
		func() error {
			return updateSignalInfos(
				ctx, tx,
				mutation.UpsertSignalInfos, mutation.DeleteSignalInfos,
				shardID, domainUUID, wfID, runUUID,
				parser,
			)
		},
		func() error {
			return updateSignalsRequested(
				ctx, tx,
				mutation.UpsertSignalRequestedIDs, mutation.DeleteSignalRequestedIDs,
				shardID, domainUUID, wfID, runUUID,
			)
		},
		func() error {
			// Existing buffered events are removed only on explicit request.
			if !mutation.ClearBufferedEvents {
				return nil
			}
			return deleteBufferedEvents(ctx, tx, shardID, domainUUID, wfID, runUUID)
		},
		func() error {
			return updateBufferedEvents(
				ctx, tx,
				mutation.NewBufferedEvents,
				shardID, domainUUID, wfID, runUUID,
			)
		},
	}

	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}