service/history/engine/engineimpl/historyEngine.go (3,133 lines of code) (raw):

// Copyright (c) 2017-2021 Uber Technologies, Inc. // Portions of the Software are attributed to Copyright (c) 2021 Temporal Technologies Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package engineimpl

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/pborman/uuid"
	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/yarpc/yarpcerrors"

	"github.com/uber/cadence/client/matching"
	"github.com/uber/cadence/client/wrappers/retryable"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/cache"
	"github.com/uber/cadence/common/client"
	"github.com/uber/cadence/common/clock"
	"github.com/uber/cadence/common/cluster"
	"github.com/uber/cadence/common/definition"
	"github.com/uber/cadence/common/dynamicconfig"
	ce "github.com/uber/cadence/common/errors"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	cndc "github.com/uber/cadence/common/ndc"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/quotas"
	"github.com/uber/cadence/common/reconciliation/invariant"
	"github.com/uber/cadence/common/service"
	"github.com/uber/cadence/common/types"
	hcommon "github.com/uber/cadence/service/history/common"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/decision"
	"github.com/uber/cadence/service/history/engine"
	"github.com/uber/cadence/service/history/events"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/failover"
	"github.com/uber/cadence/service/history/ndc"
	"github.com/uber/cadence/service/history/query"
	"github.com/uber/cadence/service/history/queue"
	"github.com/uber/cadence/service/history/replication"
	"github.com/uber/cadence/service/history/reset"
	"github.com/uber/cadence/service/history/shard"
	"github.com/uber/cadence/service/history/task"
	"github.com/uber/cadence/service/history/workflow"
	"github.com/uber/cadence/service/history/workflowcache"
	warchiver "github.com/uber/cadence/service/worker/archiver"
)

const (
	// Timing values; NOTE(review): their consumers are not in the first part
	// of this file, so the exact role of each is inferred from its name only.
	defaultQueryFirstDecisionTaskWaitTime = time.Second
	queryFirstDecisionTaskCheckInterval   = 200 * time.Millisecond
	contextLockTimeout                    = 500 * time.Millisecond
	longPollCompletionBuffer              = 50 * time.Millisecond

	// TerminateIfRunningReason reason for terminateIfRunning
	TerminateIfRunningReason = "TerminateIfRunning Policy"
	// TerminateIfRunningDetailsTemplate details template for terminateIfRunning.
	// It takes a single %s verb: the run ID of the newly started run.
	TerminateIfRunningDetailsTemplate = "New runID: %s"
)

var (
	// errDomainDeprecated is returned by startWorkflowHelper when the domain
	// status is anything other than persistence.DomainStatusRegistered.
	errDomainDeprecated = &types.BadRequestError{Message: "Domain is deprecated."}
)

type (
	// historyEngineImpl is the engine.Engine implementation bound to a single
	// shard.Context. It is assembled by NewEngineWithShardContext, which also
	// installs it on the shard via shard.SetEngine.
	historyEngineImpl struct {
		currentClusterName             string
		shard                          shard.Context
		timeSource                     clock.TimeSource
		decisionHandler                decision.Handler
		clusterMetadata                cluster.Metadata
		historyV2Mgr                   persistence.HistoryManager
		executionManager               persistence.ExecutionManager
		visibilityMgr                  persistence.VisibilityManager
		txProcessor                    queue.Processor
		timerProcessor                 queue.Processor
		crossClusterProcessor          queue.Processor
		nDCReplicator                  ndc.HistoryReplicator
		nDCActivityReplicator          ndc.ActivityReplicator
		historyEventNotifier           events.Notifier
		tokenSerializer                common.TaskTokenSerializer
		executionCache                 *execution.Cache
		metricsClient                  metrics.Client
		logger                         log.Logger
		throttledLogger                log.Logger
		config                         *config.Config
		archivalClient                 warchiver.Client
		workflowResetter               reset.WorkflowResetter
		queueTaskProcessor             task.Processor
		crossClusterTaskProcessors     common.Daemon
		replicationTaskProcessors      []replication.TaskProcessor
		replicationAckManager          replication.TaskAckManager
		replicationTaskStore           *replication.TaskStore
		replicationHydrator            replication.TaskHydrator
		replicationMetricsEmitter      *replication.MetricsEmitterImpl
		publicClient                   workflowserviceclient.Interface
		eventsReapplier                ndc.EventsReapplier
		matchingClient                 matching.Client
		rawMatchingClient              matching.Client
		clientChecker                  client.VersionChecker
		replicationDLQHandler          replication.DLQHandler
		failoverMarkerNotifier         failover.MarkerNotifier
		wfIDCache                      workflowcache.WFCache
		ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter
	}
)

// Compile-time check that *historyEngineImpl satisfies engine.Engine.
var _ engine.Engine = (*historyEngineImpl)(nil)
var (
	// FailedWorkflowCloseState is a set of failed workflow close states, used for start workflow policy
	// for start workflow execution API
	FailedWorkflowCloseState = map[int]bool{
		persistence.WorkflowCloseStatusFailed:     true,
		persistence.WorkflowCloseStatusCanceled:   true,
		persistence.WorkflowCloseStatusTerminated: true,
		persistence.WorkflowCloseStatusTimedOut:   true,
	}
)

// NewEngineWithShardContext creates an instance of history engine.
//
// It builds every per-shard component (execution cache, queue processors,
// replication pipeline, archival client, NDC replicators, ...) from the
// supplied shard context and collaborators, stores them on a new
// historyEngineImpl, registers that engine on the shard with shard.SetEngine
// and returns it. Nothing is started here; see Start.
//
// Construction order matters: the queue processors and the replication task
// executors receive the engine pointer itself, so the engine struct is created
// first and its remaining fields are filled in afterwards.
func NewEngineWithShardContext(
	shard shard.Context,
	visibilityMgr persistence.VisibilityManager,
	matching matching.Client,
	publicClient workflowserviceclient.Interface,
	historyEventNotifier events.Notifier,
	config *config.Config,
	crossClusterTaskFetchers task.Fetchers,
	replicationTaskFetchers replication.TaskFetchers,
	rawMatchingClient matching.Client,
	queueTaskProcessor task.Processor,
	failoverCoordinator failover.Coordinator,
	wfIDCache workflowcache.WFCache,
	ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter,
) engine.Engine {
	currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName()

	logger := shard.GetLogger()
	executionManager := shard.GetExecutionManager()
	historyV2Manager := shard.GetHistoryManager()
	executionCache := execution.NewCache(shard)
	failoverMarkerNotifier := failover.NewMarkerNotifier(shard, config, failoverCoordinator)
	// The hydrator, task store and reader below are shared by the replication
	// ack manager and the replication metrics emitter created in the literal.
	replicationHydrator := replication.NewDeferredTaskHydrator(shard.GetShardID(), historyV2Manager, executionCache, shard.GetDomainCache())
	replicationTaskStore := replication.NewTaskStore(
		shard.GetConfig(),
		shard.GetClusterMetadata(),
		shard.GetDomainCache(),
		shard.GetMetricsClient(),
		shard.GetLogger(),
		replicationHydrator,
	)
	replicationReader := replication.NewDynamicTaskReader(shard.GetShardID(), executionManager, shard.GetTimeSource(), config)

	historyEngImpl := &historyEngineImpl{
		currentClusterName:   currentClusterName,
		shard:                shard,
		clusterMetadata:      shard.GetClusterMetadata(),
		timeSource:           shard.GetTimeSource(),
		historyV2Mgr:         historyV2Manager,
		executionManager:     executionManager,
		visibilityMgr:        visibilityMgr,
		tokenSerializer:      common.NewJSONTaskTokenSerializer(),
		executionCache:       executionCache,
		logger:               logger.WithTags(tag.ComponentHistoryEngine),
		throttledLogger:      shard.GetThrottledLogger().WithTags(tag.ComponentHistoryEngine),
		metricsClient:        shard.GetMetricsClient(),
		historyEventNotifier: historyEventNotifier,
		config:               config,
		// The archival client gets three rate limiters: one for archive
		// requests and two per-member limiters (inline history and inline
		// visibility) derived from the global and per-host RPS settings.
		archivalClient: warchiver.NewClient(
			shard.GetMetricsClient(),
			logger,
			publicClient,
			shard.GetConfig().NumArchiveSystemWorkflows,
			quotas.NewDynamicRateLimiter(config.ArchiveRequestRPS.AsFloat64()),
			quotas.NewDynamicRateLimiter(func() float64 {
				return quotas.PerMember(
					service.History,
					float64(config.ArchiveInlineHistoryGlobalRPS()),
					float64(config.ArchiveInlineHistoryRPS()),
					shard.GetService().GetMembershipResolver(),
				)
			}),
			quotas.NewDynamicRateLimiter(func() float64 {
				return quotas.PerMember(
					service.History,
					float64(config.ArchiveInlineVisibilityGlobalRPS()),
					float64(config.ArchiveInlineVisibilityRPS()),
					shard.GetService().GetMembershipResolver(),
				)
			}),
			shard.GetService().GetArchiverProvider(),
			config.AllowArchivingIncompleteHistory,
		),
		workflowResetter: reset.NewWorkflowResetter(
			shard,
			executionCache,
			logger,
		),
		publicClient:           publicClient,
		matchingClient:         matching,
		rawMatchingClient:      rawMatchingClient,
		queueTaskProcessor:     queueTaskProcessor,
		clientChecker:          client.NewVersionChecker(),
		failoverMarkerNotifier: failoverMarkerNotifier,
		replicationHydrator:    replicationHydrator,
		replicationAckManager: replication.NewTaskAckManager(
			shard.GetShardID(),
			shard,
			shard.GetMetricsClient(),
			shard.GetLogger(),
			replicationReader,
			replicationTaskStore,
		),
		replicationTaskStore: replicationTaskStore,
		replicationMetricsEmitter: replication.NewMetricsEmitter(
			shard.GetShardID(), shard, replicationReader, shard.GetMetricsClient()),
		wfIDCache:                      wfIDCache,
		ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID,
	}
	historyEngImpl.decisionHandler = decision.NewHandler(
		shard,
		historyEngImpl.executionCache,
		historyEngImpl.tokenSerializer,
	)
	pRetry := persistence.NewPersistenceRetryer(
		shard.GetExecutionManager(),
		shard.GetHistoryManager(),
		common.CreatePersistenceRetryPolicy(),
	)
	// The same invariant check instance is handed to the transfer and timer
	// queue processors and to every history resender built below.
	openExecutionCheck := invariant.NewConcreteExecutionExists(pRetry, shard.GetDomainCache())

	historyEngImpl.txProcessor = queue.NewTransferQueueProcessor(
		shard,
		historyEngImpl,
		queueTaskProcessor,
		executionCache,
		historyEngImpl.workflowResetter,
		historyEngImpl.archivalClient,
		openExecutionCheck,
		historyEngImpl.wfIDCache,
		historyEngImpl.ratelimitInternalPerWorkflowID,
	)

	historyEngImpl.timerProcessor = queue.NewTimerQueueProcessor(
		shard,
		historyEngImpl,
		queueTaskProcessor,
		executionCache,
		historyEngImpl.archivalClient,
		openExecutionCheck,
	)

	historyEngImpl.crossClusterProcessor = queue.NewCrossClusterQueueProcessor(
		shard,
		historyEngImpl,
		executionCache,
		queueTaskProcessor,
	)

	historyEngImpl.eventsReapplier = ndc.NewEventsReapplier(shard.GetMetricsClient(), logger)

	historyEngImpl.nDCReplicator = ndc.NewHistoryReplicator(
		shard,
		executionCache,
		historyEngImpl.eventsReapplier,
		logger,
	)
	historyEngImpl.nDCActivityReplicator = ndc.NewActivityReplicator(
		shard,
		executionCache,
		logger,
	)

	historyEngImpl.crossClusterTaskProcessors = task.NewCrossClusterTaskProcessors(
		shard,
		queueTaskProcessor,
		crossClusterTaskFetchers,
		&task.CrossClusterTaskProcessorOptions{
			Enabled:                    config.EnableCrossClusterEngine,
			MaxPendingTasks:            config.CrossClusterTargetProcessorMaxPendingTasks,
			TaskMaxRetryCount:          config.CrossClusterTargetProcessorMaxRetryCount,
			TaskRedispatchInterval:     config.ActiveTaskRedispatchInterval,
			TaskWaitInterval:           config.CrossClusterTargetProcessorTaskWaitInterval,
			ServiceBusyBackoffInterval: config.CrossClusterTargetProcessorServiceBusyBackoffInterval,
			TimerJitterCoefficient:     config.CrossClusterTargetProcessorJitterCoefficient,
		},
	)

	// One task executor and one task processor are created per replication
	// fetcher; executors are additionally indexed by source cluster for the
	// DLQ handler.
	var replicationTaskProcessors []replication.TaskProcessor
	replicationTaskExecutors := make(map[string]replication.TaskExecutor)
	// Intentionally use the raw client to create its own retry policy
	historyRawClient := shard.GetService().GetClientBean().GetHistoryClient()
	historyRetryableClient := retryable.NewHistoryClient(
		historyRawClient,
		common.CreateReplicationServiceBusyRetryPolicy(),
		common.IsServiceBusyError,
	)
	resendFunc := func(ctx context.Context, request *types.ReplicateEventsV2Request) error {
		return historyRetryableClient.ReplicateEventsV2(ctx, request)
	}
	for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() {
		sourceCluster := replicationTaskFetcher.GetSourceCluster()
		// Intentionally use the raw client to create its own retry policy
		adminClient := shard.GetService().GetClientBean().GetRemoteAdminClient(sourceCluster)
		adminRetryableClient := retryable.NewAdminClient(
			adminClient,
			common.CreateReplicationServiceBusyRetryPolicy(),
			common.IsServiceBusyError,
		)
		historyResender := cndc.NewHistoryResender(
			shard.GetDomainCache(),
			adminRetryableClient,
			resendFunc,
			nil,
			openExecutionCheck,
			shard.GetLogger(),
		)
		replicationTaskExecutor := replication.NewTaskExecutor(
			shard,
			shard.GetDomainCache(),
			historyResender,
			historyEngImpl,
			shard.GetMetricsClient(),
			shard.GetLogger(),
		)
		replicationTaskExecutors[sourceCluster] = replicationTaskExecutor

		replicationTaskProcessor := replication.NewTaskProcessor(
			shard,
			historyEngImpl,
			config,
			shard.GetMetricsClient(),
			replicationTaskFetcher,
			replicationTaskExecutor,
		)
		replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor)
	}
	historyEngImpl.replicationTaskProcessors = replicationTaskProcessors
	replicationMessageHandler := replication.NewDLQHandler(shard, replicationTaskExecutors)
	historyEngImpl.replicationDLQHandler = replicationMessageHandler

	// Make the fully wired engine reachable through the shard context.
	shard.SetEngine(historyEngImpl)
	return historyEngImpl
}
// Start will spin up all the components needed to start serving this shard.
// Make sure all the components are loaded lazily so start can return immediately.  This is important because
// ShardController calls start sequentially for all the shards for a given host during startup.
func (e *historyEngineImpl) Start() {
	e.logger.Info("History engine state changed", tag.LifeCycleStarting)
	// Deferred so the "started" line is logged after every component below
	// has been started.
	defer e.logger.Info("History engine state changed", tag.LifeCycleStarted)

	e.txProcessor.Start()
	e.timerProcessor.Start()
	e.crossClusterProcessor.Start()
	e.replicationDLQHandler.Start()
	e.replicationMetricsEmitter.Start()

	// failover callback will try to create a failover queue processor to scan all inflight tasks
	// if domain needs to be failovered. However, in the multicursor queue logic, the scan range
	// can't be retrieved before the processor is started. If failover callback is registered
	// before queue processor is started, it may result in a deadlock as to create the failover queue,
	// queue processor need to be started.
	e.registerDomainFailoverCallback()

	e.crossClusterTaskProcessors.Start()

	for _, replicationTaskProcessor := range e.replicationTaskProcessors {
		replicationTaskProcessor.Start()
	}
	// The failover marker notifier only runs when graceful failover is enabled.
	if e.config.EnableGracefulFailover() {
		e.failoverMarkerNotifier.Start()
	}
}

