func()

in service/history/execution/context.go [433:614]


func (c *contextImpl) ConflictResolveWorkflowExecution(
	ctx context.Context,
	now time.Time,
	conflictResolveMode persistence.ConflictResolveWorkflowMode,
	resetMutableState MutableState,
	newContext Context,
	newMutableState MutableState,
	currentContext Context,
	currentMutableState MutableState,
	currentTransactionPolicy *TransactionPolicy,
) (retError error) {

	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(
		now,
		TransactionPolicyPassive,
	)
	if err != nil {
		return err
	}

	var persistedBlobs events.PersistedBlobs
	resetHistorySize := c.GetHistorySize()
	for _, workflowEvents := range resetWorkflowEventsSeq {
		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
		if err != nil {
			return err
		}
		resetHistorySize += int64(len(blob.Data))
		persistedBlobs = append(persistedBlobs, blob)
	}
	c.SetHistorySize(resetHistorySize)
	resetWorkflow.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: resetHistorySize,
	}

	var newWorkflow *persistence.WorkflowSnapshot
	var newWorkflowEventsSeq []*persistence.WorkflowEvents
	if newContext != nil && newMutableState != nil {

		defer func() {
			if retError != nil {
				newContext.Clear()
			}
		}()

		newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
			now,
			TransactionPolicyPassive,
		)
		if err != nil {
			return err
		}
		newWorkflowSizeSize := newContext.GetHistorySize()
		startEvents := newWorkflowEventsSeq[0]
		blob, err := c.PersistStartWorkflowBatchEvents(ctx, startEvents)
		if err != nil {
			return err
		}
		newWorkflowSizeSize += int64(len(blob.Data))
		newContext.SetHistorySize(newWorkflowSizeSize)
		newWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: newWorkflowSizeSize,
		}
		persistedBlobs = append(persistedBlobs, blob)
	}

	var currentWorkflow *persistence.WorkflowMutation
	var currentWorkflowEventsSeq []*persistence.WorkflowEvents
	if currentContext != nil && currentMutableState != nil && currentTransactionPolicy != nil {

		defer func() {
			if retError != nil {
				currentContext.Clear()
			}
		}()

		currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(
			now,
			*currentTransactionPolicy,
		)
		if err != nil {
			return err
		}
		currentWorkflowSize := currentContext.GetHistorySize()
		for _, workflowEvents := range currentWorkflowEventsSeq {
			blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
			if err != nil {
				return err
			}
			currentWorkflowSize += int64(len(blob.Data))
			persistedBlobs = append(persistedBlobs, blob)
		}
		currentContext.SetHistorySize(currentWorkflowSize)
		currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: currentWorkflowSize,
		}
	}

	if err := c.conflictResolveEventReapply(
		conflictResolveMode,
		resetWorkflowEventsSeq,
		newWorkflowEventsSeq,
		// current workflow events will not participate in the events reapplication
	); err != nil {
		return err
	}
	domain, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if errorDomainName != nil {
		return errorDomainName
	}
	resp, err := c.shard.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{
		// RangeID , this is set by shard context
		Mode:                    conflictResolveMode,
		ResetWorkflowSnapshot:   *resetWorkflow,
		NewWorkflowSnapshot:     newWorkflow,
		CurrentWorkflowMutation: currentWorkflow,
		// Encoding, this is set by shard context
		DomainName: domain,
	})
	if err != nil {
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true)
		}
		return err
	}

	currentBranchToken, err := resetMutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}

	workflowState, workflowCloseState := resetMutableState.GetWorkflowStateCloseStatus()
	// Current branch changed and notify the watchers
	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
		c.domainID,
		&c.workflowExecution,
		resetMutableState.GetLastFirstEventID(),
		resetMutableState.GetNextEventID(),
		resetMutableState.GetPreviousStartedEventID(),
		currentBranchToken,
		workflowState,
		workflowCloseState,
	))

	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, false)
	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false)
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false)

	// finally emit session stats
	domainName := c.GetDomainName()
	emitWorkflowHistoryStats(
		c.metricsClient,
		domainName,
		int(c.stats.HistorySize),
		int(resetMutableState.GetNextEventID()-1),
	)
	emitSessionUpdateStats(
		c.metricsClient,
		domainName,
		resp.MutableStateUpdateSessionStats,
	)
	// emit workflow completion stats if any
	if resetWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
		if event, err := resetMutableState.GetCompletionEvent(ctx); err == nil {
			workflowType := resetWorkflow.ExecutionInfo.WorkflowTypeName
			taskList := resetWorkflow.ExecutionInfo.TaskList
			emitWorkflowCompletionStats(c.metricsClient, c.logger,
				domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(),
				taskList, event)
		}
	}

	return nil
}