func applyWorkflowMutationTx()

in common/persistence/sql/sql_execution_store_util.go [38:201]


func applyWorkflowMutationTx(
	ctx context.Context,
	tx sqlplugin.Tx,
	shardID int,
	workflowMutation *p.InternalWorkflowMutation,
	parser serialization.Parser,
) error {

	executionInfo := workflowMutation.ExecutionInfo
	versionHistories := workflowMutation.VersionHistories
	workflowChecksum := workflowMutation.ChecksumData
	startVersion := workflowMutation.StartVersion
	lastWriteVersion := workflowMutation.LastWriteVersion
	domainID := serialization.MustParseUUID(executionInfo.DomainID)
	workflowID := executionInfo.WorkflowID
	runID := serialization.MustParseUUID(executionInfo.RunID)

	// TODO Remove me if UPDATE holds the lock to the end of a transaction
	if err := lockAndCheckNextEventID(
		ctx,
		tx,
		shardID,
		domainID,
		workflowID,
		runID,
		workflowMutation.Condition); err != nil {
		return err
	}

	if err := updateExecution(
		ctx,
		tx,
		executionInfo,
		versionHistories,
		workflowChecksum,
		startVersion,
		lastWriteVersion,
		shardID,
		parser); err != nil {
		return err
	}

	if err := applyTasks(
		ctx,
		tx,
		shardID,
		domainID,
		workflowID,
		runID,
		workflowMutation.TransferTasks,
		workflowMutation.CrossClusterTasks,
		workflowMutation.ReplicationTasks,
		workflowMutation.TimerTasks,
		parser,
	); err != nil {
		return err
	}

	if err := updateActivityInfos(
		ctx,
		tx,
		workflowMutation.UpsertActivityInfos,
		workflowMutation.DeleteActivityInfos,
		shardID,
		domainID,
		workflowID,
		runID,
		parser,
	); err != nil {
		return err
	}

	if err := updateTimerInfos(
		ctx,
		tx,
		workflowMutation.UpsertTimerInfos,
		workflowMutation.DeleteTimerInfos,
		shardID,
		domainID,
		workflowID,
		runID,
		parser,
	); err != nil {
		return err
	}

	if err := updateChildExecutionInfos(
		ctx,
		tx,
		workflowMutation.UpsertChildExecutionInfos,
		workflowMutation.DeleteChildExecutionInfos,
		shardID,
		domainID,
		workflowID,
		runID,
		parser,
	); err != nil {
		return err
	}

	if err := updateRequestCancelInfos(
		ctx,
		tx,
		workflowMutation.UpsertRequestCancelInfos,
		workflowMutation.DeleteRequestCancelInfos,
		shardID,
		domainID,
		workflowID,
		runID,
		parser,
	); err != nil {
		return err
	}

	if err := updateSignalInfos(
		ctx,
		tx,
		workflowMutation.UpsertSignalInfos,
		workflowMutation.DeleteSignalInfos,
		shardID,
		domainID,
		workflowID,
		runID,
		parser,
	); err != nil {
		return err
	}

	if err := updateSignalsRequested(
		ctx,
		tx,
		workflowMutation.UpsertSignalRequestedIDs,
		workflowMutation.DeleteSignalRequestedIDs,
		shardID,
		domainID,
		workflowID,
		runID,
	); err != nil {
		return err
	}

	if workflowMutation.ClearBufferedEvents {
		if err := deleteBufferedEvents(
			ctx,
			tx,
			shardID,
			domainID,
			workflowID,
			runID,
		); err != nil {
			return err
		}
	}

	return updateBufferedEvents(
		ctx,
		tx,
		workflowMutation.NewBufferedEvents,
		shardID,
		domainID,
		workflowID,
		runID,
	)
}