// Stop the service.
//
// Components are stopped in the same order Start brought them up; the shard's
// slice of the shared queue task processor is released and the domain change
// callback registered by Start is removed last.
func (e *historyEngineImpl) Stop() {
	e.logger.Info("History engine state changed", tag.LifeCycleStopping)
	defer e.logger.Info("History engine state changed", tag.LifeCycleStopped)

	e.txProcessor.Stop()
	e.timerProcessor.Stop()
	e.crossClusterProcessor.Stop()
	e.replicationDLQHandler.Stop()
	e.replicationMetricsEmitter.Stop()

	e.crossClusterTaskProcessors.Stop()

	for _, replicationTaskProcessor := range e.replicationTaskProcessors {
		replicationTaskProcessor.Stop()
	}

	// queueTaskProcessor is supplied by the caller of NewEngineWithShardContext
	// and may be nil.
	if e.queueTaskProcessor != nil {
		e.queueTaskProcessor.StopShardProcessor(e.shard)
	}

	// NOTE(review): Stop is called unconditionally although Start only starts
	// the notifier when graceful failover is enabled — presumably Stop is safe
	// on a never-started notifier; confirm.
	e.failoverMarkerNotifier.Stop()

	// unset the failover callback
	e.shard.GetDomainCache().UnregisterDomainChangeCallback(e.shard.GetShardID())
}
// registerDomainFailoverCallback registers, under this shard's ID, a pair of
// callbacks on the domain cache. The first locks task processing on the
// transfer and timer processors; the second receives the changed domains,
// fails over the queue processors for domains that became active in this
// cluster, inserts failover markers for domains that gracefully moved away
// from this cluster, advances the shard's domain notification version and
// (via defer) unlocks task processing again.
func (e *historyEngineImpl) registerDomainFailoverCallback() {

	// NOTE: READ BEFORE MODIFICATION
	//
	// Tasks, e.g. transfer tasks and timer tasks, are created when holding the shard lock
	// meaning tasks -> release of shard lock
	//
	// Domain change notification follows the following steps, order matters
	// 1. lock all task processing.
	// 2. domain changes visible to everyone (Note: lock of task processing prevents task processing logic seeing the domain changes).
	// 3. failover min and max task levels are calculated, then update to shard.
	// 4. failover start & task processing unlock & shard domain version notification update. (order does not matter for this discussion)
	//
	// The above guarantees that task created during the failover will be processed.
	// If the task is created after domain change:
	// 		then active processor will handle it. (simple case)
	// If the task is created before domain change:
	//		task -> release of shard lock
	//		failover min / max task levels calculated & updated to shard (using shard lock) -> failover start
	// above 2 guarantees that failover start is after persistence of the task.

	// failoverPredicate runs action only for a global domain whose failover
	// notification version is at least the shard's notification version and
	// whose active cluster is the current cluster.
	failoverPredicate := func(shardNotificationVersion int64, nextDomain *cache.DomainCacheEntry, action func()) {
		domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
		domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName

		if nextDomain.IsGlobalDomain() &&
			domainFailoverNotificationVersion >= shardNotificationVersion &&
			domainActiveCluster == e.currentClusterName {
			action()
		}
	}

	// first set the failover callback
	e.shard.GetDomainCache().RegisterDomainChangeCallback(
		e.shard.GetShardID(),
		e.shard.GetDomainNotificationVersion(),
		func() {
			e.txProcessor.LockTaskProcessing()
			e.timerProcessor.LockTaskProcessing()
			// there is no lock/unlock for crossClusterProcessor
		},
		func(nextDomains []*cache.DomainCacheEntry) {
			// Always release the locks taken by the callback above, including
			// on the early returns below.
			defer func() {
				e.txProcessor.UnlockTaskProcessing()
				e.timerProcessor.UnlockTaskProcessing()
				// there is no lock/unlock for crossClusterProcessor
			}()

			if len(nextDomains) == 0 {
				return
			}

			shardNotificationVersion := e.shard.GetDomainNotificationVersion()
			failoverDomainIDs := map[string]struct{}{}

			// The closure is invoked synchronously inside failoverPredicate,
			// so it observes the nextDomain of the current iteration.
			for _, nextDomain := range nextDomains {
				failoverPredicate(shardNotificationVersion, nextDomain, func() {
					failoverDomainIDs[nextDomain.GetInfo().ID] = struct{}{}
				})
			}

			if len(failoverDomainIDs) > 0 {
				e.logger.Info("Domain Failover Start.", tag.WorkflowDomainIDs(failoverDomainIDs))

				e.txProcessor.FailoverDomain(failoverDomainIDs)
				e.timerProcessor.FailoverDomain(failoverDomainIDs)
				e.crossClusterProcessor.FailoverDomain(failoverDomainIDs)

				now := e.shard.GetTimeSource().Now()
				// the fake tasks will not be actually used, we just need to make sure
				// its length > 0 and has correct timestamp, to trigger a db scan
				fakeDecisionTask := []persistence.Task{&persistence.DecisionTask{}}
				fakeDecisionTimeoutTask := []persistence.Task{&persistence.DecisionTimeoutTask{TaskData: persistence.TaskData{VisibilityTimestamp: now}}}
				e.txProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTask})
				e.timerProcessor.NotifyNewTask(e.currentClusterName, &hcommon.NotifyTaskInfo{Tasks: fakeDecisionTimeoutTask})
			}

			// handle graceful failover on active to passive
			// make sure task processor failover the domain before inserting the failover marker
			failoverMarkerTasks := []*persistence.FailoverMarkerTask{}
			for _, nextDomain := range nextDomains {
				domainFailoverNotificationVersion := nextDomain.GetFailoverNotificationVersion()
				domainActiveCluster := nextDomain.GetReplicationConfig().ActiveClusterName
				previousFailoverVersion := nextDomain.GetPreviousFailoverVersion()
				// NOTE(review): the previous cluster is resolved before the
				// domain is checked for being global or for having an initial
				// previous failover version, so a resolution failure is logged
				// (and the domain skipped) even for domains the condition
				// below would have rejected anyway.
				previousClusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(previousFailoverVersion)
				if err != nil {
					e.logger.Error("Failed to handle graceful failover", tag.WorkflowDomainID(nextDomain.GetInfo().ID), tag.Error(err))
					continue
				}

				// A marker is queued only for a global domain that was active
				// in this cluster before the change and is now active elsewhere.
				if nextDomain.IsGlobalDomain() &&
					domainFailoverNotificationVersion >= shardNotificationVersion &&
					domainActiveCluster != e.currentClusterName &&
					previousFailoverVersion != common.InitialPreviousFailoverVersion &&
					previousClusterName == e.currentClusterName {
					// the visibility timestamp will be set in shard context
					failoverMarkerTasks = append(failoverMarkerTasks, &persistence.FailoverMarkerTask{
						TaskData: persistence.TaskData{
							Version: nextDomain.GetFailoverVersion(),
						},
						DomainID: nextDomain.GetInfo().ID,
					})
					// This is a debug metric
					e.metricsClient.IncCounter(metrics.FailoverMarkerScope, metrics.FailoverMarkerCallbackCount)
				}
			}

			// This is a debug metric
			e.metricsClient.IncCounter(metrics.FailoverMarkerScope, metrics.HistoryFailoverCallbackCount)
			if len(failoverMarkerTasks) > 0 {
				if err := e.shard.ReplicateFailoverMarkers(
					context.Background(),
					failoverMarkerTasks,
				); err != nil {
					e.logger.Error("Failed to insert failover marker to replication queue.", tag.Error(err))
					e.metricsClient.IncCounter(metrics.FailoverMarkerScope, metrics.FailoverMarkerInsertFailure)
					// fail this failover callback and it retries on next domain cache refresh
					return
				}
			}

			// Advance past the notification version of the last domain in the
			// batch; skipped on the early return above so the batch is seen again.
			//nolint:errcheck
			e.shard.UpdateDomainNotificationVersion(nextDomains[len(nextDomains)-1].GetNotificationVersion() + 1)
		},
	)
}
e.shard.UpdateDomainNotificationVersion(nextDomains[len(nextDomains)-1].GetNotificationVersion() + 1) }, ) } func (e *historyEngineImpl) createMutableState( domainEntry *cache.DomainCacheEntry, runID string, ) (execution.MutableState, error) { newMutableState := execution.NewMutableStateBuilderWithVersionHistories( e.shard, e.logger, domainEntry, ) if err := newMutableState.SetHistoryTree(runID); err != nil { return nil, err } return newMutableState, nil } func (e *historyEngineImpl) generateFirstDecisionTask( mutableState execution.MutableState, parentInfo *types.ParentExecutionInfo, startEvent *types.HistoryEvent, ) error { if parentInfo == nil { // DecisionTask is only created when it is not a Child Workflow and no backoff is needed if err := mutableState.AddFirstDecisionTaskScheduled( startEvent, ); err != nil { return err } } return nil } // StartWorkflowExecution starts a workflow execution func (e *historyEngineImpl) StartWorkflowExecution( ctx context.Context, startRequest *types.HistoryStartWorkflowExecutionRequest, ) (resp *types.StartWorkflowExecutionResponse, retError error) { domainEntry, err := e.getActiveDomainByID(startRequest.DomainUUID) if err != nil { return nil, err } return e.startWorkflowHelper( ctx, startRequest, domainEntry, metrics.HistoryStartWorkflowExecutionScope, nil) } // for startWorkflowHelper be reused by signalWithStart type signalWithStartArg struct { signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest prevMutableState execution.MutableState } func (e *historyEngineImpl) newDomainNotActiveError( domainName string, failoverVersion int64, ) error { clusterMetadata := e.shard.GetService().GetClusterMetadata() clusterName, err := clusterMetadata.ClusterNameForFailoverVersion(failoverVersion) if err != nil { clusterName = "_unknown_" } return ce.NewDomainNotActiveError( domainName, clusterMetadata.GetCurrentClusterName(), clusterName, ) } func (e *historyEngineImpl) startWorkflowHelper( ctx context.Context, 
startRequest *types.HistoryStartWorkflowExecutionRequest, domainEntry *cache.DomainCacheEntry, metricsScope int, signalWithStartArg *signalWithStartArg, ) (resp *types.StartWorkflowExecutionResponse, retError error) { if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered { return nil, errDomainDeprecated } request := startRequest.StartRequest err := e.validateStartWorkflowExecutionRequest(request, metricsScope) if err != nil { return nil, err } e.overrideStartWorkflowExecutionRequest(domainEntry, request, metricsScope) workflowID := request.GetWorkflowID() domainID := domainEntry.GetInfo().ID domain := domainEntry.GetInfo().Name // grab the current context as a lock, nothing more // use a smaller context timeout to get the lock childCtx, childCancel := e.newChildContext(ctx) defer childCancel() _, currentRelease, err := e.executionCache.GetOrCreateCurrentWorkflowExecution( childCtx, domainID, workflowID, ) if err != nil { if err == context.DeadlineExceeded { return nil, workflow.ErrConcurrentStartRequest } return nil, err } defer func() { currentRelease(retError) }() workflowExecution := types.WorkflowExecution{ WorkflowID: workflowID, RunID: uuid.New(), } curMutableState, err := e.createMutableState(domainEntry, workflowExecution.GetRunID()) if err != nil { return nil, err } // preprocess for signalWithStart var prevMutableState execution.MutableState var signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest isSignalWithStart := signalWithStartArg != nil if isSignalWithStart { prevMutableState = signalWithStartArg.prevMutableState signalWithStartRequest = signalWithStartArg.signalWithStartRequest } if prevMutableState != nil { prevLastWriteVersion, err := prevMutableState.GetLastWriteVersion() if err != nil { return nil, err } if prevLastWriteVersion > curMutableState.GetCurrentVersion() { return nil, e.newDomainNotActiveError( domainEntry.GetInfo().Name, prevLastWriteVersion, ) } err = 
e.applyWorkflowIDReusePolicyForSigWithStart( prevMutableState.GetExecutionInfo(), workflowExecution, request.GetWorkflowIDReusePolicy(), ) if err != nil { return nil, err } } else if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil { uninitializedRequest := &persistence.RecordWorkflowExecutionUninitializedRequest{ DomainUUID: domainID, Domain: domain, Execution: types.WorkflowExecution{ WorkflowID: workflowID, RunID: workflowExecution.RunID, }, WorkflowTypeName: request.WorkflowType.Name, UpdateTimestamp: e.shard.GetTimeSource().Now().UnixNano(), ShardID: int64(e.shard.GetShardID()), } if err := e.visibilityMgr.RecordWorkflowExecutionUninitialized(ctx, uninitializedRequest); err != nil { e.logger.Error("Failed to record uninitialized workflow execution", tag.Error(err)) } } err = e.addStartEventsAndTasks( curMutableState, workflowExecution, startRequest, signalWithStartRequest, ) if err != nil { if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil { // delete the uninitialized workflow execution record since it failed to start the workflow // uninitialized record is used to find wfs that didn't make a progress or stuck during the start process if errVisibility := e.visibilityMgr.DeleteWorkflowExecution(ctx, &persistence.VisibilityDeleteWorkflowExecutionRequest{ DomainID: domainID, Domain: domain, RunID: workflowExecution.RunID, WorkflowID: workflowID, }); errVisibility != nil { e.logger.Error("Failed to delete uninitialized workflow execution record", tag.Error(errVisibility)) } } return nil, err } wfContext := execution.NewContext(domainID, workflowExecution, e.shard, e.executionManager, e.logger) newWorkflow, newWorkflowEventsSeq, err := curMutableState.CloseTransactionAsSnapshot( e.timeSource.Now(), execution.TransactionPolicyActive, ) if err != nil { return nil, err } historyBlob, err := 
wfContext.PersistStartWorkflowBatchEvents(ctx, newWorkflowEventsSeq[0]) if err != nil { return nil, err } // create as brand new createMode := persistence.CreateWorkflowModeBrandNew prevRunID := "" prevLastWriteVersion := int64(0) // overwrite in case of signalWithStart if prevMutableState != nil { createMode = persistence.CreateWorkflowModeWorkflowIDReuse info := prevMutableState.GetExecutionInfo() // For corrupted workflows use ContinueAsNew mode. // WorkflowIDReuse mode require workflows to be in completed state, which is not necessarily true for corrupted workflows. if info.State == persistence.WorkflowStateCorrupted { createMode = persistence.CreateWorkflowModeContinueAsNew } prevRunID = info.RunID prevLastWriteVersion, err = prevMutableState.GetLastWriteVersion() if err != nil { return nil, err } } err = wfContext.CreateWorkflowExecution( ctx, newWorkflow, historyBlob, createMode, prevRunID, prevLastWriteVersion, ) // handle already started error if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok { if t.StartRequestID == request.GetRequestID() { return &types.StartWorkflowExecutionResponse{ RunID: t.RunID, }, nil } if isSignalWithStart { return nil, err } if curMutableState.GetCurrentVersion() < t.LastWriteVersion { return nil, e.newDomainNotActiveError( domainEntry.GetInfo().Name, t.LastWriteVersion, ) } prevRunID = t.RunID if shouldTerminateAndStart(startRequest, t.State) { runningWFCtx, err := workflow.LoadOnce(ctx, e.executionCache, domainID, workflowID, prevRunID) if err != nil { return nil, err } defer func() { runningWFCtx.GetReleaseFn()(retError) }() resp, err = e.terminateAndStartWorkflow( ctx, runningWFCtx, workflowExecution, domainEntry, domainID, startRequest, nil, ) switch err.(type) { // By the time we try to terminate the workflow, it was already terminated // So continue as if we didn't need to terminate it in the first place case *types.WorkflowExecutionAlreadyCompletedError: e.shard.GetLogger().Warn("Workflow completed 
while trying to terminate, will continue starting workflow", tag.Error(err)) default: return resp, err } } if err = e.applyWorkflowIDReusePolicyHelper( t.StartRequestID, prevRunID, t.State, t.CloseStatus, workflowExecution, startRequest.StartRequest.GetWorkflowIDReusePolicy(), ); err != nil { return nil, err } // create as ID reuse createMode = persistence.CreateWorkflowModeWorkflowIDReuse err = wfContext.CreateWorkflowExecution( ctx, newWorkflow, historyBlob, createMode, prevRunID, t.LastWriteVersion, ) } if err != nil { return nil, err } return &types.StartWorkflowExecutionResponse{ RunID: workflowExecution.RunID, }, nil } func shouldTerminateAndStart( startRequest *types.HistoryStartWorkflowExecutionRequest, state int, ) bool { return startRequest.StartRequest.GetWorkflowIDReusePolicy() == types.WorkflowIDReusePolicyTerminateIfRunning && (state == persistence.WorkflowStateRunning || state == persistence.WorkflowStateCreated) } // terminate running workflow then start a new run in one transaction func (e *historyEngineImpl) terminateAndStartWorkflow( ctx context.Context, runningWFCtx workflow.Context, workflowExecution types.WorkflowExecution, domainEntry *cache.DomainCacheEntry, domainID string, startRequest *types.HistoryStartWorkflowExecutionRequest, signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest, ) (*types.StartWorkflowExecutionResponse, error) { runningMutableState := runningWFCtx.GetMutableState() UpdateWorkflowLoop: for attempt := 0; attempt < workflow.ConditionalRetryCount; attempt++ { if !runningMutableState.IsWorkflowExecutionRunning() { return nil, workflow.ErrAlreadyCompleted } if err := execution.TerminateWorkflow( runningMutableState, runningMutableState.GetNextEventID(), TerminateIfRunningReason, getTerminateIfRunningDetails(workflowExecution.GetRunID()), execution.IdentityHistoryService, ); err != nil { if err == workflow.ErrStaleState { // Handler detected that cached workflow mutable could potentially be stale // 
Reload workflow execution history runningWFCtx.GetContext().Clear() if attempt != workflow.ConditionalRetryCount-1 { _, err = runningWFCtx.ReloadMutableState(ctx) if err != nil { return nil, err } } continue UpdateWorkflowLoop } return nil, err } // new mutable state newMutableState, err := e.createMutableState(domainEntry, workflowExecution.GetRunID()) if err != nil { return nil, err } if signalWithStartRequest != nil { startRequest, err = getStartRequest(domainID, signalWithStartRequest.SignalWithStartRequest, signalWithStartRequest.PartitionConfig) if err != nil { return nil, err } } err = e.addStartEventsAndTasks( newMutableState, workflowExecution, startRequest, signalWithStartRequest, ) if err != nil { return nil, err } updateErr := runningWFCtx.GetContext().UpdateWorkflowExecutionWithNewAsActive( ctx, e.timeSource.Now(), execution.NewContext( domainID, workflowExecution, e.shard, e.shard.GetExecutionManager(), e.logger, ), newMutableState, ) if updateErr != nil { if execution.IsConflictError(updateErr) { e.metricsClient.IncCounter(metrics.HistoryStartWorkflowExecutionScope, metrics.ConcurrencyUpdateFailureCounter) continue UpdateWorkflowLoop } return nil, updateErr } break UpdateWorkflowLoop } return &types.StartWorkflowExecutionResponse{ RunID: workflowExecution.RunID, }, nil } func (e *historyEngineImpl) addStartEventsAndTasks( mutableState execution.MutableState, workflowExecution types.WorkflowExecution, startRequest *types.HistoryStartWorkflowExecutionRequest, signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest, ) error { // Add WF start event startEvent, err := mutableState.AddWorkflowExecutionStartedEvent( workflowExecution, startRequest, ) if err != nil { return &types.InternalServiceError{ Message: "Failed to add workflow execution started event.", } } if signalWithStartRequest != nil { // Add signal event sRequest := signalWithStartRequest.SignalWithStartRequest if sRequest.GetRequestID() != "" { 
mutableState.AddSignalRequested(sRequest.GetRequestID()) } _, err := mutableState.AddWorkflowExecutionSignaled( sRequest.GetSignalName(), sRequest.GetSignalInput(), sRequest.GetIdentity(), sRequest.GetRequestID(), ) if err != nil { return &types.InternalServiceError{Message: "Failed to add workflow execution signaled event."} } } // Generate first decision task event if not child WF and no first decision task backoff return e.generateFirstDecisionTask( mutableState, startRequest.ParentExecutionInfo, startEvent, ) } func getTerminateIfRunningDetails(newRunID string) []byte { return []byte(fmt.Sprintf(TerminateIfRunningDetailsTemplate, newRunID)) } // GetMutableState retrieves the mutable state of the workflow execution func (e *historyEngineImpl) GetMutableState( ctx context.Context, request *types.GetMutableStateRequest, ) (*types.GetMutableStateResponse, error) { return e.getMutableStateOrPolling(ctx, request) } // PollMutableState retrieves the mutable state of the workflow execution with long polling func (e *historyEngineImpl) PollMutableState( ctx context.Context, request *types.PollMutableStateRequest, ) (*types.PollMutableStateResponse, error) { response, err := e.getMutableStateOrPolling(ctx, &types.GetMutableStateRequest{ DomainUUID: request.DomainUUID, Execution: request.Execution, ExpectedNextEventID: request.ExpectedNextEventID, CurrentBranchToken: request.CurrentBranchToken}) if err != nil { return nil, e.updateEntityNotExistsErrorOnPassiveCluster(err, request.GetDomainUUID()) } return &types.PollMutableStateResponse{ Execution: response.Execution, WorkflowType: response.WorkflowType, NextEventID: response.NextEventID, PreviousStartedEventID: response.PreviousStartedEventID, LastFirstEventID: response.LastFirstEventID, TaskList: response.TaskList, StickyTaskList: response.StickyTaskList, ClientLibraryVersion: response.ClientLibraryVersion, ClientFeatureVersion: response.ClientFeatureVersion, ClientImpl: response.ClientImpl, 
StickyTaskListScheduleToStartTimeout: response.StickyTaskListScheduleToStartTimeout, CurrentBranchToken: response.CurrentBranchToken, VersionHistories: response.VersionHistories, WorkflowState: response.WorkflowState, WorkflowCloseState: response.WorkflowCloseState, }, nil } func (e *historyEngineImpl) updateEntityNotExistsErrorOnPassiveCluster(err error, domainID string) error { switch err.(type) { case *types.EntityNotExistsError: domainEntry, domainCacheErr := e.shard.GetDomainCache().GetDomainByID(domainID) if domainCacheErr != nil { return err // if could not access domain cache simply return original error } if _, domainNotActiveErr := domainEntry.IsActiveIn(e.clusterMetadata.GetCurrentClusterName()); domainNotActiveErr != nil { domainNotActiveErrCasted := domainNotActiveErr.(*types.DomainNotActiveError) return &types.EntityNotExistsError{ Message: "Workflow execution not found in non-active cluster", ActiveCluster: domainNotActiveErrCasted.GetActiveCluster(), CurrentCluster: domainNotActiveErrCasted.GetCurrentCluster(), } } } return err } func (e *historyEngineImpl) getMutableStateOrPolling( ctx context.Context, request *types.GetMutableStateRequest, ) (*types.GetMutableStateResponse, error) { if err := common.ValidateDomainUUID(request.DomainUUID); err != nil { return nil, err } domainID := request.DomainUUID execution := types.WorkflowExecution{ WorkflowID: request.Execution.WorkflowID, RunID: request.Execution.RunID, } response, err := e.getMutableState(ctx, domainID, execution) if err != nil { return nil, err } if request.CurrentBranchToken == nil { request.CurrentBranchToken = response.CurrentBranchToken } if !bytes.Equal(request.CurrentBranchToken, response.CurrentBranchToken) { return nil, &types.CurrentBranchChangedError{ Message: "current branch token and request branch token doesn't match", CurrentBranchToken: response.CurrentBranchToken} } // set the run id in case query the current running workflow execution.RunID = response.Execution.RunID // 
expectedNextEventID is 0 when caller want to get the current next event ID without blocking expectedNextEventID := common.FirstEventID if request.ExpectedNextEventID != 0 { expectedNextEventID = request.GetExpectedNextEventID() } // if caller decide to long poll on workflow execution // and the event ID we are looking for is smaller than current next event ID if expectedNextEventID >= response.GetNextEventID() && response.GetIsWorkflowRunning() { subscriberID, channel, err := e.historyEventNotifier.WatchHistoryEvent(definition.NewWorkflowIdentifier(domainID, execution.GetWorkflowID(), execution.GetRunID())) if err != nil { return nil, err } defer e.historyEventNotifier.UnwatchHistoryEvent(definition.NewWorkflowIdentifier(domainID, execution.GetWorkflowID(), execution.GetRunID()), subscriberID) //nolint:errcheck // check again in case the next event ID is updated response, err = e.getMutableState(ctx, domainID, execution) if err != nil { return nil, err } // check again if the current branch token changed if !bytes.Equal(request.CurrentBranchToken, response.CurrentBranchToken) { return nil, &types.CurrentBranchChangedError{ Message: "current branch token and request branch token doesn't match", CurrentBranchToken: response.CurrentBranchToken} } if expectedNextEventID < response.GetNextEventID() || !response.GetIsWorkflowRunning() { return response, nil } domainName, err := e.shard.GetDomainCache().GetDomainName(domainID) if err != nil { return nil, err } expirationInterval := e.shard.GetConfig().LongPollExpirationInterval(domainName) if deadline, ok := ctx.Deadline(); ok { remainingTime := deadline.Sub(e.shard.GetTimeSource().Now()) // Here we return a safeguard error, to ensure that older clients are not stuck in long poll loop until context fully expires. // Otherwise it results in multiple additional requests being made that returns empty responses. // Newer clients will not make request with too small timeout remaining. 
if remainingTime < longPollCompletionBuffer { return nil, context.DeadlineExceeded } // longPollCompletionBuffer is here to leave some room to finish current request without its timeout. expirationInterval = common.MinDuration( expirationInterval, remainingTime-longPollCompletionBuffer, ) } if expirationInterval <= 0 { return response, nil } timer := time.NewTimer(expirationInterval) defer timer.Stop() for { select { case event := <-channel: response.LastFirstEventID = event.LastFirstEventID response.NextEventID = event.NextEventID response.IsWorkflowRunning = event.WorkflowCloseState == persistence.WorkflowCloseStatusNone response.PreviousStartedEventID = common.Int64Ptr(event.PreviousStartedEventID) response.WorkflowState = common.Int32Ptr(int32(event.WorkflowState)) response.WorkflowCloseState = common.Int32Ptr(int32(event.WorkflowCloseState)) if !bytes.Equal(request.CurrentBranchToken, event.CurrentBranchToken) { return nil, &types.CurrentBranchChangedError{ Message: "Current branch token and request branch token doesn't match", CurrentBranchToken: event.CurrentBranchToken} } if expectedNextEventID < response.GetNextEventID() || !response.GetIsWorkflowRunning() { return response, nil } case <-timer.C: return response, nil } } } return response, nil } func (e *historyEngineImpl) QueryWorkflow( ctx context.Context, request *types.HistoryQueryWorkflowRequest, ) (retResp *types.HistoryQueryWorkflowResponse, retErr error) { scope := e.metricsClient.Scope(metrics.HistoryQueryWorkflowScope).Tagged(metrics.DomainTag(request.GetRequest().GetDomain())) shardMetricScope := e.metricsClient.Scope(metrics.HistoryQueryWorkflowScope, metrics.ShardIDTag(e.shard.GetShardID())) consistentQueryEnabled := e.config.EnableConsistentQuery() && e.config.EnableConsistentQueryByDomain(request.GetRequest().GetDomain()) if request.GetRequest().GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong { if !consistentQueryEnabled { return nil, workflow.ErrConsistentQueryNotEnabled } 
shardMetricScope.IncCounter(metrics.ConsistentQueryPerShard) e.logger.SampleInfo("History QueryWorkflow called with QueryConsistencyLevelStrong", e.config.SampleLoggingRate(), tag.ShardID(e.shard.GetShardID()), tag.WorkflowID(request.GetRequest().Execution.WorkflowID), tag.WorkflowDomainName(request.GetRequest().Domain)) } execution := *request.GetRequest().GetExecution() mutableStateResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution) if err != nil { return nil, err } req := request.GetRequest() if !mutableStateResp.GetIsWorkflowRunning() && req.QueryRejectCondition != nil { notOpenReject := req.GetQueryRejectCondition() == types.QueryRejectConditionNotOpen closeStatus := mutableStateResp.GetWorkflowCloseState() notCompletedCleanlyReject := req.GetQueryRejectCondition() == types.QueryRejectConditionNotCompletedCleanly && closeStatus != persistence.WorkflowCloseStatusCompleted if notOpenReject || notCompletedCleanlyReject { return &types.HistoryQueryWorkflowResponse{ Response: &types.QueryWorkflowResponse{ QueryRejected: &types.QueryRejected{ CloseStatus: persistence.ToInternalWorkflowExecutionCloseStatus(int(closeStatus)), }, }, }, nil } } // query cannot be processed unless at least one decision task has finished // if first decision task has not finished wait for up to a second for it to complete queryFirstDecisionTaskWaitTime := defaultQueryFirstDecisionTaskWaitTime ctxDeadline, ok := ctx.Deadline() if ok { ctxWaitTime := time.Until(ctxDeadline) - time.Second if ctxWaitTime > queryFirstDecisionTaskWaitTime { queryFirstDecisionTaskWaitTime = ctxWaitTime } } deadline := time.Now().Add(queryFirstDecisionTaskWaitTime) for mutableStateResp.GetPreviousStartedEventID() <= 0 && time.Now().Before(deadline) { <-time.After(queryFirstDecisionTaskCheckInterval) mutableStateResp, err = e.getMutableState(ctx, request.GetDomainUUID(), execution) if err != nil { return nil, err } } if mutableStateResp.GetPreviousStartedEventID() <= 0 { 
scope.IncCounter(metrics.QueryBeforeFirstDecisionCount) return nil, workflow.ErrQueryWorkflowBeforeFirstDecision } de, err := e.shard.GetDomainCache().GetDomainByID(request.GetDomainUUID()) if err != nil { return nil, err } wfContext, release, err := e.executionCache.GetOrCreateWorkflowExecution(ctx, request.GetDomainUUID(), execution) if err != nil { return nil, err } defer func() { release(retErr) }() mutableState, err := wfContext.LoadWorkflowExecution(ctx) if err != nil { return nil, err } // If history is corrupted, query will be rejected if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil { return nil, err } else if corrupted { return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."} } // There are two ways in which queries get dispatched to decider. First, queries can be dispatched on decision tasks. // These decision tasks potentially contain new events and queries. The events are treated as coming before the query in time. // The second way in which queries are dispatched to decider is directly through matching; in this approach queries can be // dispatched to decider immediately even if there are outstanding events that came before the query. The following logic // is used to determine if a query can be safely dispatched directly through matching or if given the desired consistency // level must be dispatched on a decision task. There are four cases in which a query can be dispatched directly through // matching safely, without violating the desired consistency level: // 1. the domain is not active, in this case history is immutable so a query dispatched at any time is consistent // 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent // 3. the client requested eventual consistency, in this case there are no consistency requirements so dispatching directly through matching is safe // 4. 
if there is no pending or started decision it means no events came before query arrived, so its safe to dispatch directly isActive, _ := de.IsActiveIn(e.clusterMetadata.GetCurrentClusterName()) safeToDispatchDirectly := !isActive || !mutableState.IsWorkflowExecutionRunning() || req.GetQueryConsistencyLevel() == types.QueryConsistencyLevelEventual || (!mutableState.HasPendingDecision() && !mutableState.HasInFlightDecision()) if safeToDispatchDirectly { release(nil) msResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution) if err != nil { return nil, err } req.Execution.RunID = msResp.Execution.RunID return e.queryDirectlyThroughMatching(ctx, msResp, request.GetDomainUUID(), req, scope) } // If we get here it means query could not be dispatched through matching directly, so it must block // until either an result has been obtained on a decision task response or until it is safe to dispatch directly through matching. sw := scope.StartTimer(metrics.DecisionTaskQueryLatency) defer sw.Stop() queryReg := mutableState.GetQueryRegistry() if len(queryReg.GetBufferedIDs()) >= e.config.MaxBufferedQueryCount() { scope.IncCounter(metrics.QueryBufferExceededCount) return nil, workflow.ErrConsistentQueryBufferExceeded } queryID, termCh := queryReg.BufferQuery(req.GetQuery()) defer queryReg.RemoveQuery(queryID) release(nil) select { case <-termCh: state, err := queryReg.GetTerminationState(queryID) if err != nil { scope.IncCounter(metrics.QueryRegistryInvalidStateCount) return nil, err } switch state.TerminationType { case query.TerminationTypeCompleted: result := state.QueryResult switch result.GetResultType() { case types.QueryResultTypeAnswered: return &types.HistoryQueryWorkflowResponse{ Response: &types.QueryWorkflowResponse{ QueryResult: result.GetAnswer(), }, }, nil case types.QueryResultTypeFailed: return nil, &types.QueryFailedError{Message: result.GetErrorMessage()} default: scope.IncCounter(metrics.QueryRegistryInvalidStateCount) return nil, 
workflow.ErrQueryEnteredInvalidState } case query.TerminationTypeUnblocked: msResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution) if err != nil { return nil, err } req.Execution.RunID = msResp.Execution.RunID return e.queryDirectlyThroughMatching(ctx, msResp, request.GetDomainUUID(), req, scope) case query.TerminationTypeFailed: return nil, state.Failure default: scope.IncCounter(metrics.QueryRegistryInvalidStateCount) return nil, workflow.ErrQueryEnteredInvalidState } case <-ctx.Done(): scope.IncCounter(metrics.ConsistentQueryTimeoutCount) return nil, ctx.Err() } } func (e *historyEngineImpl) queryDirectlyThroughMatching( ctx context.Context, msResp *types.GetMutableStateResponse, domainID string, queryRequest *types.QueryWorkflowRequest, scope metrics.Scope, ) (*types.HistoryQueryWorkflowResponse, error) { sw := scope.StartTimer(metrics.DirectQueryDispatchLatency) defer sw.Stop() // Sticky task list is not very useful in the standby cluster because the decider cache is // not updated by dispatching tasks to it (it is only updated in the case of query). // Additionally on the standby side we are not even able to clear sticky. // Stickiness might be outdated if the customer did a restart of their nodes causing a query // dispatched on the standby side on sticky to hang. We decided it made sense to simply not attempt // query on sticky task list at all on the passive side. 
de, err := e.shard.GetDomainCache().GetDomainByID(domainID) if err != nil { return nil, err } supportsStickyQuery := e.clientChecker.SupportsStickyQuery(msResp.GetClientImpl(), msResp.GetClientFeatureVersion()) == nil domainIsActive, _ := de.IsActiveIn(e.clusterMetadata.GetCurrentClusterName()) if msResp.GetIsStickyTaskListEnabled() && len(msResp.GetStickyTaskList().GetName()) != 0 && supportsStickyQuery && e.config.EnableStickyQuery(queryRequest.GetDomain()) && domainIsActive { stickyMatchingRequest := &types.MatchingQueryWorkflowRequest{ DomainUUID: domainID, QueryRequest: queryRequest, TaskList: msResp.GetStickyTaskList(), } // using a clean new context in case customer provide a context which has // a really short deadline, causing we clear the stickiness stickyContext, cancel := context.WithTimeout(context.Background(), time.Duration(msResp.GetStickyTaskListScheduleToStartTimeout())*time.Second) stickyStopWatch := scope.StartTimer(metrics.DirectQueryDispatchStickyLatency) matchingResp, err := e.rawMatchingClient.QueryWorkflow(stickyContext, stickyMatchingRequest) stickyStopWatch.Stop() cancel() if err == nil { scope.IncCounter(metrics.DirectQueryDispatchStickySuccessCount) return &types.HistoryQueryWorkflowResponse{Response: matchingResp}, nil } switch v := err.(type) { case *types.StickyWorkerUnavailableError: case *yarpcerrors.Status: if v.Code() != yarpcerrors.CodeDeadlineExceeded { e.logger.Error("query directly though matching on sticky failed, will not attempt query on non-sticky", tag.WorkflowDomainName(queryRequest.GetDomain()), tag.WorkflowID(queryRequest.Execution.GetWorkflowID()), tag.WorkflowRunID(queryRequest.Execution.GetRunID()), tag.WorkflowQueryType(queryRequest.Query.GetQueryType()), tag.Error(err)) return nil, err } default: e.logger.Error("query directly though matching on sticky failed, will not attempt query on non-sticky", tag.WorkflowDomainName(queryRequest.GetDomain()), tag.WorkflowID(queryRequest.Execution.GetWorkflowID()), 
tag.WorkflowRunID(queryRequest.Execution.GetRunID()), tag.WorkflowQueryType(queryRequest.Query.GetQueryType()), tag.Error(err)) return nil, err } if msResp.GetIsWorkflowRunning() { e.logger.Info("query direct through matching failed on sticky, clearing sticky before attempting on non-sticky", tag.WorkflowDomainName(queryRequest.GetDomain()), tag.WorkflowID(queryRequest.Execution.GetWorkflowID()), tag.WorkflowRunID(queryRequest.Execution.GetRunID()), tag.WorkflowQueryType(queryRequest.Query.GetQueryType()), tag.Error(err)) resetContext, cancel := context.WithTimeout(context.Background(), 5*time.Second) clearStickinessStopWatch := scope.StartTimer(metrics.DirectQueryDispatchClearStickinessLatency) _, err := e.ResetStickyTaskList(resetContext, &types.HistoryResetStickyTaskListRequest{ DomainUUID: domainID, Execution: queryRequest.GetExecution(), }) clearStickinessStopWatch.Stop() cancel() if err != nil && err != workflow.ErrAlreadyCompleted && err != workflow.ErrNotExists { return nil, err } scope.IncCounter(metrics.DirectQueryDispatchClearStickinessSuccessCount) } } if err := common.IsValidContext(ctx); err != nil { e.logger.Info("query context timed out before query on non-sticky task list could be attempted", tag.WorkflowDomainName(queryRequest.GetDomain()), tag.WorkflowID(queryRequest.Execution.GetWorkflowID()), tag.WorkflowRunID(queryRequest.Execution.GetRunID()), tag.WorkflowQueryType(queryRequest.Query.GetQueryType())) scope.IncCounter(metrics.DirectQueryDispatchTimeoutBeforeNonStickyCount) return nil, err } e.logger.Debug("query directly through matching on sticky timed out, attempting to query on non-sticky", tag.WorkflowDomainName(queryRequest.GetDomain()), tag.WorkflowID(queryRequest.Execution.GetWorkflowID()), tag.WorkflowRunID(queryRequest.Execution.GetRunID()), tag.WorkflowQueryType(queryRequest.Query.GetQueryType()), tag.WorkflowTaskListName(msResp.GetStickyTaskList().GetName()), tag.WorkflowNextEventID(msResp.GetNextEventID())) nonStickyMatchingRequest 
:= &types.MatchingQueryWorkflowRequest{ DomainUUID: domainID, QueryRequest: queryRequest, TaskList: msResp.TaskList, } nonStickyStopWatch := scope.StartTimer(metrics.DirectQueryDispatchNonStickyLatency) matchingResp, err := e.matchingClient.QueryWorkflow(ctx, nonStickyMatchingRequest) nonStickyStopWatch.Stop() if err != nil { e.logger.Error("query directly though matching on non-sticky failed", tag.WorkflowDomainName(queryRequest.GetDomain()), tag.WorkflowID(queryRequest.Execution.GetWorkflowID()), tag.WorkflowRunID(queryRequest.Execution.GetRunID()), tag.WorkflowQueryType(queryRequest.Query.GetQueryType()), tag.Error(err)) return nil, err } scope.IncCounter(metrics.DirectQueryDispatchNonStickySuccessCount) return &types.HistoryQueryWorkflowResponse{Response: matchingResp}, err } func (e *historyEngineImpl) getMutableState( ctx context.Context, domainID string, execution types.WorkflowExecution, ) (retResp *types.GetMutableStateResponse, retError error) { wfContext, release, retError := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, execution) if retError != nil { return } defer func() { release(retError) }() mutableState, retError := wfContext.LoadWorkflowExecution(ctx) if retError != nil { return } currentBranchToken, err := mutableState.GetCurrentBranchToken() if err != nil { return nil, err } executionInfo := mutableState.GetExecutionInfo() execution.RunID = wfContext.GetExecution().RunID workflowState, workflowCloseState := mutableState.GetWorkflowStateCloseStatus() retResp = &types.GetMutableStateResponse{ Execution: &execution, WorkflowType: &types.WorkflowType{Name: executionInfo.WorkflowTypeName}, LastFirstEventID: mutableState.GetLastFirstEventID(), NextEventID: mutableState.GetNextEventID(), PreviousStartedEventID: common.Int64Ptr(mutableState.GetPreviousStartedEventID()), TaskList: &types.TaskList{Name: executionInfo.TaskList}, StickyTaskList: &types.TaskList{Name: executionInfo.StickyTaskList, Kind: types.TaskListKindSticky.Ptr()}, 
ClientLibraryVersion: executionInfo.ClientLibraryVersion, ClientFeatureVersion: executionInfo.ClientFeatureVersion, ClientImpl: executionInfo.ClientImpl, IsWorkflowRunning: mutableState.IsWorkflowExecutionRunning(), StickyTaskListScheduleToStartTimeout: common.Int32Ptr(executionInfo.StickyScheduleToStartTimeout), CurrentBranchToken: currentBranchToken, WorkflowState: common.Int32Ptr(int32(workflowState)), WorkflowCloseState: common.Int32Ptr(int32(workflowCloseState)), IsStickyTaskListEnabled: mutableState.IsStickyTaskListEnabled(), HistorySize: mutableState.GetHistorySize(), } versionHistories := mutableState.GetVersionHistories() if versionHistories != nil { retResp.VersionHistories = versionHistories.ToInternalType() } return } func (e *historyEngineImpl) DescribeMutableState( ctx context.Context, request *types.DescribeMutableStateRequest, ) (response *types.DescribeMutableStateResponse, retError error) { if err := common.ValidateDomainUUID(request.DomainUUID); err != nil { return nil, err } domainID := request.DomainUUID execution := types.WorkflowExecution{ WorkflowID: request.Execution.WorkflowID, RunID: request.Execution.RunID, } cacheCtx, dbCtx, release, cacheHit, err := e.executionCache.GetAndCreateWorkflowExecution( ctx, domainID, execution, ) if err != nil { return nil, err } defer func() { release(retError) }() response = &types.DescribeMutableStateResponse{} if cacheHit { if msb := cacheCtx.GetWorkflowExecution(); msb != nil { response.MutableStateInCache, err = e.toMutableStateJSON(msb) if err != nil { return nil, err } } } msb, err := dbCtx.LoadWorkflowExecution(ctx) if err != nil { return nil, err } response.MutableStateInDatabase, err = e.toMutableStateJSON(msb) if err != nil { return nil, err } return response, nil } func (e *historyEngineImpl) toMutableStateJSON(msb execution.MutableState) (string, error) { ms := msb.CopyToPersistence() jsonBytes, err := json.Marshal(ms) if err != nil { return "", err } return string(jsonBytes), nil } // 
ResetStickyTaskList reset the volatile information in mutable state of a given types. // Volatile information are the information related to client, such as: // 1. StickyTaskList // 2. StickyScheduleToStartTimeout // 3. ClientLibraryVersion // 4. ClientFeatureVersion // 5. ClientImpl func (e *historyEngineImpl) ResetStickyTaskList( ctx context.Context, resetRequest *types.HistoryResetStickyTaskListRequest, ) (*types.HistoryResetStickyTaskListResponse, error) { if err := common.ValidateDomainUUID(resetRequest.DomainUUID); err != nil { return nil, err } domainID := resetRequest.DomainUUID err := workflow.UpdateWithAction(ctx, e.executionCache, domainID, *resetRequest.Execution, false, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return workflow.ErrAlreadyCompleted } mutableState.ClearStickyness() return nil }, ) if err != nil { return nil, err } return &types.HistoryResetStickyTaskListResponse{}, nil } // DescribeWorkflowExecution returns information about the specified workflow execution. 
// The response is assembled from the workflow's mutable state: execution
// configuration, execution info, parent info (when ParentRunID is set), close
// status/time (when the state is completed), pending activities, pending child
// executions and the pending decision. A history reported as corrupted yields a
// *types.EntityNotExistsError.
func (e *historyEngineImpl) DescribeWorkflowExecution(
	ctx context.Context,
	request *types.HistoryDescribeWorkflowExecutionRequest,
) (retResp *types.DescribeWorkflowExecutionResponse, retError error) {

	if err := common.ValidateDomainUUID(request.DomainUUID); err != nil {
		return nil, err
	}

	domainID := request.DomainUUID
	wfExecution := *request.Request.Execution

	wfContext, release, err0 := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, wfExecution)
	if err0 != nil {
		return nil, err0
	}
	// Release the cache entry with whatever error this function ends up returning.
	defer func() { release(retError) }()

	mutableState, err1 := wfContext.LoadWorkflowExecution(ctx)
	if err1 != nil {
		return nil, err1
	}
	// If history is corrupted, return an error to the end user
	if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
		return nil, err
	} else if corrupted {
		return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."}
	}

	executionInfo := mutableState.GetExecutionInfo()

	result := &types.DescribeWorkflowExecutionResponse{
		ExecutionConfiguration: &types.WorkflowExecutionConfiguration{
			TaskList: &types.TaskList{Name: executionInfo.TaskList},
			ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.WorkflowTimeout),
			TaskStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.DecisionStartToCloseTimeout),
		},
		WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
			Execution: &types.WorkflowExecution{
				WorkflowID: executionInfo.WorkflowID,
				RunID: executionInfo.RunID,
			},
			Type: &types.WorkflowType{Name: executionInfo.WorkflowTypeName},
			StartTime: common.Int64Ptr(executionInfo.StartTimestamp.UnixNano()),
			// Number of events recorded so far: next event ID minus the first event ID.
			HistoryLength: mutableState.GetNextEventID() - common.FirstEventID,
			AutoResetPoints: executionInfo.AutoResetPoints,
			Memo: &types.Memo{Fields: executionInfo.Memo},
			IsCron: len(executionInfo.CronSchedule) > 0,
			UpdateTime: common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
			SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
			PartitionConfig: executionInfo.PartitionConfig,
		},
	}

	// TODO: we need to consider adding execution time to mutable state
	// For now execution time will be calculated based on start time and cron schedule/retry policy
	// each time DescribeWorkflowExecution is called.
	startEvent, err := mutableState.GetStartEvent(ctx)
	if err != nil {
		return nil, err
	}
	// Execution time = start time + first decision task backoff from the start event.
	backoffDuration := time.Duration(startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstDecisionTaskBackoffSeconds()) * time.Second
	result.WorkflowExecutionInfo.ExecutionTime = common.Int64Ptr(result.WorkflowExecutionInfo.GetStartTime() + backoffDuration.Nanoseconds())

	// A non-empty ParentRunID marks this execution as having a parent.
	if executionInfo.ParentRunID != "" {
		result.WorkflowExecutionInfo.ParentExecution = &types.WorkflowExecution{
			WorkflowID: executionInfo.ParentWorkflowID,
			RunID: executionInfo.ParentRunID,
		}
		result.WorkflowExecutionInfo.ParentDomainID = common.StringPtr(executionInfo.ParentDomainID)
		result.WorkflowExecutionInfo.ParentInitiatedID = common.Int64Ptr(executionInfo.InitiatedID)
		parentDomain, err := e.shard.GetDomainCache().GetDomainName(executionInfo.ParentDomainID)
		if err != nil {
			return nil, err
		}
		result.WorkflowExecutionInfo.ParentDomain = common.StringPtr(parentDomain)
	}
	if executionInfo.State == persistence.WorkflowStateCompleted {
		// for closed workflow
		result.WorkflowExecutionInfo.CloseStatus = persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
		completionEvent, err := mutableState.GetCompletionEvent(ctx)
		if err != nil {
			return nil, err
		}
		result.WorkflowExecutionInfo.CloseTime = common.Int64Ptr(completionEvent.GetTimestamp())
	}

	if len(mutableState.GetPendingActivityInfos()) > 0 {
		for _, ai := range mutableState.GetPendingActivityInfos() {
			p := &types.PendingActivityInfo{
				ActivityID: ai.ActivityID,
			}
			// State precedence: cancel requested, then started, otherwise scheduled.
			state := types.PendingActivityStateScheduled
			if ai.CancelRequested {
				state = types.PendingActivityStateCancelRequested
			} else if ai.StartedID != common.EmptyEventID {
				state = types.PendingActivityStateStarted
			}
			p.State = &state
			// Heartbeat fields are only reported when a positive heartbeat timestamp exists.
			lastHeartbeatUnixNano := ai.LastHeartBeatUpdatedTime.UnixNano()
			if lastHeartbeatUnixNano > 0 {
				p.LastHeartbeatTimestamp = common.Int64Ptr(lastHeartbeatUnixNano)
				p.HeartbeatDetails = ai.Details
			}
			// TODO: move to mutable state instead of loading it from event
			scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, ai.ScheduleID)
			if err != nil {
				return nil, err
			}
			p.ActivityType = scheduledEvent.ActivityTaskScheduledEventAttributes.ActivityType
			// Scheduled activities report the scheduled time; all other states
			// report the last started time.
			if state == types.PendingActivityStateScheduled {
				p.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
			} else {
				p.LastStartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
			}
			// Retry-related fields are only populated for activities with a retry policy.
			if ai.HasRetryPolicy {
				p.Attempt = ai.Attempt
				p.ExpirationTimestamp = common.Int64Ptr(ai.ExpirationTime.UnixNano())
				if ai.MaximumAttempts != 0 {
					p.MaximumAttempts = ai.MaximumAttempts
				}
				if ai.LastFailureReason != "" {
					p.LastFailureReason = common.StringPtr(ai.LastFailureReason)
					p.LastFailureDetails = ai.LastFailureDetails
				}
				if ai.LastWorkerIdentity != "" {
					p.LastWorkerIdentity = ai.LastWorkerIdentity
				}
				if ai.StartedIdentity != "" {
					p.StartedWorkerIdentity = ai.StartedIdentity
				}
			}
			result.PendingActivities = append(result.PendingActivities, p)
		}
	}

	if len(mutableState.GetPendingChildExecutionInfos()) > 0 {
		for _, ch := range mutableState.GetPendingChildExecutionInfos() {
			childDomainName, err := execution.GetChildExecutionDomainName(
				ch,
				e.shard.GetDomainCache(),
				mutableState.GetDomainEntry(),
			)
			if err != nil {
				if !common.IsEntityNotExistsError(err) {
					return nil, err
				}
				// child domain already deleted, instead of failing the request,
				// return domainID instead since this field is only for information purpose
				childDomainName = ch.DomainID
			}
			p := &types.PendingChildExecutionInfo{
				Domain: childDomainName,
				WorkflowID: ch.StartedWorkflowID,
				RunID: ch.StartedRunID,
				WorkflowTypeName: ch.WorkflowTypeName,
				InitiatedID: ch.InitiatedID,
				ParentClosePolicy: &ch.ParentClosePolicy,
			}
			result.PendingChildren = append(result.PendingChildren, p)
		}
	}

	if di, ok := mutableState.GetPendingDecision(); ok {
		pendingDecision := &types.PendingDecisionInfo{
			State: types.PendingDecisionStateScheduled.Ptr(),
			ScheduledTimestamp: common.Int64Ptr(di.ScheduledTimestamp),
			Attempt: di.Attempt,
			OriginalScheduledTimestamp: common.Int64Ptr(di.OriginalScheduledTimestamp),
		}
		// A decision with a started event ID is reported as started, with its start time.
		if di.StartedID != common.EmptyEventID {
			pendingDecision.State = types.PendingDecisionStateStarted.Ptr()
			pendingDecision.StartedTimestamp = common.Int64Ptr(di.StartedTimestamp)
		}
		result.PendingDecision = pendingDecision
	}

	return result, nil
}

