func()

in service/history/engine/engineimpl/historyEngine.go [2883:3022]


func (e *historyEngineImpl) ResetWorkflowExecution(
	ctx context.Context,
	resetRequest *types.HistoryResetWorkflowExecutionRequest,
) (response *types.ResetWorkflowExecutionResponse, retError error) {

	request := resetRequest.ResetRequest
	domainID := resetRequest.GetDomainUUID()
	workflowID := request.WorkflowExecution.GetWorkflowID()
	baseRunID := request.WorkflowExecution.GetRunID()

	baseContext, baseReleaseFn, err := e.executionCache.GetOrCreateWorkflowExecution(
		ctx,
		domainID,
		types.WorkflowExecution{
			WorkflowID: workflowID,
			RunID:      baseRunID,
		},
	)
	if err != nil {
		return nil, err
	}
	defer func() { baseReleaseFn(retError) }()

	baseMutableState, err := baseContext.LoadWorkflowExecution(ctx)
	if err != nil {
		return nil, err
	}
	if ok := baseMutableState.HasProcessedOrPendingDecision(); !ok {
		return nil, &types.BadRequestError{
			Message: "Cannot reset workflow without a decision task schedule.",
		}
	}
	if request.GetDecisionFinishEventID() <= common.FirstEventID ||
		request.GetDecisionFinishEventID() > baseMutableState.GetNextEventID() {
		return nil, &types.BadRequestError{
			Message: "Decision finish ID must be > 1 && <= workflow next event ID.",
		}
	}
	domainName, err := e.shard.GetDomainCache().GetDomainName(domainID)
	if err != nil {
		return nil, err
	}
	// also load the current run of the workflow, it can be different from the base runID
	resp, err := e.executionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
		DomainID:   domainID,
		WorkflowID: request.WorkflowExecution.GetWorkflowID(),
		DomainName: domainName,
	})
	if err != nil {
		return nil, err
	}

	currentRunID := resp.RunID
	var currentContext execution.Context
	var currentMutableState execution.MutableState
	var currentReleaseFn execution.ReleaseFunc
	if currentRunID == baseRunID {
		currentContext = baseContext
		currentMutableState = baseMutableState
	} else {
		currentContext, currentReleaseFn, err = e.executionCache.GetOrCreateWorkflowExecution(
			ctx,
			domainID,
			types.WorkflowExecution{
				WorkflowID: workflowID,
				RunID:      currentRunID,
			},
		)
		if err != nil {
			return nil, err
		}
		defer func() { currentReleaseFn(retError) }()

		currentMutableState, err = currentContext.LoadWorkflowExecution(ctx)
		if err != nil {
			return nil, err
		}
	}

	// dedup by requestID
	if currentMutableState.GetExecutionInfo().CreateRequestID == request.GetRequestID() {
		e.logger.Info("Duplicated reset request",
			tag.WorkflowID(workflowID),
			tag.WorkflowRunID(currentRunID),
			tag.WorkflowDomainID(domainID))
		return &types.ResetWorkflowExecutionResponse{
			RunID: currentRunID,
		}, nil
	}

	resetRunID := uuid.New()
	baseRebuildLastEventID := request.GetDecisionFinishEventID() - 1
	baseVersionHistories := baseMutableState.GetVersionHistories()
	baseCurrentBranchToken, err := baseMutableState.GetCurrentBranchToken()
	if err != nil {
		return nil, err
	}
	baseRebuildLastEventVersion := baseMutableState.GetCurrentVersion()
	baseNextEventID := baseMutableState.GetNextEventID()

	if baseVersionHistories != nil {
		baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory()
		if err != nil {
			return nil, err
		}
		baseRebuildLastEventVersion, err = baseCurrentVersionHistory.GetEventVersion(baseRebuildLastEventID)
		if err != nil {
			return nil, err
		}
		baseCurrentBranchToken = baseCurrentVersionHistory.GetBranchToken()
	}

	if err := e.workflowResetter.ResetWorkflow(
		ctx,
		domainID,
		workflowID,
		baseRunID,
		baseCurrentBranchToken,
		baseRebuildLastEventID,
		baseRebuildLastEventVersion,
		baseNextEventID,
		resetRunID,
		request.GetRequestID(),
		execution.NewWorkflow(
			ctx,
			e.shard.GetClusterMetadata(),
			currentContext,
			currentMutableState,
			currentReleaseFn,
		),
		request.GetReason(),
		nil,
		request.GetSkipSignalReapply(),
	); err != nil {
		return nil, err
	}
	return &types.ResetWorkflowExecutionResponse{
		RunID: resetRunID,
	}, nil
}