service/history/reset/resetter.go (575 lines of code) (raw):
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination resetter_mock.go
package reset
import (
"context"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/collection"
"github.com/uber/cadence/common/definition"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/shard"
)
// WorkflowResetter is the new NDC compatible workflow reset component.
type WorkflowResetter interface {
	// ResetWorkflow creates the run resetRunID of workflowID by rebuilding
	// state from the base run's history (baseRunID / baseBranchToken) up to
	// baseRebuildLastEventID, and persists it. If currentWorkflow is still
	// running it is terminated first. additionalReapplyEvents are always
	// considered for reapplication; the events recorded after the reset
	// point are only considered when skipSignalReapply is false.
	ResetWorkflow(
		ctx context.Context,
		domainID string,
		workflowID string,
		baseRunID string,
		baseBranchToken []byte,
		baseRebuildLastEventID int64,
		baseRebuildLastEventVersion int64,
		baseNextEventID int64,
		resetRunID string,
		resetRequestID string,
		currentWorkflow execution.Workflow,
		resetReason string,
		additionalReapplyEvents []*types.HistoryEvent,
		skipSignalReapply bool,
	) error
}

// workflowResetterImpl is the WorkflowResetter implementation returned by
// NewWorkflowResetter. Apart from executionCache and logger, its
// dependencies are taken from the shard context.
type workflowResetterImpl struct {
	shard             shard.Context
	domainCache       cache.DomainCache
	clusterMetadata   cluster.Metadata
	historyV2Mgr      persistence.HistoryManager
	executionCache    *execution.Cache
	newStateRebuilder nDCStateRebuilderProvider
	logger            log.Logger
}

// nDCStateRebuilderProvider supplies the execution.StateRebuilder used to
// rebuild mutable state; it is a field so the rebuilder can be swapped out.
type nDCStateRebuilderProvider func() execution.StateRebuilder