// RecordActivityTaskStarted marks the activity identified by the request's
// schedule ID as started and returns the data the caller needs to run it
// (scheduled event, attempt, heartbeat details, workflow type and domain).
//
// Errors returned from the update action, as visible here:
//   - workflow.ErrNotExists when the workflow is not running;
//   - workflow.ErrStaleState when the schedule ID is not below the next event ID
//     and the activity is unknown;
//   - workflow.ErrActivityTaskNotFound when the activity is not pending, or when
//     it was detected as resurrected (in which case it is also deleted from
//     mutable state);
//   - *types.EventAlreadyStartedError when the activity was already started by a
//     different request ID.
//
// A repeated call with the same request ID returns the existing started timestamp.
func (e *historyEngineImpl) RecordActivityTaskStarted(
	ctx context.Context,
	request *types.RecordActivityTaskStartedRequest,
) (*types.RecordActivityTaskStartedResponse, error) {

	domainEntry, err := e.getActiveDomainByID(request.DomainUUID)
	if err != nil {
		return nil, err
	}

	domainInfo := domainEntry.GetInfo()

	domainID := domainInfo.ID
	domainName := domainInfo.Name

	workflowExecution := types.WorkflowExecution{
		WorkflowID: request.WorkflowExecution.WorkflowID,
		RunID: request.WorkflowExecution.RunID,
	}

	// resurrectError is set inside the action when a resurrected activity is
	// removed; the action itself returns nil in that case.
	var resurrectError error
	response := &types.RecordActivityTaskStartedResponse{}
	err = workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, false, e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) error {
			if !mutableState.IsWorkflowExecutionRunning() {
				return workflow.ErrNotExists
			}

			scheduleID := request.GetScheduleID()
			requestID := request.GetRequestID()
			ai, isRunning := mutableState.GetActivityInfo(scheduleID)

			// RecordActivityTaskStarted is already past scheduleToClose timeout.
			// If at this point pending activity is still in mutable state it may be resurrected.
			// Otherwise it would be completed or timed out already.
			if isRunning && e.timeSource.Now().After(ai.ScheduledTime.Add(time.Duration(ai.ScheduleToCloseTimeout)*time.Second)) {
				resurrectedActivities, err := execution.GetResurrectedActivities(ctx, e.shard, mutableState)
				if err != nil {
					e.logger.Error("Activity resurrection check failed", tag.Error(err))
					return err
				}

				if _, ok := resurrectedActivities[scheduleID]; ok {
					// found activity resurrection
					// (this domainName shadows the outer one for the log statement below)
					domainName := mutableState.GetDomainEntry().GetInfo().Name
					e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.ActivityResurrectionCounter)
					e.logger.Error("Encounter resurrected activity, skip",
						tag.WorkflowDomainName(domainName),
						tag.WorkflowID(workflowExecution.GetWorkflowID()),
						tag.WorkflowRunID(workflowExecution.GetRunID()),
						tag.WorkflowScheduleID(scheduleID),
					)

					// remove resurrected activity from mutable state
					if err := mutableState.DeleteActivity(scheduleID); err != nil {
						return err
					}

					// save resurrection error but return nil here, so that mutable state would get updated in DB
					resurrectError = workflow.ErrActivityTaskNotFound
					return nil
				}
			}

			// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
			// some extreme cassandra failure cases.
			if !isRunning && scheduleID >= mutableState.GetNextEventID() {
				e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskStartedScope, metrics.StaleMutableStateCounter)
				e.logger.Error("Encounter stale mutable state in RecordActivityTaskStarted",
					tag.WorkflowDomainName(domainName),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(workflowExecution.GetRunID()),
					tag.WorkflowScheduleID(scheduleID),
					tag.WorkflowNextEventID(mutableState.GetNextEventID()),
				)
				return workflow.ErrStaleState
			}

			// Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If
			// task is not outstanding than it is most probably a duplicate and complete the task.
			if !isRunning {
				// Looks like ActivityTask already completed as a result of another call.
				// It is OK to drop the task at this point.
				e.logger.Debug("Potentially duplicate task.", tag.TaskID(request.GetTaskID()), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TransferTaskTypeActivityTask))
				return workflow.ErrActivityTaskNotFound
			}

			scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, scheduleID)
			if err != nil {
				return err
			}
			response.ScheduledEvent = scheduledEvent
			response.ScheduledTimestampOfThisAttempt = common.Int64Ptr(ai.ScheduledTime.UnixNano())
			response.Attempt = int64(ai.Attempt)
			response.HeartbeatDetails = ai.Details

			response.WorkflowType = mutableState.GetWorkflowType()
			response.WorkflowDomain = domainName

			if ai.StartedID != common.EmptyEventID {
				// If activity is started as part of the current request scope then return a positive response
				if ai.RequestID == requestID {
					response.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
					return nil
				}

				// Looks like ActivityTask already started as a result of another call.
				// It is OK to drop the task at this point.
				e.logger.Debug("Potentially duplicate task.", tag.TaskID(request.GetTaskID()), tag.WorkflowScheduleID(scheduleID), tag.TaskType(persistence.TransferTaskTypeActivityTask))
				return &types.EventAlreadyStartedError{Message: "Activity task already started."}
			}

			if _, err := mutableState.AddActivityTaskStartedEvent(
				ai, scheduleID, requestID, request.PollRequest.GetIdentity(),
			); err != nil {
				return err
			}

			// NOTE(review): presumably AddActivityTaskStartedEvent updates ai.StartedTime
			// in place, since it is read right after the call — confirm.
			response.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())

			return nil
		})

	if err != nil {
		return nil, err
	}
	// The update succeeded, but the activity was removed as resurrected.
	if resurrectError != nil {
		return nil, resurrectError
	}

	return response, err
}

