func()

in service/history/execution/context.go [744:914]


// UpdateWorkflowExecutionWithNew closes the pending transaction on the current
// workflow's mutable state as a mutation, persists the resulting history event
// batches, and writes the mutation through updateWorkflowExecutionWithRetry.
//
// When newContext, newMutableState and newWorkflowTransactionPolicy are all
// non-nil, the new workflow's mutable state is additionally closed as a
// snapshot, its first event batch is persisted, and the snapshot is sent in the
// same update request. If any of the three is nil the new-workflow part is
// skipped and a nil snapshot is passed along.
//
// Parameters:
//   - ctx: passed to the event persistence calls, the update call, and the
//     completion event lookup.
//   - now: passed to both CloseTransactionAs* calls.
//   - updateMode: forwarded to replication task merging, event reapplication
//     and the persistence request.
//   - newContext / newMutableState / newWorkflowTransactionPolicy: optional
//     new workflow to create alongside the update (see above).
//   - currentWorkflowTransactionPolicy: policy used to close the current
//     workflow's transaction.
//
// On any non-nil return value c.Clear() is called; newContext.Clear() is also
// called if the new-workflow branch was entered. On success the engine is
// notified of the new history event and of the tasks carried by the mutation
// and snapshot, and history/session/completion metrics are emitted.
func (c *contextImpl) UpdateWorkflowExecutionWithNew(
	ctx context.Context,
	now time.Time,
	updateMode persistence.UpdateWorkflowMode,
	newContext Context,
	newMutableState MutableState,
	currentWorkflowTransactionPolicy TransactionPolicy,
	newWorkflowTransactionPolicy *TransactionPolicy,
) (retError error) {
	// retError is a named result so this deferred func observes the final
	// return value and clears the current context on any failure.
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	currentWorkflow, currentWorkflowEventsSeq, err := c.mutableState.CloseTransactionAsMutation(
		now,
		currentWorkflowTransactionPolicy,
	)
	if err != nil {
		return err
	}
	// persistedBlobs collects the blobs of every event batch persisted below,
	// for both the current and the new workflow; it is handed to the task
	// notification helpers later.
	var persistedBlobs events.PersistedBlobs
	// The old* values capture size/count before this update so the delta can be
	// reported to emitLargeWorkflowShardIDStats at the end.
	currentWorkflowSize := c.GetHistorySize()
	oldWorkflowSize := currentWorkflowSize
	currentWorkflowHistoryCount := c.mutableState.GetNextEventID() - 1
	oldWorkflowHistoryCount := currentWorkflowHistoryCount
	for _, workflowEvents := range currentWorkflowEventsSeq {
		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
		// The count is bumped before the error check; this has no visible
		// effect because the error path returns immediately.
		currentWorkflowHistoryCount += int64(len(workflowEvents.Events))
		if err != nil {
			return err
		}
		currentWorkflowSize += int64(len(blob.Data))
		persistedBlobs = append(persistedBlobs, blob)
	}
	c.SetHistorySize(currentWorkflowSize)
	currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: currentWorkflowSize,
	}

	// newWorkflow and newWorkflowEventsSeq stay nil unless a complete new
	// workflow (context, mutable state and policy) was supplied.
	var newWorkflow *persistence.WorkflowSnapshot
	var newWorkflowEventsSeq []*persistence.WorkflowEvents
	if newContext != nil && newMutableState != nil && newWorkflowTransactionPolicy != nil {

		// Registered after the outer defer, so on failure newContext.Clear()
		// runs before c.Clear() (deferred calls run in LIFO order).
		defer func() {
			if retError != nil {
				newContext.Clear()
			}
		}()

		newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
			now,
			*newWorkflowTransactionPolicy,
		)
		if err != nil {
			return err
		}
		newWorkflowSizeSize := newContext.GetHistorySize()
		// NOTE(review): both index expressions below have no length check and
		// will panic if the snapshot yields no event batch or an empty batch —
		// confirm CloseTransactionAsSnapshot guarantees at least one event here.
		// Only the first batch is persisted; presumably a snapshot never carries
		// more than one — TODO confirm.
		startEvents := newWorkflowEventsSeq[0]
		firstEventID := startEvents.Events[0].ID
		var blob events.PersistedBlob
		if firstEventID == common.FirstEventID {
			// The batch begins at the first event ID, so it is persisted as a
			// start batch.
			blob, err = c.PersistStartWorkflowBatchEvents(ctx, startEvents)
			if err != nil {
				return err
			}
		} else {
			// NOTE: This is the case for reset workflow, reset workflow already inserted a branch record
			blob, err = c.PersistNonStartWorkflowBatchEvents(ctx, startEvents)
			if err != nil {
				return err
			}
		}

		persistedBlobs = append(persistedBlobs, blob)
		newWorkflowSizeSize += int64(len(blob.Data))
		newContext.SetHistorySize(newWorkflowSizeSize)
		newWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: newWorkflowSizeSize,
		}
	}

	// newWorkflow may be nil here when no new workflow was supplied.
	if err := mergeContinueAsNewReplicationTasks(
		updateMode,
		currentWorkflow,
		newWorkflow,
	); err != nil {
		return err
	}

	if err := c.updateWorkflowExecutionEventReapply(
		updateMode,
		currentWorkflowEventsSeq,
		newWorkflowEventsSeq,
	); err != nil {
		return err
	}
	// The domain name is resolved only after the history events above have
	// already been persisted; a lookup failure aborts before the update call.
	domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if errorDomainName != nil {
		return errorDomainName
	}
	resp, err := c.updateWorkflowExecutionWithRetry(ctx, &persistence.UpdateWorkflowExecutionRequest{
		// RangeID , this is set by shard context
		Mode:                   updateMode,
		UpdateWorkflowMutation: *currentWorkflow,
		NewWorkflowSnapshot:    newWorkflow,
		// Encoding, this is set by shard context
		DomainName: domain,
	})
	if err != nil {
		// Tasks are still handed to the engine when the error is one that
		// isOperationPossiblySuccessfulError accepts. The trailing `true`
		// presumably flags the persistence-error case — confirm against the
		// notify helpers' signatures.
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true)
		}
		return err
	}

	// TODO remove updateCondition in favor of condition in mutable state
	c.updateCondition = currentWorkflow.ExecutionInfo.NextEventID

	// for any change in the workflow, send an event
	// NOTE(review): a failure here returns an error (and clears the context via
	// the deferred func) even though the persistence update already succeeded.
	currentBranchToken, err := c.mutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}
	workflowState, workflowCloseState := c.mutableState.GetWorkflowStateCloseStatus()
	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
		c.domainID,
		&c.workflowExecution,
		c.mutableState.GetLastFirstEventID(),
		c.mutableState.GetNextEventID(),
		c.mutableState.GetPreviousStartedEventID(),
		currentBranchToken,
		workflowState,
		workflowCloseState,
	))

	// notify current workflow tasks
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false)

	// notify new workflow tasks
	// newWorkflow is nil when no new workflow was created; presumably the
	// helper tolerates a nil snapshot — TODO confirm.
	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false)

	// finally emit session stats
	// The domain name is looked up again via c.GetDomainName() rather than
	// reusing `domain` from the cache lookup above.
	domainName := c.GetDomainName()
	emitWorkflowHistoryStats(
		c.metricsClient,
		domainName,
		int(c.stats.HistorySize),
		int(c.mutableState.GetNextEventID()-1),
	)
	emitSessionUpdateStats(
		c.metricsClient,
		domainName,
		resp.MutableStateUpdateSessionStats,
	)
	// Arguments: bytes added by this update, history count before, history size
	// before, history count after.
	c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize, currentWorkflowHistoryCount)
	// emit workflow completion stats if any
	if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
		// An error from GetCompletionEvent is dropped: completion stats are
		// skipped and the update still reports success.
		if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil {
			workflowType := currentWorkflow.ExecutionInfo.WorkflowTypeName
			taskList := currentWorkflow.ExecutionInfo.TaskList
			emitWorkflowCompletionStats(c.metricsClient, c.logger,
				domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(),
				taskList, event)
		}
	}

	return nil
}