in service/history/engine/engineimpl/historyEngine.go [2883:3022]
func (e *historyEngineImpl) ResetWorkflowExecution(
ctx context.Context,
resetRequest *types.HistoryResetWorkflowExecutionRequest,
) (response *types.ResetWorkflowExecutionResponse, retError error) {
request := resetRequest.ResetRequest
domainID := resetRequest.GetDomainUUID()
workflowID := request.WorkflowExecution.GetWorkflowID()
baseRunID := request.WorkflowExecution.GetRunID()
baseContext, baseReleaseFn, err := e.executionCache.GetOrCreateWorkflowExecution(
ctx,
domainID,
types.WorkflowExecution{
WorkflowID: workflowID,
RunID: baseRunID,
},
)
if err != nil {
return nil, err
}
defer func() { baseReleaseFn(retError) }()
baseMutableState, err := baseContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
}
if ok := baseMutableState.HasProcessedOrPendingDecision(); !ok {
return nil, &types.BadRequestError{
Message: "Cannot reset workflow without a decision task schedule.",
}
}
if request.GetDecisionFinishEventID() <= common.FirstEventID ||
request.GetDecisionFinishEventID() > baseMutableState.GetNextEventID() {
return nil, &types.BadRequestError{
Message: "Decision finish ID must be > 1 && <= workflow next event ID.",
}
}
domainName, err := e.shard.GetDomainCache().GetDomainName(domainID)
if err != nil {
return nil, err
}
// also load the current run of the workflow, it can be different from the base runID
resp, err := e.executionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{
DomainID: domainID,
WorkflowID: request.WorkflowExecution.GetWorkflowID(),
DomainName: domainName,
})
if err != nil {
return nil, err
}
currentRunID := resp.RunID
var currentContext execution.Context
var currentMutableState execution.MutableState
var currentReleaseFn execution.ReleaseFunc
if currentRunID == baseRunID {
currentContext = baseContext
currentMutableState = baseMutableState
} else {
currentContext, currentReleaseFn, err = e.executionCache.GetOrCreateWorkflowExecution(
ctx,
domainID,
types.WorkflowExecution{
WorkflowID: workflowID,
RunID: currentRunID,
},
)
if err != nil {
return nil, err
}
defer func() { currentReleaseFn(retError) }()
currentMutableState, err = currentContext.LoadWorkflowExecution(ctx)
if err != nil {
return nil, err
}
}
// dedup by requestID
if currentMutableState.GetExecutionInfo().CreateRequestID == request.GetRequestID() {
e.logger.Info("Duplicated reset request",
tag.WorkflowID(workflowID),
tag.WorkflowRunID(currentRunID),
tag.WorkflowDomainID(domainID))
return &types.ResetWorkflowExecutionResponse{
RunID: currentRunID,
}, nil
}
resetRunID := uuid.New()
baseRebuildLastEventID := request.GetDecisionFinishEventID() - 1
baseVersionHistories := baseMutableState.GetVersionHistories()
baseCurrentBranchToken, err := baseMutableState.GetCurrentBranchToken()
if err != nil {
return nil, err
}
baseRebuildLastEventVersion := baseMutableState.GetCurrentVersion()
baseNextEventID := baseMutableState.GetNextEventID()
if baseVersionHistories != nil {
baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory()
if err != nil {
return nil, err
}
baseRebuildLastEventVersion, err = baseCurrentVersionHistory.GetEventVersion(baseRebuildLastEventID)
if err != nil {
return nil, err
}
baseCurrentBranchToken = baseCurrentVersionHistory.GetBranchToken()
}
if err := e.workflowResetter.ResetWorkflow(
ctx,
domainID,
workflowID,
baseRunID,
baseCurrentBranchToken,
baseRebuildLastEventID,
baseRebuildLastEventVersion,
baseNextEventID,
resetRunID,
request.GetRequestID(),
execution.NewWorkflow(
ctx,
e.shard.GetClusterMetadata(),
currentContext,
currentMutableState,
currentReleaseFn,
),
request.GetReason(),
nil,
request.GetSkipSignalReapply(),
); err != nil {
return nil, err
}
return &types.ResetWorkflowExecutionResponse{
RunID: resetRunID,
}, nil
}