// ScheduleDecisionTask schedules a decision if no outstanding decision found
// It delegates to the decision handler.
func (e *historyEngineImpl) ScheduleDecisionTask(
	ctx context.Context,
	req *types.ScheduleDecisionTaskRequest,
) error {
	return e.decisionHandler.HandleDecisionTaskScheduled(ctx, req)
}

// RecordDecisionTaskStarted starts a decision
// It delegates to the decision handler.
func (e *historyEngineImpl) RecordDecisionTaskStarted(
	ctx context.Context,
	request *types.RecordDecisionTaskStartedRequest,
) (*types.RecordDecisionTaskStartedResponse, error) {
	return e.decisionHandler.HandleDecisionTaskStarted(ctx, request)
}

// RespondDecisionTaskCompleted completes a decision task
// It delegates to the decision handler.
func (e *historyEngineImpl) RespondDecisionTaskCompleted(
	ctx context.Context,
	req *types.HistoryRespondDecisionTaskCompletedRequest,
) (*types.HistoryRespondDecisionTaskCompletedResponse, error) {
	return e.decisionHandler.HandleDecisionTaskCompleted(ctx, req)
}

// RespondDecisionTaskFailed fails a decision
// It delegates to the decision handler.
func (e *historyEngineImpl) RespondDecisionTaskFailed(
	ctx context.Context,
	req *types.HistoryRespondDecisionTaskFailedRequest,
) error {
	return e.decisionHandler.HandleDecisionTaskFailed(ctx, req)
}