// Compile-time check that workflowResetterImpl satisfies WorkflowResetter.
var _ WorkflowResetter = (*workflowResetterImpl)(nil)
// NewWorkflowResetter builds a WorkflowResetter whose domain cache, cluster
// metadata and history manager are all obtained from the given shard context.
func NewWorkflowResetter(
	shard shard.Context,
	executionCache *execution.Cache,
	logger log.Logger,
) WorkflowResetter {
	// Every call to the provider yields a fresh state rebuilder bound to
	// this shard and logger.
	rebuilderProvider := func() execution.StateRebuilder {
		return execution.NewStateRebuilder(shard, logger)
	}

	resetter := &workflowResetterImpl{
		shard:          shard,
		executionCache: executionCache,
		logger:         logger,
	}
	resetter.domainCache = shard.GetDomainCache()
	resetter.clusterMetadata = shard.GetClusterMetadata()
	resetter.historyV2Mgr = shard.GetHistoryManager()
	resetter.newStateRebuilder = rebuilderProvider
	return resetter
}
// ResetWorkflow resets a workflow to an earlier point of its history.
//
// The steps are:
//  1. Look up the domain; its failover version is the default version for the
//     reset run.
//  2. If currentWorkflow is still running, terminate it (in memory) with
//     resetReason and use its current version for the reset run instead.
//  3. Build the reset run via prepareResetWorkflow.
//  4. Persist the result via persistToDB.
//
// The reset workflow's release function is invoked when this method returns
// and receives the method's final error (nil on success).
func (r *workflowResetterImpl) ResetWorkflow(
	ctx context.Context,
	domainID string,
	workflowID string,
	baseRunID string,
	baseBranchToken []byte,
	baseRebuildLastEventID int64,
	baseRebuildLastEventVersion int64,
	baseNextEventID int64,
	resetRunID string,
	resetRequestID string,
	currentWorkflow execution.Workflow,
	resetReason string,
	additionalReapplyEvents []*types.HistoryEvent,
	skipSignalReapply bool,
) (retError error) {

	domainEntry, err := r.domainCache.GetDomainByID(domainID)
	if err != nil {
		return err
	}
	resetWorkflowVersion := domainEntry.GetFailoverVersion()

	currentMutableState := currentWorkflow.GetMutableState()
	currentWorkflowTerminated := false
	if currentMutableState.IsWorkflowExecutionRunning() {
		if err := r.terminateWorkflow(
			currentMutableState,
			resetReason,
		); err != nil {
			return err
		}
		// The current run was just modified, so the reset run continues
		// from the current run's version rather than the domain's.
		resetWorkflowVersion = currentMutableState.GetCurrentVersion()
		currentWorkflowTerminated = true
	}

	resetWorkflow, err := r.prepareResetWorkflow(
		ctx,
		domainID,
		workflowID,
		baseRunID,
		baseBranchToken,
		baseRebuildLastEventID,
		baseRebuildLastEventVersion,
		baseNextEventID,
		resetRunID,
		resetRequestID,
		resetWorkflowVersion,
		resetReason,
		additionalReapplyEvents,
		skipSignalReapply,
	)
	if err != nil {
		return err
	}

	// Arguments of a deferred call are evaluated when the defer statement
	// executes, so `defer releaseFn(retError)` would always pass nil. Wrap
	// the call in a closure so the release function observes the error this
	// method actually returns.
	releaseFn := resetWorkflow.GetReleaseFn()
	defer func() { releaseFn(retError) }()

	return r.persistToDB(
		ctx,
		currentWorkflowTerminated,
		currentWorkflow,
		resetWorkflow,
	)
}
// prepareResetWorkflow produces the in-memory reset run, ready to be
// persisted. It rebuilds state from the base run, moves it to
// resetWorkflowVersion, closes any pending decision task, fails started
// activities, reapplies eligible events and finally schedules a new decision.
//
// An InternalServiceError is returned when the rebuilt state's version is
// higher than resetWorkflowVersion.
func (r *workflowResetterImpl) prepareResetWorkflow(
	ctx context.Context,
	domainID string,
	workflowID string,
	baseRunID string,
	baseBranchToken []byte,
	baseRebuildLastEventID int64,
	baseRebuildLastEventVersion int64,
	baseNextEventID int64,
	resetRunID string,
	resetRequestID string,
	resetWorkflowVersion int64,
	resetReason string,
	additionalReapplyEvents []*types.HistoryEvent,
	skipSignalReapply bool,
) (execution.Workflow, error) {

	rebuilt, err := r.replayResetWorkflow(
		ctx,
		domainID,
		workflowID,
		baseRunID,
		baseBranchToken,
		baseRebuildLastEventID,
		baseRebuildLastEventVersion,
		resetRunID,
		resetRequestID,
	)
	if err != nil {
		return nil, err
	}

	state := rebuilt.GetMutableState()

	// The version right after the rebuild is the version of the base run's
	// last replayed event; the reset run may not go backwards from it.
	baseLastEventVersion := state.GetCurrentVersion()
	if resetWorkflowVersion < baseLastEventVersion {
		return nil, &types.InternalServiceError{
			Message: "workflowResetter encounter version mismatch.",
		}
	}

	err = state.UpdateCurrentVersion(resetWorkflowVersion, false)
	if err != nil {
		return nil, err
	}

	state, err = r.closePendingDecisionTask(
		state,
		baseRunID,
		resetRunID,
		baseLastEventVersion,
		resetReason,
		resetRequestID,
	)
	if err != nil {
		return nil, err
	}

	err = r.failInflightActivity(state, resetReason)
	if err != nil {
		return nil, err
	}

	// TODO: signals are currently the only events eligible for reapply, so
	// the whole reapply pass can be skipped for performance when signal
	// reapply is disabled. Remove this shortcut once other event kinds need
	// to be reapplied, e.g. activity/timer results for
	// https://github.com/uber/cadence/issues/2934
	if !skipSignalReapply {
		err = r.reapplyResetAndContinueAsNewWorkflowEvents(
			ctx,
			state,
			domainID,
			workflowID,
			baseRunID,
			baseBranchToken,
			baseRebuildLastEventID+1,
			baseNextEventID,
		)
		if err != nil {
			return nil, err
		}
	}

	// NOTE: events handed in through the API are reapplied regardless of
	// skipSignalReapply.
	err = r.reapplyEvents(state, additionalReapplyEvents)
	if err != nil {
		return nil, err
	}

	err = execution.ScheduleDecision(state)
	if err != nil {
		return nil, err
	}

	return rebuilt, nil
}
// persistToDB writes the reset run to persistence.
//
// When the current run was terminated as part of this reset, the current run
// is updated together with the new reset run. Otherwise the reset run is
// closed as a snapshot, its single event batch is persisted, and the run is
// created in continue-as-new mode against the current run's ID and last write
// version.
func (r *workflowResetterImpl) persistToDB(
	ctx context.Context,
	currentWorkflowTerminated bool,
	currentWorkflow execution.Workflow,
	resetWorkflow execution.Workflow,
) error {

	if currentWorkflowTerminated {
		currentContext := currentWorkflow.GetContext()
		updateTime := r.shard.GetTimeSource().Now()
		return currentContext.UpdateWorkflowExecutionWithNewAsActive(
			ctx,
			updateTime,
			resetWorkflow.GetContext(),
			resetWorkflow.GetMutableState(),
		)
	}

	currentState := currentWorkflow.GetMutableState()
	prevRunID := currentState.GetExecutionInfo().RunID
	prevLastWriteVersion, err := currentState.GetLastWriteVersion()
	if err != nil {
		return err
	}

	closeTime := r.shard.GetTimeSource().Now()
	snapshot, eventBatches, err := resetWorkflow.GetMutableState().CloseTransactionAsSnapshot(
		closeTime,
		execution.TransactionPolicyActive,
	)
	if err != nil {
		return err
	}
	if batchCount := len(eventBatches); batchCount != 1 {
		return &types.InternalServiceError{
			Message: "there should be EXACTLY one batch of events for reset",
		}
	}

	// The only batch holds the events appended on top of the rebuilt
	// history (e.g. decision task failed or timed out).
	persistedHistory, err := resetWorkflow.GetContext().PersistNonStartWorkflowBatchEvents(ctx, eventBatches[0])
	if err != nil {
		return err
	}

	return resetWorkflow.GetContext().CreateWorkflowExecution(
		ctx,
		snapshot,
		persistedHistory,
		persistence.CreateWorkflowModeContinueAsNew,
		prevRunID,
		prevLastWriteVersion,
	)
}
// replayResetWorkflow forks the base branch right after
// baseRebuildLastEventID and rebuilds mutable state for the reset run from
// the base run's history. The returned workflow wraps a new execution
// context for resetRunID and uses a no-op release function.
func (r *workflowResetterImpl) replayResetWorkflow(
	ctx context.Context,
	domainID string,
	workflowID string,
	baseRunID string,
	baseBranchToken []byte,
	baseRebuildLastEventID int64,
	baseRebuildLastEventVersion int64,
	resetRunID string,
	resetRequestID string,
) (execution.Workflow, error) {

	// Fork at the node following the last event that will be replayed.
	forkNodeID := baseRebuildLastEventID + 1
	newBranchToken, err := r.forkAndGenerateBranchToken(
		ctx,
		domainID,
		workflowID,
		baseBranchToken,
		forkNodeID,
		resetRunID,
	)
	if err != nil {
		return nil, err
	}

	resetExecution := types.WorkflowExecution{
		WorkflowID: workflowID,
		RunID:      resetRunID,
	}
	newContext := execution.NewContext(
		domainID,
		resetExecution,
		r.shard,
		r.shard.GetExecutionManager(),
		r.logger,
	)

	rebuilder := r.newStateRebuilder()
	rebuildTime := r.shard.GetTimeSource().Now()
	baseIdentifier := definition.NewWorkflowIdentifier(domainID, workflowID, baseRunID)
	resetIdentifier := definition.NewWorkflowIdentifier(domainID, workflowID, resetRunID)

	rebuiltState, rebuiltHistorySize, err := rebuilder.Rebuild(
		ctx,
		rebuildTime,
		baseIdentifier,
		baseBranchToken,
		baseRebuildLastEventID,
		baseRebuildLastEventVersion,
		resetIdentifier,
		newBranchToken,
		resetRequestID,
	)
	if err != nil {
		return nil, err
	}
	newContext.SetHistorySize(rebuiltHistorySize)

	return execution.NewWorkflow(
		ctx,
		r.clusterMetadata,
		newContext,
		rebuiltState,
		execution.NoopReleaseFn,
	), nil
}
// failInflightActivity appends an activity-task-failed event, with
// terminateReason as the failure reason, for every pending activity that has
// already started. Activities that have not started are left untouched. A
// pending activity with a transient started ID yields an
// InternalServiceError.
func (r *workflowResetterImpl) failInflightActivity(
	mutableState execution.MutableState,
	terminateReason string,
) error {

	for _, activity := range mutableState.GetPendingActivityInfos() {
		if activity.StartedID == common.EmptyEventID {
			// not started yet, nothing to fail
			continue
		}
		if activity.StartedID == common.TransientEventID {
			// a started activity with retry policy; this is not expected
			// on a freshly rebuilt mutable state
			return &types.InternalServiceError{
				Message: "workflowResetter encounter transient activity",
			}
		}

		failure := &types.RespondActivityTaskFailedRequest{
			Reason:   common.StringPtr(terminateReason),
			Details:  activity.Details,
			Identity: activity.StartedIdentity,
		}
		_, err := mutableState.AddActivityTaskFailedEvent(
			activity.ScheduleID,
			activity.StartedID,
			failure,
		)
		if err != nil {
			return err
		}
	}
	return nil
}
// forkAndGenerateBranchToken forks the history branch identified by
// forkBranchToken at forkNodeID and returns the token of the new branch. The
// new branch is tagged with garbage-cleanup info built from the domain,
// workflow and reset run IDs.
func (r *workflowResetterImpl) forkAndGenerateBranchToken(
	ctx context.Context,
	domainID string,
	workflowID string,
	forkBranchToken []byte,
	forkNodeID int64,
	resetRunID string,
) ([]byte, error) {

	shardID := r.shard.GetShardID()

	domainName, err := r.domainCache.GetDomainName(domainID)
	if err != nil {
		return nil, err
	}

	forkRequest := &persistence.ForkHistoryBranchRequest{
		ForkBranchToken: forkBranchToken,
		ForkNodeID:      forkNodeID,
		Info:            persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, resetRunID),
		ShardID:         common.IntPtr(shardID),
		DomainName:      domainName,
	}
	forked, err := r.historyV2Mgr.ForkHistoryBranch(ctx, forkRequest)
	if err != nil {
		return nil, err
	}
	return forked.NewBranchToken, nil
}
// terminateWorkflow terminates the workflow held by mutableState on behalf of
// the history service, using terminateReason and no details. The termination
// event batch starts at the mutable state's next event ID.
func (r *workflowResetterImpl) terminateWorkflow(
	mutableState execution.MutableState,
	terminateReason string,
) error {

	return execution.TerminateWorkflow(
		mutableState,
		mutableState.GetNextEventID(),
		terminateReason,
		nil,
		execution.IdentityHistoryService,
	)
}
// reapplyResetAndContinueAsNewWorkflowEvents reapplies eligible events onto
// resetMutableState in two phases: first the events of the base run in
// [baseRebuildNextEventID, baseNextEventID), then, as long as a run ends with
// continue-as-new, all events of each following run in the chain.
func (r *workflowResetterImpl) reapplyResetAndContinueAsNewWorkflowEvents(
	ctx context.Context,
	resetMutableState execution.MutableState,
	domainID string,
	workflowID string,
	baseRunID string,
	baseBranchToken []byte,
	baseRebuildNextEventID int64,
	baseNextEventID int64,
) error {

	// TODO change this logic to fetching all workflow [baseWorkflow, currentWorkflow]
	// from visibility for better coverage of events eligible for re-application.

	// loadRunTail loads the given run through the execution cache and
	// reports its next event ID and current branch token. The cache entry is
	// released with whatever error the closure ends up returning.
	loadRunTail := func(runID string) (nextID int64, token []byte, retError error) {
		target := types.WorkflowExecution{
			WorkflowID: workflowID,
			RunID:      runID,
		}
		wfContext, release, err := r.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, target)
		if err != nil {
			return 0, nil, err
		}
		defer func() { release(retError) }()

		runState, err := wfContext.LoadWorkflowExecution(ctx)
		if err != nil {
			// whatever the error is, the caller has to retry
			return 0, nil, err
		}

		nextID = runState.GetNextEventID()
		token, err = runState.GetCurrentBranchToken()
		if err != nil {
			return 0, nil, err
		}
		return nextID, token, nil
	}

	// phase one: the remainder of the base run after the reset point
	nextRunID, err := r.reapplyWorkflowEvents(
		ctx,
		resetMutableState,
		baseRebuildNextEventID,
		baseNextEventID,
		baseBranchToken,
	)
	if err != nil {
		return err
	}

	// phase two: walk the continue-as-new chain until a run does not end
	// with a continued-as-new event
	for nextRunID != "" {
		runNextEventID, runBranchToken, err := loadRunTail(nextRunID)
		if err != nil {
			return err
		}

		nextRunID, err = r.reapplyWorkflowEvents(
			ctx,
			resetMutableState,
			common.FirstEventID,
			runNextEventID,
			runBranchToken,
		)
		if err != nil {
			return err
		}
	}
	return nil
}
// reapplyWorkflowEvents pages through the events in [firstEventID,
// nextEventID) on the given branch and reapplies the eligible ones onto
// mutableState. If the very last event read is a continued-as-new event, the
// run ID of the new execution is returned; otherwise the returned run ID is
// empty.
func (r *workflowResetterImpl) reapplyWorkflowEvents(
	ctx context.Context,
	mutableState execution.MutableState,
	firstEventID int64,
	nextEventID int64,
	branchToken []byte,
) (string, error) {

	// TODO change this logic to fetching all workflow [baseWorkflow, currentWorkflow]
	// from visibility for better coverage of events eligible for re-application.
	// after the above change, this API do not have to return the continue as new run ID

	if firstEventID == nextEventID {
		// Empty range: the workflow was reset to a pending decision task
		// which is also the latest event of the workflow.
		return "", nil
	}

	pageFn := r.getPaginationFn(
		ctx,
		firstEventID,
		nextEventID,
		branchToken,
		mutableState.GetExecutionInfo().DomainID,
	)
	pages := collection.NewPagingIterator(pageFn)

	var finalBatch []*types.HistoryEvent
	for pages.HasNext() {
		item, err := pages.Next()
		if err != nil {
			return "", err
		}
		finalBatch = item.(*types.History).Events
		if err := r.reapplyEvents(mutableState, finalBatch); err != nil {
			return "", err
		}
	}

	if len(finalBatch) == 0 {
		return "", nil
	}
	tail := finalBatch[len(finalBatch)-1]
	if tail.GetEventType() != types.EventTypeWorkflowExecutionContinuedAsNew {
		return "", nil
	}
	return tail.GetWorkflowExecutionContinuedAsNewEventAttributes().GetNewExecutionRunID(), nil
}
// reapplyEvents adds every workflow-execution-signaled event in events to
// mutableState as a new signal, carrying over its name, input, identity and
// request ID. All other event types are ignored.
func (r *workflowResetterImpl) reapplyEvents(
	mutableState execution.MutableState,
	events []*types.HistoryEvent,
) error {

	for _, historyEvent := range events {
		if historyEvent.GetEventType() != types.EventTypeWorkflowExecutionSignaled {
			// only signals are eligible for reapplication
			continue
		}

		signal := historyEvent.GetWorkflowExecutionSignaledEventAttributes()
		_, err := mutableState.AddWorkflowExecutionSignaled(
			signal.GetSignalName(),
			signal.GetInput(),
			signal.GetIdentity(),
			signal.GetRequestID(),
		)
		if err != nil {
			return err
		}
	}
	return nil
}
// getPaginationFn returns a collection.PaginationFn that reads one page of
// history batches for [firstEventID, nextEventID) on the given branch per
// call. Each item it yields is one history batch as returned by
// persistenceutils.PaginateHistory.
func (r *workflowResetterImpl) getPaginationFn(
	ctx context.Context,
	firstEventID int64,
	nextEventID int64,
	branchToken []byte,
	domainID string,
) collection.PaginationFn {

	return func(paginationToken []byte) ([]interface{}, []byte, error) {
		shardID := common.IntPtr(r.shard.GetShardID())

		_, batches, nextToken, _, err := persistenceutils.PaginateHistory(
			ctx,
			r.historyV2Mgr,
			true,
			branchToken,
			firstEventID,
			nextEventID,
			paginationToken,
			execution.NDCDefaultPageSize,
			shardID,
			domainID,
			r.domainCache,
		)
		if err != nil {
			return nil, nil, err
		}

		// Box each batch so it fits the untyped paging iterator; the
		// slice stays nil when the page is empty.
		var items []interface{}
		for i := range batches {
			items = append(items, batches[i])
		}
		return items, nextToken, nil
	}
}
// closePendingDecisionTask closes whatever decision task is still open on the
// rebuilt mutable state so that a fresh decision can be scheduled afterwards.
//
//   - If there are pending child executions, a BadRequestError is returned.
//   - If a decision task is in flight (started), a decision-task-failed event
//     with cause ResetWorkflow is added.
//   - Otherwise, if a decision task is pending (scheduled only), a
//     decision-task reset-timeout event is added.
//
// On success the same resetMutableState is returned; on error the returned
// mutable state is nil.
func (r *workflowResetterImpl) closePendingDecisionTask(
	resetMutableState execution.MutableState,
	baseRunID string,
	resetRunID string,
	baseLastEventVersion int64,
	resetReason string,
	resetRequestID string,
) (execution.MutableState, error) {

	if len(resetMutableState.GetPendingChildExecutionInfos()) > 0 {
		return nil, &types.BadRequestError{
			Message: "Can not reset workflow with pending child workflows",
		}
	}

	if decision, ok := resetMutableState.GetInFlightDecision(); ok {
		// the rebuilt state has a started decision task
		if _, err := resetMutableState.AddDecisionTaskFailedEvent(
			decision.ScheduleID,
			decision.StartedID,
			types.DecisionTaskFailedCauseResetWorkflow,
			nil,
			execution.IdentityHistoryService,
			resetReason,
			"",
			baseRunID,
			resetRunID,
			baseLastEventVersion,
			resetRequestID,
		); err != nil {
			return nil, err
		}
		return resetMutableState, nil
	}

	if decision, ok := resetMutableState.GetPendingDecision(); ok {
		// the rebuilt state has a decision task that is scheduled but not
		// started
		if _, err := resetMutableState.AddDecisionTaskResetTimeoutEvent(
			decision.ScheduleID,
			baseRunID,
			resetRunID,
			baseLastEventVersion,
			resetReason,
			resetRequestID,
		); err != nil {
			return nil, err
		}
	}

	return resetMutableState, nil
}