// RespondActivityTaskCompleted completes an activity task.
func (e *historyEngineImpl) RespondActivityTaskCompleted( ctx context.Context, req *types.HistoryRespondActivityTaskCompletedRequest, ) error { domainEntry, err := e.getActiveDomainByID(req.DomainUUID) if err != nil { return err } domainID := domainEntry.GetInfo().ID domainName := domainEntry.GetInfo().Name request := req.CompleteRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return workflow.ErrDeserializingToken } workflowExecution := types.WorkflowExecution{ WorkflowID: token.WorkflowID, RunID: token.RunID, } var activityStartedTime time.Time var taskList string err = workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, true, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return workflow.ErrAlreadyCompleted } scheduleID := token.ScheduleID if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID scheduleID, err0 = getScheduleID(token.ActivityID, mutableState) if err0 != nil { return err0 } } ai, isRunning := mutableState.GetActivityInfo(scheduleID) // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in // some extreme cassandra failure cases. 
if !isRunning && scheduleID >= mutableState.GetNextEventID() { e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCompletedScope, metrics.StaleMutableStateCounter) e.logger.Error("Encounter stale mutable state in RecordActivityTaskCompleted", tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(workflowExecution.GetRunID()), tag.WorkflowScheduleID(scheduleID), tag.WorkflowNextEventID(mutableState.GetNextEventID()), ) return workflow.ErrStaleState } if !isRunning || ai.StartedID == common.EmptyEventID || (token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) { e.logger.Warn(fmt.Sprintf( "Encounter non existing activity in RecordActivityTaskCompleted: isRunning: %t, ai: %#v, token: %#v.", isRunning, ai, token), tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(workflowExecution.GetRunID()), tag.WorkflowScheduleID(scheduleID), tag.WorkflowNextEventID(mutableState.GetNextEventID()), ) return workflow.ErrActivityTaskNotFound } if _, err := mutableState.AddActivityTaskCompletedEvent(scheduleID, ai.StartedID, request); err != nil { // Unable to add ActivityTaskCompleted event to history return &types.InternalServiceError{Message: "Unable to add ActivityTaskCompleted event to history."} } activityStartedTime = ai.StartedTime taskList = ai.TaskList return nil }) if err == nil && !activityStartedTime.IsZero() { scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskCompletedScope). Tagged( metrics.DomainTag(domainName), metrics.WorkflowTypeTag(token.WorkflowType), metrics.ActivityTypeTag(token.ActivityType), metrics.TaskListTag(taskList), ) scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime)) } return err } // RespondActivityTaskFailed completes an activity task failure. 
func (e *historyEngineImpl) RespondActivityTaskFailed( ctx context.Context, req *types.HistoryRespondActivityTaskFailedRequest, ) error { domainEntry, err := e.getActiveDomainByID(req.DomainUUID) if err != nil { return err } domainID := domainEntry.GetInfo().ID domainName := domainEntry.GetInfo().Name request := req.FailedRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return workflow.ErrDeserializingToken } workflowExecution := types.WorkflowExecution{ WorkflowID: token.WorkflowID, RunID: token.RunID, } var activityStartedTime time.Time var taskList string err = workflow.UpdateWithActionFunc( ctx, e.executionCache, domainID, workflowExecution, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) { if !mutableState.IsWorkflowExecutionRunning() { return nil, workflow.ErrAlreadyCompleted } scheduleID := token.ScheduleID if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID scheduleID, err0 = getScheduleID(token.ActivityID, mutableState) if err0 != nil { return nil, err0 } } ai, isRunning := mutableState.GetActivityInfo(scheduleID) // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in // some extreme cassandra failure cases. 
if !isRunning && scheduleID >= mutableState.GetNextEventID() { e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskFailedScope, metrics.StaleMutableStateCounter) e.logger.Error("Encounter stale mutable state in RecordActivityTaskFailed", tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(workflowExecution.GetRunID()), tag.WorkflowScheduleID(scheduleID), tag.WorkflowNextEventID(mutableState.GetNextEventID()), ) return nil, workflow.ErrStaleState } if !isRunning || ai.StartedID == common.EmptyEventID || (token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) { e.logger.Warn(fmt.Sprintf( "Encounter non existing activity in RecordActivityTaskFailed: isRunning: %t, ai: %#v, token: %#v.", isRunning, ai, token), tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(workflowExecution.GetRunID()), tag.WorkflowScheduleID(scheduleID), tag.WorkflowNextEventID(mutableState.GetNextEventID()), ) return nil, workflow.ErrActivityTaskNotFound } postActions := &workflow.UpdateAction{} ok, err := mutableState.RetryActivity(ai, req.FailedRequest.GetReason(), req.FailedRequest.GetDetails()) if err != nil { return nil, err } if !ok { // no more retry, and we want to record the failure event if _, err := mutableState.AddActivityTaskFailedEvent(scheduleID, ai.StartedID, request); err != nil { // Unable to add ActivityTaskFailed event to history return nil, &types.InternalServiceError{Message: "Unable to add ActivityTaskFailed event to history."} } postActions.CreateDecision = true } activityStartedTime = ai.StartedTime taskList = ai.TaskList return postActions, nil }, ) if err == nil && !activityStartedTime.IsZero() { scope := e.metricsClient.Scope(metrics.HistoryRespondActivityTaskFailedScope). 
Tagged( metrics.DomainTag(domainName), metrics.WorkflowTypeTag(token.WorkflowType), metrics.ActivityTypeTag(token.ActivityType), metrics.TaskListTag(taskList), ) scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime)) } return err } // RespondActivityTaskCanceled completes an activity task failure. func (e *historyEngineImpl) RespondActivityTaskCanceled( ctx context.Context, req *types.HistoryRespondActivityTaskCanceledRequest, ) error { domainEntry, err := e.getActiveDomainByID(req.DomainUUID) if err != nil { return err } domainID := domainEntry.GetInfo().ID domainName := domainEntry.GetInfo().Name request := req.CancelRequest token, err0 := e.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return workflow.ErrDeserializingToken } workflowExecution := types.WorkflowExecution{ WorkflowID: token.WorkflowID, RunID: token.RunID, } var activityStartedTime time.Time var taskList string err = workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, true, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) error { if !mutableState.IsWorkflowExecutionRunning() { return workflow.ErrAlreadyCompleted } scheduleID := token.ScheduleID if scheduleID == common.EmptyEventID { // client call CompleteActivityById, so get scheduleID by activityID scheduleID, err0 = getScheduleID(token.ActivityID, mutableState) if err0 != nil { return err0 } } ai, isRunning := mutableState.GetActivityInfo(scheduleID) // First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in // some extreme cassandra failure cases. 
if !isRunning && scheduleID >= mutableState.GetNextEventID() { e.metricsClient.IncCounter(metrics.HistoryRespondActivityTaskCanceledScope, metrics.StaleMutableStateCounter) e.logger.Error("Encounter stale mutable state in RecordActivityTaskCanceled", tag.WorkflowDomainName(domainName), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(workflowExecution.GetRunID()), tag.WorkflowScheduleID(scheduleID), tag.WorkflowNextEventID(mutableState.GetNextEventID()), ) return workflow.ErrStaleState } if !isRunning || ai.StartedID == common.EmptyEventID || (token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) { return workflow.ErrActivityTaskNotFound } if _, err := mutableState.AddActivityTaskCanceledEvent( scheduleID, ai.StartedID, ai.CancelRequestID, request.Details, request.Identity); err != nil { // Unable to add ActivityTaskCanceled event to history return &types.InternalServiceError{Message: "Unable to add ActivityTaskCanceled event to history."} } activityStartedTime = ai.StartedTime taskList = ai.TaskList return nil }) if err == nil && !activityStartedTime.IsZero() { scope := e.metricsClient.Scope(metrics.HistoryClientRespondActivityTaskCanceledScope). Tagged( metrics.DomainTag(domainName), metrics.WorkflowTypeTag(token.WorkflowType), metrics.ActivityTypeTag(token.ActivityType), metrics.TaskListTag(taskList), ) scope.RecordTimer(metrics.ActivityE2ELatency, time.Since(activityStartedTime)) } return err } // RecordActivityTaskHeartbeat records an heartbeat for a task. // This method can be used for two purposes. // - For reporting liveness of the activity. // - For reporting progress of the activity, this can be done even if the liveness is not configured. 
func (e *historyEngineImpl) RecordActivityTaskHeartbeat(
	ctx context.Context,
	req *types.HistoryRecordActivityTaskHeartbeatRequest,
) (*types.RecordActivityTaskHeartbeatResponse, error) {

	// Only the cluster where the domain is active may record the heartbeat.
	domainEntry, err := e.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return nil, err
	}
	domainID := domainEntry.GetInfo().ID

	// The task token names the workflow run and the activity that is heartbeating.
	// The deserialization error itself is dropped in favor of the sentinel.
	request := req.HeartbeatRequest
	token, err0 := e.tokenSerializer.Deserialize(request.TaskToken)
	if err0 != nil {
		return nil, workflow.ErrDeserializingToken
	}

	workflowExecution := types.WorkflowExecution{
		WorkflowID: token.WorkflowID,
		RunID:      token.RunID,
	}

	// Set by the update closure and echoed back to the caller in the response.
	var cancelRequested bool
	// NOTE(review): the boolean argument is false here while the Respond* handlers
	// in this file pass true; presumably it controls whether a decision task is
	// created — confirm against workflow.UpdateWithAction.
	err = workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, false, e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) error {
			if !mutableState.IsWorkflowExecutionRunning() {
				e.logger.Debug("Heartbeat failed")
				return workflow.ErrAlreadyCompleted
			}

			scheduleID := token.ScheduleID
			if scheduleID == common.EmptyEventID { // client call RecordActivityHeartbeatByID, so get scheduleID by activityID
				scheduleID, err0 = getScheduleID(token.ActivityID, mutableState)
				if err0 != nil {
					return err0
				}
			}
			ai, isRunning := mutableState.GetActivityInfo(scheduleID)

			// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
			// some extreme cassandra failure cases.
			if !isRunning && scheduleID >= mutableState.GetNextEventID() {
				e.metricsClient.IncCounter(metrics.HistoryRecordActivityTaskHeartbeatScope, metrics.StaleMutableStateCounter)
				e.logger.Error("Encounter stale mutable state in RecordActivityTaskHeartbeat",
					tag.WorkflowDomainName(domainEntry.GetInfo().Name),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(workflowExecution.GetRunID()),
					tag.WorkflowScheduleID(scheduleID),
					tag.WorkflowNextEventID(mutableState.GetNextEventID()),
				)
				return workflow.ErrStaleState
			}

			// The activity must be pending and started; when the token carries a
			// schedule ID its attempt must also match the recorded attempt.
			if !isRunning || ai.StartedID == common.EmptyEventID ||
				(token.ScheduleID != common.EmptyEventID && token.ScheduleAttempt != int64(ai.Attempt)) {
				e.logger.Warn(fmt.Sprintf(
					"Encounter non existing activity in RecordActivityTaskHeartbeat: isRunning: %t, ai: %#v, token: %#v.",
					isRunning, ai, token),
					tag.WorkflowDomainName(domainEntry.GetInfo().Name),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(workflowExecution.GetRunID()),
					tag.WorkflowScheduleID(scheduleID),
					tag.WorkflowNextEventID(mutableState.GetNextEventID()),
				)
				return workflow.ErrActivityTaskNotFound
			}

			cancelRequested = ai.CancelRequested

			// NOTE(review): the message is formatted with fmt.Sprintf on every
			// heartbeat, before the logger decides whether debug output is enabled.
			e.logger.Debug(fmt.Sprintf("Activity HeartBeat: scheduleEventID: %v, ActivityInfo: %+v, CancelRequested: %v", scheduleID, ai, cancelRequested))

			// Save progress and last HB reported time.
			mutableState.UpdateActivityProgress(ai, request)

			return nil
		})

	// On failure an empty (non-nil) response is returned alongside the error.
	if err != nil {
		return &types.RecordActivityTaskHeartbeatResponse{}, err
	}

	return &types.RecordActivityTaskHeartbeatResponse{CancelRequested: cancelRequested}, nil
}

// RequestCancelWorkflowExecution records a cancel-requested event on the target
// workflow execution.
//
// A repeated request (non-empty RequestID equal to the one already recorded) is
// accepted: as a no-op when the workflow has already closed as canceled, and
// without adding a second event when it is still running. Any other request
// against a closed workflow yields workflow.ErrAlreadyCompleted, and a second,
// different cancellation of a running workflow yields
// workflow.ErrCancellationAlreadyRequested.
func (e *historyEngineImpl) RequestCancelWorkflowExecution(
	ctx context.Context,
	req *types.HistoryRequestCancelWorkflowExecutionRequest,
) error {

	domainEntry, err := e.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return err
	}
	domainID := domainEntry.GetInfo().ID

	request := req.CancelRequest
	parentExecution := req.ExternalWorkflowExecution
	childWorkflowOnly := req.GetChildWorkflowOnly()
	workflowExecution := types.WorkflowExecution{
		WorkflowID: request.WorkflowExecution.WorkflowID,
	}
	// If firstExecutionRunID is set on the request always try to cancel currently running execution
	// (the RunID is left empty in that case; it is only copied from the request otherwise).
	if request.GetFirstExecutionRunID() == "" {
		workflowExecution.RunID = request.WorkflowExecution.RunID
	}

	return workflow.UpdateCurrentWithActionFunc(ctx, e.executionCache, e.executionManager, domainID, e.shard.GetDomainCache(), workflowExecution, e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
			isCancelRequested, cancelRequestID := mutableState.IsCancelRequested()
			if !mutableState.IsWorkflowExecutionRunning() {
				// Closed workflow: only a retry of the very request that canceled it
				// is treated as success; everything else is "already completed".
				_, closeStatus := mutableState.GetWorkflowStateCloseStatus()
				if isCancelRequested && closeStatus == persistence.WorkflowCloseStatusCanceled {
					cancelRequest := req.CancelRequest
					if cancelRequest.RequestID != "" && cancelRequest.RequestID == cancelRequestID {
						return &workflow.UpdateAction{Noop: true}, nil
					}
				}
				return nil, workflow.ErrAlreadyCompleted
			}

			executionInfo := mutableState.GetExecutionInfo()
			if request.GetFirstExecutionRunID() != "" {
				firstRunID := executionInfo.FirstExecutionRunID
				if firstRunID == "" {
					// This is needed for backwards compatibility. Workflow executions created with Cadence release v0.25.0 or earlier
					// do not have FirstExecutionRunID stored as part of mutable state. If this is not set then load it from
					// workflow execution started event.
					startEvent, err := mutableState.GetStartEvent(ctx)
					if err != nil {
						return nil, err
					}
					firstRunID = startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstExecutionRunID()
				}
				// The current run must belong to the chain the caller asked for.
				if request.GetFirstExecutionRunID() != firstRunID {
					return nil, &types.EntityNotExistsError{Message: "Workflow execution not found"}
				}
			}

			// When restricted to child workflows, the caller must be the recorded parent.
			if childWorkflowOnly {
				parentWorkflowID := executionInfo.ParentWorkflowID
				parentRunID := executionInfo.ParentRunID
				if parentExecution.GetWorkflowID() != parentWorkflowID ||
					parentExecution.GetRunID() != parentRunID {
					return nil, workflow.ErrParentMismatch
				}
			}

			if isCancelRequested {
				cancelRequest := req.CancelRequest
				if cancelRequest.RequestID != "" && cancelRequest.RequestID == cancelRequestID {
					return workflow.UpdateWithNewDecision, nil
				}
				// if we consider workflow cancellation idempotent, then this error is redundant
				// this error may be useful if this API is invoked by external, not decision from transfer queue
				return nil, workflow.ErrCancellationAlreadyRequested
			}

			// The underlying error is replaced by a generic internal service error.
			if _, err := mutableState.AddWorkflowExecutionCancelRequestedEvent(req.CancelRequest.Cause, req); err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to cancel workflow execution."}
			}

			return workflow.UpdateWithNewDecision, nil
		})
}

// SignalWorkflowExecution records a signal event on the target workflow execution.
//
// The domain must be active and in the registered status. Inside the update a
// signal whose request ID was already recorded is a no-op (checked before the
// running check); otherwise the signal is rejected when the workflow is closed,
// its history fails the corruption check, the per-domain signal limit has been
// reached, or (with childWorkflowOnly) the caller is not the recorded parent.
func (e *historyEngineImpl) SignalWorkflowExecution(
	ctx context.Context,
	signalRequest *types.HistorySignalWorkflowExecutionRequest,
) error {

	domainEntry, err := e.getActiveDomainByID(signalRequest.DomainUUID)
	if err != nil {
		return err
	}
	if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
		return errDomainDeprecated
	}
	domainID := domainEntry.GetInfo().ID
	request := signalRequest.SignalRequest
	parentExecution := signalRequest.ExternalWorkflowExecution
	childWorkflowOnly := signalRequest.GetChildWorkflowOnly()
	workflowExecution := types.WorkflowExecution{
		WorkflowID: request.WorkflowExecution.WorkflowID,
		RunID:      request.WorkflowExecution.RunID,
	}

	return workflow.UpdateCurrentWithActionFunc(
		ctx,
		e.executionCache,
		e.executionManager,
		domainID,
		e.shard.GetDomainCache(),
		workflowExecution,
		e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
			// first deduplicate by request id for signal decision
			// this is done before workflow running check so that already completed error
			// won't be returned for duplicated signals even if the workflow is closed.
			if requestID := request.GetRequestID(); requestID != "" {
				if mutableState.IsSignalRequested(requestID) {
					return &workflow.UpdateAction{
						Noop:           true,
						CreateDecision: false,
					}, nil
				}
			}

			if !mutableState.IsWorkflowExecutionRunning() {
				return nil, workflow.ErrAlreadyCompleted
			}

			// If history is corrupted, signal will be rejected
			if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
				return nil, err
			} else if corrupted {
				return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."}
			}

			executionInfo := mutableState.GetExecutionInfo()
			createDecisionTask := true
			// Do not create decision task when the workflow is cron and the cron has not been started yet
			if mutableState.GetExecutionInfo().CronSchedule != "" && !mutableState.HasProcessedOrPendingDecision() {
				createDecisionTask = false
			}

			// A limit of zero or less disables the per-execution signal cap.
			maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
			if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
				e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(workflowExecution.GetRunID()),
					tag.WorkflowDomainID(domainID))
				return nil, workflow.ErrSignalsLimitExceeded
			}

			// When restricted to child workflows, the caller must be the recorded parent.
			if childWorkflowOnly {
				parentWorkflowID := executionInfo.ParentWorkflowID
				parentRunID := executionInfo.ParentRunID
				if parentExecution.GetWorkflowID() != parentWorkflowID ||
					parentExecution.GetRunID() != parentRunID {
					return nil, workflow.ErrParentMismatch
				}
			}

			// Remember the request ID so a retry of this signal hits the dedup check above.
			if requestID := request.GetRequestID(); requestID != "" {
				mutableState.AddSignalRequested(requestID)
			}

			// The underlying error is replaced by a generic internal service error.
			if _, err := mutableState.AddWorkflowExecutionSignaled(
				request.GetSignalName(),
				request.GetInput(),
				request.GetIdentity(),
				request.GetRequestID(),
			); err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to signal workflow execution."}
			}

			return &workflow.UpdateAction{
				Noop:           false,
				CreateDecision: createDecisionTask,
			}, nil
		})
}

// SignalWithStartWorkflowExecution signals the workflow identified by the
// request's workflow ID when it can be loaded, is running and passes the
// corruption check, and otherwise falls through to starting a new workflow via
// startWorkflowHelper, handing it the signal-with-start request.
//
// The signal path is retried on conflict errors up to
// workflow.ConditionalRetryCount times and then fails with
// workflow.ErrMaxAttemptsExceeded. The named results exist so the deferred
// release call can observe the returned error.
func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
	ctx context.Context,
	signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest,
) (retResp *types.StartWorkflowExecutionResponse, retError error) {

	domainEntry, err := e.getActiveDomainByID(signalWithStartRequest.DomainUUID)
	if err != nil {
		return nil, err
	}
	if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
		return nil, errDomainDeprecated
	}
	domainID := domainEntry.GetInfo().ID

	// No RunID is set on the lookup key; only the workflow ID is taken from the request.
	sRequest := signalWithStartRequest.SignalWithStartRequest
	workflowExecution := types.WorkflowExecution{
		WorkflowID: sRequest.WorkflowID,
	}

	// Set when an existing closed or corrupted run is found; forwarded to the start path.
	var prevMutableState execution.MutableState
	attempt := 0

	wfContext, release, err0 := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution)

	if err0 == nil {
		defer func() { release(retError) }()
	Just_Signal_Loop:
		for ; attempt < workflow.ConditionalRetryCount; attempt++ {
			// if the workflow does not exist, leave the loop: it will be created and then signaled
			mutableState, err1 := wfContext.LoadWorkflowExecution(ctx)
			if err1 != nil {
				if _, ok := err1.(*types.EntityNotExistsError); ok {
					break
				}
				return nil, err1
			}

			// A signal with this request ID was already recorded: report the existing run.
			if mutableState.IsSignalRequested(sRequest.GetRequestID()) {
				return &types.StartWorkflowExecutionResponse{RunID: wfContext.GetExecution().RunID}, nil
			}

			// workflow exist but not running, will restart workflow then signal
			if !mutableState.IsWorkflowExecutionRunning() {
				prevMutableState = mutableState
				break
			}

			// workflow exists but history is corrupted, will restart workflow then signal
			if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
				return nil, err
			} else if corrupted {
				prevMutableState = mutableState
				break
			}

			// workflow is running, if policy is TerminateIfRunning, terminate current run then signalWithStart
			if sRequest.GetWorkflowIDReusePolicy() == types.WorkflowIDReusePolicyTerminateIfRunning {
				workflowExecution.RunID = uuid.New()
				runningWFCtx := workflow.NewContext(wfContext, release, mutableState)
				resp, errTerm := e.terminateAndStartWorkflow(
					ctx,
					runningWFCtx,
					workflowExecution,
					domainEntry,
					domainID,
					nil,
					signalWithStartRequest,
				)
				// By the time we try to terminate the workflow, it was already terminated
				// So continue as if we didn't need to terminate it in the first place
				if _, ok := errTerm.(*types.WorkflowExecutionAlreadyCompletedError); !ok {
					return resp, errTerm
				}
			}

			// A limit of zero or less disables the per-execution signal cap.
			executionInfo := mutableState.GetExecutionInfo()
			maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
			if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
				e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(workflowExecution.GetRunID()),
					tag.WorkflowDomainID(domainID))
				return nil, workflow.ErrSignalsLimitExceeded
			}

			// Remember the request ID so a retry of this signal hits the dedup check above.
			requestID := sRequest.GetRequestID()
			if requestID != "" {
				mutableState.AddSignalRequested(requestID)
			}

			// The underlying error is replaced by a generic internal service error.
			if _, err := mutableState.AddWorkflowExecutionSignaled(
				sRequest.GetSignalName(),
				sRequest.GetSignalInput(),
				sRequest.GetIdentity(),
				sRequest.GetRequestID(),
			); err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to signal workflow execution."}
			}

			// Create a transfer task to schedule a decision task
			if !mutableState.HasPendingDecision() {
				_, err := mutableState.AddDecisionTaskScheduledEvent(false)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}
				}
			}

			// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
			// the history and try the operation again.
			if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
				if execution.IsConflictError(err) {
					continue Just_Signal_Loop
				}
				return nil, err
			}
			return &types.StartWorkflowExecutionResponse{RunID: wfContext.GetExecution().RunID}, nil
		} // end for Just_Signal_Loop
		// Reaching the retry bound means every attempt ended in a conflict; a
		// break out of the loop leaves attempt below the bound and falls through
		// to the start path.
		if attempt == workflow.ConditionalRetryCount {
			return nil, workflow.ErrMaxAttemptsExceeded
		}
	} else {
		if _, ok := err0.(*types.EntityNotExistsError); !ok {
			return nil, err0
		}
		// workflow not exist, will create workflow then signal
	}

	// Start workflow and signal
	startRequest, err := getStartRequest(domainID, sRequest, signalWithStartRequest.PartitionConfig)
	if err != nil {
		return nil, err
	}

	sigWithStartArg := &signalWithStartArg{
		signalWithStartRequest: signalWithStartRequest,
		prevMutableState:       prevMutableState,
	}
	return e.startWorkflowHelper(
		ctx,
		startRequest,
		domainEntry,
		metrics.HistorySignalWithStartWorkflowExecutionScope,
		sigWithStartArg,
	)
}

// checkForHistoryCorruptions reports whether the workflow's history is corrupted,
// judged by whether its start event can be loaded.
//
// It returns (false, nil) when the check is disabled for the domain or the start
// event loads, (true, nil) when loading fails with
// execution.ErrMissingWorkflowStartEvent, and (false, err) for any other error.
func (e *historyEngineImpl) checkForHistoryCorruptions(ctx context.Context, mutableState execution.MutableState) (bool, error) {
	domainName := mutableState.GetDomainEntry().GetInfo().Name
	if !e.config.EnableHistoryCorruptionCheck(domainName) {
		return false, nil
	}

	// Ensure that we can obtain start event. Failing to do so means corrupted history or resurrected mutable state record.
	_, err := mutableState.GetStartEvent(ctx)
	if err != nil {
		info := mutableState.GetExecutionInfo()
		// Mark workflow as corrupted. So that new one can be restarted.
		// NOTE(review): the state is set for every error, including errors other
		// than ErrMissingWorkflowStartEvent that are returned to the caller below —
		// confirm this is intended.
		info.State = persistence.WorkflowStateCorrupted

		e.logger.Error("history corruption check failed",
			tag.WorkflowDomainName(domainName),
			tag.WorkflowID(info.WorkflowID),
			tag.WorkflowRunID(info.RunID),
			tag.WorkflowType(info.WorkflowTypeName),
			tag.Error(err))

		if errors.Is(err, execution.ErrMissingWorkflowStartEvent) {
			return true, nil
		}
		return false, err
	}

	return false, nil
}

// RemoveSignalMutableState removes the signal request ID from the workflow's
// recorded signal requests (used for deduplication). It returns
// workflow.ErrNotExists when the workflow is not running.
func (e *historyEngineImpl) RemoveSignalMutableState(
	ctx context.Context,
	request *types.RemoveSignalMutableStateRequest,
) error {

	domainEntry, err := e.getActiveDomainByID(request.DomainUUID)
	if err != nil {
		return err
	}
	domainID := domainEntry.GetInfo().ID

	workflowExecution := types.WorkflowExecution{
		WorkflowID: request.WorkflowExecution.WorkflowID,
		RunID:      request.WorkflowExecution.RunID,
	}

	return workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, false, e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) error {
			if !mutableState.IsWorkflowExecutionRunning() {
				return workflow.ErrNotExists
			}

			mutableState.DeleteSignalRequested(request.GetRequestID())
			return nil
		})
}

// TerminateWorkflowExecution terminates the target workflow execution.
//
// It returns workflow.ErrAlreadyCompleted when the workflow is not running, an
// EntityNotExistsError when a FirstExecutionRunID is given and does not match
// the run chain, and workflow.ErrParentMismatch when childWorkflowOnly is set
// and the caller is not the recorded parent.
func (e *historyEngineImpl) TerminateWorkflowExecution(
	ctx context.Context,
	terminateRequest *types.HistoryTerminateWorkflowExecutionRequest,
) error {

	domainEntry, err := e.getActiveDomainByID(terminateRequest.DomainUUID)
	if err != nil {
		return err
	}
	domainID := domainEntry.GetInfo().ID

	request := terminateRequest.TerminateRequest
	parentExecution := terminateRequest.ExternalWorkflowExecution
	childWorkflowOnly := terminateRequest.GetChildWorkflowOnly()
	workflowExecution := types.WorkflowExecution{
		WorkflowID: request.WorkflowExecution.WorkflowID,
	}
	// If firstExecutionRunID is set on the request always try to terminate the currently running execution
	// (the RunID is left empty in that case; it is only copied from the request otherwise).
	if request.GetFirstExecutionRunID() == "" {
		workflowExecution.RunID = request.WorkflowExecution.RunID
	}

	return workflow.UpdateCurrentWithActionFunc(
		ctx,
		e.executionCache,
		e.executionManager,
		domainID,
		e.shard.GetDomainCache(),
		workflowExecution,
		e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
			if !mutableState.IsWorkflowExecutionRunning() {
				return nil, workflow.ErrAlreadyCompleted
			}

			executionInfo := mutableState.GetExecutionInfo()
			if request.GetFirstExecutionRunID() != "" {
				firstRunID := executionInfo.FirstExecutionRunID
				if firstRunID == "" {
					// This is needed for backwards compatibility. Workflow executions created with Cadence release v0.25.0 or earlier
					// do not have FirstExecutionRunID stored as part of mutable state. If this is not set then load it from
					// workflow execution started event.
					startEvent, err := mutableState.GetStartEvent(ctx)
					if err != nil {
						return nil, err
					}
					firstRunID = startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstExecutionRunID()
				}
				// The current run must belong to the chain the caller asked for.
				if request.GetFirstExecutionRunID() != firstRunID {
					return nil, &types.EntityNotExistsError{Message: "Workflow execution not found"}
				}
			}
			// When restricted to child workflows, the caller must be the recorded parent.
			if childWorkflowOnly {
				parentWorkflowID := executionInfo.ParentWorkflowID
				parentRunID := executionInfo.ParentRunID
				if parentExecution.GetWorkflowID() != parentWorkflowID ||
					parentExecution.GetRunID() != parentRunID {
					return nil, workflow.ErrParentMismatch
				}
			}

			// The termination is written starting at the current next event ID.
			eventBatchFirstEventID := mutableState.GetNextEventID()
			return workflow.UpdateWithoutDecision, execution.TerminateWorkflow(
				mutableState,
				eventBatchFirstEventID,
				request.GetReason(),
				request.GetDetails(),
				request.GetIdentity(),
			)
		})
}

// RecordChildExecutionCompleted records the completion of child execution into parent execution history.
//
// The child must be a pending, started child of the parent and its workflow ID
// must match the completed execution; otherwise an EntityNotExistsError is
// returned. The event added to the parent depends on the type of the child's
// closing event (completed, failed, canceled, terminated or timed out).
func (e *historyEngineImpl) RecordChildExecutionCompleted(
	ctx context.Context,
	completionRequest *types.RecordChildExecutionCompletedRequest,
) error {

	domainEntry, err := e.getActiveDomainByID(completionRequest.DomainUUID)
	if err != nil {
		return err
	}
	domainID := domainEntry.GetInfo().ID

	workflowExecution := types.WorkflowExecution{
		WorkflowID: completionRequest.WorkflowExecution.WorkflowID,
		RunID:      completionRequest.WorkflowExecution.RunID,
	}

	return workflow.UpdateWithAction(ctx, e.executionCache, domainID, workflowExecution, true, e.timeSource.Now(),
		func(wfContext execution.Context, mutableState execution.MutableState) error {
			if !mutableState.IsWorkflowExecutionRunning() {
				return workflow.ErrNotExists
			}

			initiatedID := completionRequest.InitiatedID
			completedExecution := completionRequest.CompletedExecution
			completionEvent := completionRequest.CompletionEvent

			// Check mutable state to make sure child execution is in pending child executions
			ci, isRunning := mutableState.GetChildExecutionInfo(initiatedID)
			if !isRunning || ci.StartedID == common.EmptyEventID {
				return &types.EntityNotExistsError{Message: "Pending child execution not found."}
			}
			if ci.StartedWorkflowID != completedExecution.GetWorkflowID() {
				return &types.EntityNotExistsError{Message: "Pending child execution not found."}
			}

			// NOTE(review): EventType is dereferenced without a nil check, and the
			// assignments below write to the err variable of the enclosing function
			// rather than a closure-local one. An event type not listed here adds no
			// event and returns that captured err unchanged.
			switch *completionEvent.EventType {
			case types.EventTypeWorkflowExecutionCompleted:
				attributes := completionEvent.WorkflowExecutionCompletedEventAttributes
				_, err = mutableState.AddChildWorkflowExecutionCompletedEvent(initiatedID, completedExecution, attributes)
			case types.EventTypeWorkflowExecutionFailed:
				attributes := completionEvent.WorkflowExecutionFailedEventAttributes
				_, err = mutableState.AddChildWorkflowExecutionFailedEvent(initiatedID, completedExecution, attributes)
			case types.EventTypeWorkflowExecutionCanceled:
				attributes := completionEvent.WorkflowExecutionCanceledEventAttributes
				_, err = mutableState.AddChildWorkflowExecutionCanceledEvent(initiatedID, completedExecution, attributes)
			case types.EventTypeWorkflowExecutionTerminated:
				attributes := completionEvent.WorkflowExecutionTerminatedEventAttributes
				_, err = mutableState.AddChildWorkflowExecutionTerminatedEvent(initiatedID, completedExecution, attributes)
			case types.EventTypeWorkflowExecutionTimedOut:
				attributes := completionEvent.WorkflowExecutionTimedOutEventAttributes
				_, err = mutableState.AddChildWorkflowExecutionTimedOutEvent(initiatedID, completedExecution, attributes)
			}

			return err
		})
}

// ReplicateEventsV2 hands the replicated events to the NDC replicator.
func (e *historyEngineImpl) ReplicateEventsV2(
	ctx context.Context,
	replicateRequest *types.ReplicateEventsV2Request,
) error {

	return e.nDCReplicator.ApplyEvents(ctx, replicateRequest)
}

// SyncShardStatus records the source cluster's reported time on the shard and
// pokes the transfer, timer and cross-cluster processors with an empty task
// notification for that cluster. It always returns nil.
func (e *historyEngineImpl) SyncShardStatus(
	ctx context.Context,
	request *types.SyncShardStatusRequest,
) error {

	clusterName := request.GetSourceCluster()
	// The request timestamp is interpreted as nanoseconds since the Unix epoch.
	now := time.Unix(0, request.GetTimestamp())

	// here there are 4 main things
	// 1. update the view of remote cluster's shard time
	// 2. notify the timer gate in the timer queue standby processor
	// 3. notify the transfer (essentially a no op, just put it here so it looks symmetric)
	// 4. notify the cross cluster (essentially a no op, just put it here so it looks symmetric)
	e.shard.SetCurrentTime(clusterName, now)
	e.txProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
	e.timerProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
	e.crossClusterProcessor.NotifyNewTask(clusterName, &hcommon.NotifyTaskInfo{Tasks: []persistence.Task{}})
	return nil
}

// SyncActivity hands the activity sync request to the NDC activity replicator.
func (e *historyEngineImpl) SyncActivity(
	ctx context.Context,
	request *types.SyncActivityRequest,
) (retError error) {

	return e.nDCActivityReplicator.SyncActivity(ctx, request)
}

func (e *historyEngineImpl) ResetWorkflowExecution(
	ctx context.Context,
	resetRequest *types.HistoryResetWorkflowExecutionRequest,
) (response *types.ResetWorkflowExecutionResponse, retError error) {

	request := resetRequest.ResetRequest
	domainID := resetRequest.GetDomainUUID()
	workflowID := request.WorkflowExecution.GetWorkflowID()
	baseRunID := request.WorkflowExecution.GetRunID()

	baseContext, baseReleaseFn, err := e.executionCache.GetOrCreateWorkflowExecution(
		ctx,
		domainID,
		types.WorkflowExecution{
			WorkflowID: workflowID,
			RunID:      baseRunID,
		},
	)
	if err != nil
{ return nil, err } defer func() { baseReleaseFn(retError) }() baseMutableState, err := baseContext.LoadWorkflowExecution(ctx) if err != nil { return nil, err } if ok := baseMutableState.HasProcessedOrPendingDecision(); !ok { return nil, &types.BadRequestError{ Message: "Cannot reset workflow without a decision task schedule.", } } if request.GetDecisionFinishEventID() <= common.FirstEventID || request.GetDecisionFinishEventID() > baseMutableState.GetNextEventID() { return nil, &types.BadRequestError{ Message: "Decision finish ID must be > 1 && <= workflow next event ID.", } } domainName, err := e.shard.GetDomainCache().GetDomainName(domainID) if err != nil { return nil, err } // also load the current run of the workflow, it can be different from the base runID resp, err := e.executionManager.GetCurrentExecution(ctx, &persistence.GetCurrentExecutionRequest{ DomainID: domainID, WorkflowID: request.WorkflowExecution.GetWorkflowID(), DomainName: domainName, }) if err != nil { return nil, err } currentRunID := resp.RunID var currentContext execution.Context var currentMutableState execution.MutableState var currentReleaseFn execution.ReleaseFunc if currentRunID == baseRunID { currentContext = baseContext currentMutableState = baseMutableState } else { currentContext, currentReleaseFn, err = e.executionCache.GetOrCreateWorkflowExecution( ctx, domainID, types.WorkflowExecution{ WorkflowID: workflowID, RunID: currentRunID, }, ) if err != nil { return nil, err } defer func() { currentReleaseFn(retError) }() currentMutableState, err = currentContext.LoadWorkflowExecution(ctx) if err != nil { return nil, err } } // dedup by requestID if currentMutableState.GetExecutionInfo().CreateRequestID == request.GetRequestID() { e.logger.Info("Duplicated reset request", tag.WorkflowID(workflowID), tag.WorkflowRunID(currentRunID), tag.WorkflowDomainID(domainID)) return &types.ResetWorkflowExecutionResponse{ RunID: currentRunID, }, nil } resetRunID := uuid.New() baseRebuildLastEventID := 
request.GetDecisionFinishEventID() - 1 baseVersionHistories := baseMutableState.GetVersionHistories() baseCurrentBranchToken, err := baseMutableState.GetCurrentBranchToken() if err != nil { return nil, err } baseRebuildLastEventVersion := baseMutableState.GetCurrentVersion() baseNextEventID := baseMutableState.GetNextEventID() if baseVersionHistories != nil { baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory() if err != nil { return nil, err } baseRebuildLastEventVersion, err = baseCurrentVersionHistory.GetEventVersion(baseRebuildLastEventID) if err != nil { return nil, err } baseCurrentBranchToken = baseCurrentVersionHistory.GetBranchToken() } if err := e.workflowResetter.ResetWorkflow( ctx, domainID, workflowID, baseRunID, baseCurrentBranchToken, baseRebuildLastEventID, baseRebuildLastEventVersion, baseNextEventID, resetRunID, request.GetRequestID(), execution.NewWorkflow( ctx, e.shard.GetClusterMetadata(), currentContext, currentMutableState, currentReleaseFn, ), request.GetReason(), nil, request.GetSkipSignalReapply(), ); err != nil { return nil, err } return &types.ResetWorkflowExecutionResponse{ RunID: resetRunID, }, nil } func (e *historyEngineImpl) NotifyNewHistoryEvent(event *events.Notification) { e.historyEventNotifier.NotifyNewHistoryEvent(event) } func (e *historyEngineImpl) NotifyNewTransferTasks(info *hcommon.NotifyTaskInfo) { if len(info.Tasks) == 0 { return } task := info.Tasks[0] clusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion()) if err == nil { e.txProcessor.NotifyNewTask(clusterName, info) } } func (e *historyEngineImpl) NotifyNewTimerTasks(info *hcommon.NotifyTaskInfo) { if len(info.Tasks) == 0 { return } task := info.Tasks[0] clusterName, err := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion()) if err == nil { e.timerProcessor.NotifyNewTask(clusterName, info) } } func (e *historyEngineImpl) NotifyNewCrossClusterTasks(info *hcommon.NotifyTaskInfo) { 
taskByTargetCluster := make(map[string][]persistence.Task) for _, task := range info.Tasks { // TODO: consider defining a new interface in persistence package // for cross cluster tasks and add a method for returning the target cluster var targetCluster string switch crossClusterTask := task.(type) { case *persistence.CrossClusterStartChildExecutionTask: targetCluster = crossClusterTask.TargetCluster case *persistence.CrossClusterCancelExecutionTask: targetCluster = crossClusterTask.TargetCluster case *persistence.CrossClusterSignalExecutionTask: targetCluster = crossClusterTask.TargetCluster case *persistence.CrossClusterRecordChildExecutionCompletedTask: targetCluster = crossClusterTask.TargetCluster case *persistence.CrossClusterApplyParentClosePolicyTask: targetCluster = crossClusterTask.TargetCluster default: panic("encountered unknown cross cluster task type") } taskByTargetCluster[targetCluster] = append(taskByTargetCluster[targetCluster], task) } for targetCluster, tasks := range taskByTargetCluster { e.crossClusterProcessor.NotifyNewTask(targetCluster, &hcommon.NotifyTaskInfo{ExecutionInfo: info.ExecutionInfo, Tasks: tasks, PersistenceError: info.PersistenceError}) } } func (e *historyEngineImpl) NotifyNewReplicationTasks(info *hcommon.NotifyTaskInfo) { for _, task := range info.Tasks { hTask, err := hydrateReplicationTask(task, info.ExecutionInfo, info.VersionHistories, info.Activities, info.History) if err != nil { e.logger.Error("failed to preemptively hydrate replication task", tag.Error(err)) continue } e.replicationTaskStore.Put(hTask) } } func hydrateReplicationTask( task persistence.Task, exec *persistence.WorkflowExecutionInfo, versionHistories *persistence.VersionHistories, activities map[int64]*persistence.ActivityInfo, history events.PersistedBlobs, ) (*types.ReplicationTask, error) { info := persistence.ReplicationTaskInfo{ DomainID: exec.DomainID, WorkflowID: exec.WorkflowID, RunID: exec.RunID, TaskType: task.GetType(), CreationTime: 
task.GetVisibilityTimestamp().UnixNano(), TaskID: task.GetTaskID(), Version: task.GetVersion(), } switch t := task.(type) { case *persistence.HistoryReplicationTask: info.BranchToken = t.BranchToken info.NewRunBranchToken = t.NewRunBranchToken info.FirstEventID = t.FirstEventID info.NextEventID = t.NextEventID case *persistence.SyncActivityTask: info.ScheduledID = t.ScheduledID case *persistence.FailoverMarkerTask: // No specific fields, but supported default: return nil, errors.New("unknown replication task") } hydrator := replication.NewImmediateTaskHydrator( exec.IsRunning(), versionHistories, activities, history.Find(info.BranchToken, info.FirstEventID), history.Find(info.NewRunBranchToken, common.FirstEventID), ) return hydrator.Hydrate(context.Background(), info) } func (e *historyEngineImpl) ResetTransferQueue( ctx context.Context, clusterName string, ) error { _, err := e.txProcessor.HandleAction(ctx, clusterName, queue.NewResetAction()) return err } func (e *historyEngineImpl) ResetTimerQueue( ctx context.Context, clusterName string, ) error { _, err := e.timerProcessor.HandleAction(ctx, clusterName, queue.NewResetAction()) return err } func (e *historyEngineImpl) ResetCrossClusterQueue( ctx context.Context, clusterName string, ) error { _, err := e.crossClusterProcessor.HandleAction(ctx, clusterName, queue.NewResetAction()) return err } func (e *historyEngineImpl) DescribeTransferQueue( ctx context.Context, clusterName string, ) (*types.DescribeQueueResponse, error) { return e.describeQueue(ctx, e.txProcessor, clusterName) } func (e *historyEngineImpl) DescribeTimerQueue( ctx context.Context, clusterName string, ) (*types.DescribeQueueResponse, error) { return e.describeQueue(ctx, e.timerProcessor, clusterName) } func (e *historyEngineImpl) DescribeCrossClusterQueue( ctx context.Context, clusterName string, ) (*types.DescribeQueueResponse, error) { return e.describeQueue(ctx, e.crossClusterProcessor, clusterName) } func (e *historyEngineImpl) 
describeQueue( ctx context.Context, queueProcessor queue.Processor, clusterName string, ) (*types.DescribeQueueResponse, error) { resp, err := queueProcessor.HandleAction(ctx, clusterName, queue.NewGetStateAction()) if err != nil { return nil, err } serializedStates := make([]string, 0, len(resp.GetStateActionResult.States)) for _, state := range resp.GetStateActionResult.States { serializedStates = append(serializedStates, e.serializeQueueState(state)) } return &types.DescribeQueueResponse{ ProcessingQueueStates: serializedStates, }, nil } func (e *historyEngineImpl) serializeQueueState( state queue.ProcessingQueueState, ) string { return fmt.Sprintf("%v", state) } func (e *historyEngineImpl) validateStartWorkflowExecutionRequest( request *types.StartWorkflowExecutionRequest, metricsScope int, ) error { if len(request.GetRequestID()) == 0 { return &types.BadRequestError{Message: "Missing request ID."} } if request.ExecutionStartToCloseTimeoutSeconds == nil || request.GetExecutionStartToCloseTimeoutSeconds() <= 0 { return &types.BadRequestError{Message: "Missing or invalid ExecutionStartToCloseTimeoutSeconds."} } if request.TaskStartToCloseTimeoutSeconds == nil || request.GetTaskStartToCloseTimeoutSeconds() <= 0 { return &types.BadRequestError{Message: "Missing or invalid TaskStartToCloseTimeoutSeconds."} } if request.TaskList == nil || request.TaskList.GetName() == "" { return &types.BadRequestError{Message: "Missing Tasklist."} } if request.WorkflowType == nil || request.WorkflowType.GetName() == "" { return &types.BadRequestError{Message: "Missing WorkflowType."} } if !common.IsValidIDLength( request.GetDomain(), e.metricsClient.Scope(metricsScope), e.config.MaxIDLengthWarnLimit(), e.config.DomainNameMaxLength(request.GetDomain()), metrics.CadenceErrDomainNameExceededWarnLimit, request.GetDomain(), e.logger, tag.IDTypeDomainName) { return &types.BadRequestError{Message: "Domain exceeds length limit."} } if !common.IsValidIDLength( request.GetWorkflowID(), 
e.metricsClient.Scope(metricsScope), e.config.MaxIDLengthWarnLimit(), e.config.WorkflowIDMaxLength(request.GetDomain()), metrics.CadenceErrWorkflowIDExceededWarnLimit, request.GetDomain(), e.logger, tag.IDTypeWorkflowID) { return &types.BadRequestError{Message: "WorkflowId exceeds length limit."} } if !common.IsValidIDLength( request.TaskList.GetName(), e.metricsClient.Scope(metricsScope), e.config.MaxIDLengthWarnLimit(), e.config.TaskListNameMaxLength(request.GetDomain()), metrics.CadenceErrTaskListNameExceededWarnLimit, request.GetDomain(), e.logger, tag.IDTypeTaskListName) { return &types.BadRequestError{Message: "TaskList exceeds length limit."} } if !common.IsValidIDLength( request.WorkflowType.GetName(), e.metricsClient.Scope(metricsScope), e.config.MaxIDLengthWarnLimit(), e.config.WorkflowTypeMaxLength(request.GetDomain()), metrics.CadenceErrWorkflowTypeExceededWarnLimit, request.GetDomain(), e.logger, tag.IDTypeWorkflowType) { return &types.BadRequestError{Message: "WorkflowType exceeds length limit."} } return common.ValidateRetryPolicy(request.RetryPolicy) } func (e *historyEngineImpl) overrideStartWorkflowExecutionRequest( domainEntry *cache.DomainCacheEntry, request *types.StartWorkflowExecutionRequest, metricsScope int, ) { domainName := domainEntry.GetInfo().Name maxDecisionStartToCloseTimeoutSeconds := int32(e.config.MaxDecisionStartToCloseSeconds(domainName)) taskStartToCloseTimeoutSecs := request.GetTaskStartToCloseTimeoutSeconds() taskStartToCloseTimeoutSecs = common.MinInt32(taskStartToCloseTimeoutSecs, maxDecisionStartToCloseTimeoutSeconds) taskStartToCloseTimeoutSecs = common.MinInt32(taskStartToCloseTimeoutSecs, request.GetExecutionStartToCloseTimeoutSeconds()) if taskStartToCloseTimeoutSecs != request.GetTaskStartToCloseTimeoutSeconds() { request.TaskStartToCloseTimeoutSeconds = &taskStartToCloseTimeoutSecs e.metricsClient.Scope( metricsScope, metrics.DomainTag(domainName), ).IncCounter(metrics.DecisionStartToCloseTimeoutOverrideCount) } } 
func getScheduleID( activityID string, mutableState execution.MutableState, ) (int64, error) { if activityID == "" { return 0, &types.BadRequestError{Message: "Neither ActivityID nor ScheduleID is provided"} } activityInfo, ok := mutableState.GetActivityByActivityID(activityID) if !ok { return 0, &types.BadRequestError{Message: "Cannot locate Activity ScheduleID"} } return activityInfo.ScheduleID, nil } func getStartRequest( domainID string, request *types.SignalWithStartWorkflowExecutionRequest, partitionConfig map[string]string, ) (*types.HistoryStartWorkflowExecutionRequest, error) { req := &types.StartWorkflowExecutionRequest{ Domain: request.Domain, WorkflowID: request.WorkflowID, WorkflowType: request.WorkflowType, TaskList: request.TaskList, Input: request.Input, ExecutionStartToCloseTimeoutSeconds: request.ExecutionStartToCloseTimeoutSeconds, TaskStartToCloseTimeoutSeconds: request.TaskStartToCloseTimeoutSeconds, Identity: request.Identity, RequestID: request.RequestID, WorkflowIDReusePolicy: request.WorkflowIDReusePolicy, RetryPolicy: request.RetryPolicy, CronSchedule: request.CronSchedule, Memo: request.Memo, SearchAttributes: request.SearchAttributes, Header: request.Header, DelayStartSeconds: request.DelayStartSeconds, JitterStartSeconds: request.JitterStartSeconds, } return common.CreateHistoryStartWorkflowRequest(domainID, req, time.Now(), partitionConfig) } func (e *historyEngineImpl) applyWorkflowIDReusePolicyForSigWithStart( prevExecutionInfo *persistence.WorkflowExecutionInfo, execution types.WorkflowExecution, wfIDReusePolicy types.WorkflowIDReusePolicy, ) error { prevStartRequestID := prevExecutionInfo.CreateRequestID prevRunID := prevExecutionInfo.RunID prevState := prevExecutionInfo.State prevCloseState := prevExecutionInfo.CloseStatus return e.applyWorkflowIDReusePolicyHelper( prevStartRequestID, prevRunID, prevState, prevCloseState, execution, wfIDReusePolicy, ) } func (e *historyEngineImpl) applyWorkflowIDReusePolicyHelper( 
prevStartRequestID, prevRunID string, prevState int, prevCloseState int, execution types.WorkflowExecution, wfIDReusePolicy types.WorkflowIDReusePolicy, ) error { // here we know some information about the prev workflow, i.e. either running right now // or has history check if the workflow is finished switch prevState { case persistence.WorkflowStateCreated, persistence.WorkflowStateRunning: msg := "Workflow execution is already running. WorkflowId: %v, RunId: %v." return getWorkflowAlreadyStartedError(msg, prevStartRequestID, execution.GetWorkflowID(), prevRunID) case persistence.WorkflowStateCompleted: // previous workflow completed, proceed case persistence.WorkflowStateCorrupted: // ignore workflow ID reuse policy for corrupted workflows, treat as they do not exist return nil default: // persistence.WorkflowStateZombie or unknown type return &types.InternalServiceError{Message: fmt.Sprintf("Failed to process workflow, workflow has invalid state: %v.", prevState)} } switch wfIDReusePolicy { case types.WorkflowIDReusePolicyAllowDuplicateFailedOnly: if _, ok := FailedWorkflowCloseState[prevCloseState]; !ok { msg := "Workflow execution already finished successfully. WorkflowId: %v, RunId: %v. Workflow ID reuse policy: allow duplicate workflow ID if last run failed." return getWorkflowAlreadyStartedError(msg, prevStartRequestID, execution.GetWorkflowID(), prevRunID) } case types.WorkflowIDReusePolicyAllowDuplicate, types.WorkflowIDReusePolicyTerminateIfRunning: // no check need here case types.WorkflowIDReusePolicyRejectDuplicate: msg := "Workflow execution already finished. WorkflowId: %v, RunId: %v. Workflow ID reuse policy: reject duplicate workflow ID." 
return getWorkflowAlreadyStartedError(msg, prevStartRequestID, execution.GetWorkflowID(), prevRunID) default: return &types.InternalServiceError{Message: "Failed to process start workflow reuse policy."} } return nil } func getWorkflowAlreadyStartedError(errMsg string, createRequestID string, workflowID string, runID string) error { return &types.WorkflowExecutionAlreadyStartedError{ Message: fmt.Sprintf(errMsg, workflowID, runID), StartRequestID: createRequestID, RunID: runID, } } func (e *historyEngineImpl) GetReplicationMessages( ctx context.Context, pollingCluster string, lastReadMessageID int64, ) (*types.ReplicationMessages, error) { scope := metrics.HistoryGetReplicationMessagesScope sw := e.metricsClient.StartTimer(scope, metrics.GetReplicationMessagesForShardLatency) defer sw.Stop() replicationMessages, err := e.replicationAckManager.GetTasks( ctx, pollingCluster, lastReadMessageID, ) if err != nil { e.logger.Error("Failed to retrieve replication messages.", tag.Error(err)) return nil, err } // Set cluster status for sync shard info replicationMessages.SyncShardStatus = &types.SyncShardStatus{ Timestamp: common.Int64Ptr(e.timeSource.Now().UnixNano()), } e.logger.Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks))) return replicationMessages, nil } func (e *historyEngineImpl) GetDLQReplicationMessages( ctx context.Context, taskInfos []*types.ReplicationTaskInfo, ) ([]*types.ReplicationTask, error) { scope := metrics.HistoryGetDLQReplicationMessagesScope sw := e.metricsClient.StartTimer(scope, metrics.GetDLQReplicationMessagesLatency) defer sw.Stop() tasks := make([]*types.ReplicationTask, 0, len(taskInfos)) for _, taskInfo := range taskInfos { task, err := e.replicationHydrator.Hydrate(ctx, persistence.ReplicationTaskInfo{ DomainID: taskInfo.DomainID, WorkflowID: taskInfo.WorkflowID, RunID: taskInfo.RunID, TaskID: taskInfo.TaskID, TaskType: int(taskInfo.TaskType), FirstEventID: taskInfo.FirstEventID, 
NextEventID: taskInfo.NextEventID, Version: taskInfo.Version, ScheduledID: taskInfo.ScheduledID, }) if err != nil { e.logger.Error("Failed to fetch DLQ replication messages.", tag.Error(err)) return nil, err } if task != nil { tasks = append(tasks, task) } } return tasks, nil } func (e *historyEngineImpl) ReapplyEvents( ctx context.Context, domainUUID string, workflowID string, runID string, reapplyEvents []*types.HistoryEvent, ) error { domainEntry, err := e.getActiveDomainByID(domainUUID) if err != nil { switch { case domainEntry != nil && domainEntry.IsDomainPendingActive(): return nil default: return err } } domainID := domainEntry.GetInfo().ID // remove run id from the execution so that reapply events to the current run currentExecution := types.WorkflowExecution{ WorkflowID: workflowID, } return workflow.UpdateWithActionFunc( ctx, e.executionCache, domainID, currentExecution, e.timeSource.Now(), func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) { // Filter out reapply event from the same cluster toReapplyEvents := make([]*types.HistoryEvent, 0, len(reapplyEvents)) lastWriteVersion, err := mutableState.GetLastWriteVersion() if err != nil { return nil, err } for _, event := range reapplyEvents { if event.Version == lastWriteVersion { // The reapply is from the same cluster. Ignoring. 
continue } dedupResource := definition.NewEventReappliedID(runID, event.ID, event.Version) if mutableState.IsResourceDuplicated(dedupResource) { // already apply the signal continue } toReapplyEvents = append(toReapplyEvents, event) } if len(toReapplyEvents) == 0 { return &workflow.UpdateAction{ Noop: true, }, nil } if !mutableState.IsWorkflowExecutionRunning() { // need to reset target workflow (which is also the current workflow) // to accept events to be reapplied baseRunID := mutableState.GetExecutionInfo().RunID resetRunID := uuid.New() baseRebuildLastEventID := mutableState.GetPreviousStartedEventID() // TODO when https://github.com/uber/cadence/issues/2420 is finished, remove this block, // since cannot reapply event to a finished workflow which had no decisions started if baseRebuildLastEventID == common.EmptyEventID { e.logger.Warn("cannot reapply event to a finished workflow", tag.WorkflowDomainID(domainID), tag.WorkflowID(currentExecution.GetWorkflowID()), ) e.metricsClient.IncCounter(metrics.HistoryReapplyEventsScope, metrics.EventReapplySkippedCount) return &workflow.UpdateAction{Noop: true}, nil } baseVersionHistories := mutableState.GetVersionHistories() if baseVersionHistories == nil { return nil, execution.ErrMissingVersionHistories } baseCurrentVersionHistory, err := baseVersionHistories.GetCurrentVersionHistory() if err != nil { return nil, err } baseRebuildLastEventVersion, err := baseCurrentVersionHistory.GetEventVersion(baseRebuildLastEventID) if err != nil { return nil, err } baseCurrentBranchToken := baseCurrentVersionHistory.GetBranchToken() baseNextEventID := mutableState.GetNextEventID() if err = e.workflowResetter.ResetWorkflow( ctx, domainID, workflowID, baseRunID, baseCurrentBranchToken, baseRebuildLastEventID, baseRebuildLastEventVersion, baseNextEventID, resetRunID, uuid.New(), execution.NewWorkflow( ctx, e.shard.GetClusterMetadata(), wfContext, mutableState, execution.NoopReleaseFn, ), ndc.EventsReapplicationResetWorkflowReason, 
toReapplyEvents, false, ); err != nil { return nil, err } return &workflow.UpdateAction{ Noop: true, }, nil } postActions := &workflow.UpdateAction{ CreateDecision: true, } // Do not create decision task when the workflow is cron and the cron has not been started yet if mutableState.GetExecutionInfo().CronSchedule != "" && !mutableState.HasProcessedOrPendingDecision() { postActions.CreateDecision = false } reappliedEvents, err := e.eventsReapplier.ReapplyEvents( ctx, mutableState, toReapplyEvents, runID, ) if err != nil { e.logger.Error("failed to re-apply stale events", tag.Error(err)) return nil, &types.InternalServiceError{Message: "unable to re-apply stale events"} } if len(reappliedEvents) == 0 { return &workflow.UpdateAction{ Noop: true, }, nil } return postActions, nil }, ) } func (e *historyEngineImpl) CountDLQMessages(ctx context.Context, forceFetch bool) (map[string]int64, error) { return e.replicationDLQHandler.GetMessageCount(ctx, forceFetch) } func (e *historyEngineImpl) ReadDLQMessages( ctx context.Context, request *types.ReadDLQMessagesRequest, ) (*types.ReadDLQMessagesResponse, error) { tasks, taskInfo, token, err := e.replicationDLQHandler.ReadMessages( ctx, request.GetSourceCluster(), request.GetInclusiveEndMessageID(), int(request.GetMaximumPageSize()), request.GetNextPageToken(), ) if err != nil { return nil, err } return &types.ReadDLQMessagesResponse{ Type: request.GetType().Ptr(), ReplicationTasks: tasks, ReplicationTasksInfo: taskInfo, NextPageToken: token, }, nil } func (e *historyEngineImpl) PurgeDLQMessages( ctx context.Context, request *types.PurgeDLQMessagesRequest, ) error { return e.replicationDLQHandler.PurgeMessages( ctx, request.GetSourceCluster(), request.GetInclusiveEndMessageID(), ) } func (e *historyEngineImpl) MergeDLQMessages( ctx context.Context, request *types.MergeDLQMessagesRequest, ) (*types.MergeDLQMessagesResponse, error) { token, err := e.replicationDLQHandler.MergeMessages( ctx, request.GetSourceCluster(), 
request.GetInclusiveEndMessageID(), int(request.GetMaximumPageSize()), request.GetNextPageToken(), ) if err != nil { return nil, err } return &types.MergeDLQMessagesResponse{ NextPageToken: token, }, nil } func (e *historyEngineImpl) RefreshWorkflowTasks( ctx context.Context, domainUUID string, workflowExecution types.WorkflowExecution, ) (retError error) { domainEntry, err := e.shard.GetDomainCache().GetDomainByID(domainUUID) if err != nil { return err } domainID := domainEntry.GetInfo().ID wfContext, release, err := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution) if err != nil { return err } defer func() { release(retError) }() mutableState, err := wfContext.LoadWorkflowExecution(ctx) if err != nil { return err } mutableStateTaskRefresher := execution.NewMutableStateTaskRefresher( e.shard.GetConfig(), e.shard.GetClusterMetadata(), e.shard.GetDomainCache(), e.shard.GetEventsCache(), e.shard.GetShardID(), ) err = mutableStateTaskRefresher.RefreshTasks(ctx, mutableState.GetExecutionInfo().StartTimestamp, mutableState) if err != nil { return err } err = wfContext.UpdateWorkflowExecutionTasks(ctx, e.shard.GetTimeSource().Now()) if err != nil { return err } return nil } func (e *historyEngineImpl) GetCrossClusterTasks( ctx context.Context, targetCluster string, ) ([]*types.CrossClusterTaskRequest, error) { actionResult, err := e.crossClusterProcessor.HandleAction(ctx, targetCluster, queue.NewGetTasksAction()) if err != nil { return nil, err } return actionResult.GetTasksResult.TaskRequests, nil } func (e *historyEngineImpl) RespondCrossClusterTasksCompleted( ctx context.Context, targetCluster string, responses []*types.CrossClusterTaskResponse, ) error { _, err := e.crossClusterProcessor.HandleAction(ctx, targetCluster, queue.NewUpdateTasksAction(responses)) return err } func (e *historyEngineImpl) newChildContext( parentCtx context.Context, ) (context.Context, context.CancelFunc) { ctxTimeout := contextLockTimeout if deadline, ok := 
parentCtx.Deadline(); ok { now := e.shard.GetTimeSource().Now() parentTimeout := deadline.Sub(now) if parentTimeout > 0 && parentTimeout < contextLockTimeout { ctxTimeout = parentTimeout } } return context.WithTimeout(context.Background(), ctxTimeout) } func (e *historyEngineImpl) getActiveDomainByID(id string) (*cache.DomainCacheEntry, error) { return cache.GetActiveDomainByID(e.shard.GetDomainCache(), e.clusterMetadata.GetCurrentClusterName(), id) }