service/history/handler/handler.go (1,704 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package handler import ( "context" "errors" "fmt" "strconv" "sync" "sync/atomic" "time" "github.com/pborman/uuid" "go.uber.org/yarpc/yarpcerrors" "golang.org/x/sync/errgroup" "github.com/uber/cadence/common" "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/future" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" "github.com/uber/cadence/common/types/mapper/proto" "github.com/uber/cadence/service/history/config" "github.com/uber/cadence/service/history/constants" "github.com/uber/cadence/service/history/engine" "github.com/uber/cadence/service/history/engine/engineimpl" "github.com/uber/cadence/service/history/events" "github.com/uber/cadence/service/history/failover" "github.com/uber/cadence/service/history/replication" "github.com/uber/cadence/service/history/resource" "github.com/uber/cadence/service/history/shard" "github.com/uber/cadence/service/history/task" "github.com/uber/cadence/service/history/workflowcache" ) const ( shardOwnershipTransferDelay = 5 * time.Second ) type ( // handlerImpl is an implementation for history service independent of wire protocol handlerImpl struct { resource.Resource shuttingDown int32 controller shard.Controller tokenSerializer common.TaskTokenSerializer startWG sync.WaitGroup config *config.Config historyEventNotifier events.Notifier rateLimiter quotas.Limiter crossClusterTaskFetchers task.Fetchers replicationTaskFetchers replication.TaskFetchers queueTaskProcessor task.Processor failoverCoordinator failover.Coordinator workflowIDCache workflowcache.WFCache ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter } ) var _ Handler = (*handlerImpl)(nil) var _ shard.EngineFactory = (*handlerImpl)(nil) // NewHandler creates a thrift handler for the history service func NewHandler( resource resource.Resource, config *config.Config, wfCache workflowcache.WFCache, ratelimitInternalPerWorkflowID dynamicconfig.BoolPropertyFnWithDomainFilter, ) Handler { handler := &handlerImpl{ Resource: resource, config: config, tokenSerializer: common.NewJSONTaskTokenSerializer(), rateLimiter: quotas.NewDynamicRateLimiter(config.RPS.AsFloat64()), workflowIDCache: wfCache, ratelimitInternalPerWorkflowID: ratelimitInternalPerWorkflowID, } // prevent us from trying to serve requests before shard controller is started and ready handler.startWG.Add(1) return handler } // Start starts the handler func (h *handlerImpl) Start() { h.crossClusterTaskFetchers = task.NewCrossClusterTaskFetchers( h.GetClusterMetadata(), h.GetClientBean(), &task.FetcherOptions{ Parallelism: h.config.CrossClusterFetcherParallelism, AggregationInterval: h.config.CrossClusterFetcherAggregationInterval, ServiceBusyBackoffInterval: h.config.CrossClusterFetcherServiceBusyBackoffInterval, ErrorRetryInterval: h.config.CrossClusterFetcherErrorBackoffInterval, TimerJitterCoefficient: h.config.CrossClusterFetcherJitterCoefficient, }, h.GetMetricsClient(), h.GetLogger(), ) h.crossClusterTaskFetchers.Start() h.replicationTaskFetchers = replication.NewTaskFetchers( h.GetLogger(), h.config, h.GetClusterMetadata(), h.GetClientBean(), ) h.replicationTaskFetchers.Start() var err error taskPriorityAssigner := task.NewPriorityAssigner( h.GetClusterMetadata().GetCurrentClusterName(), h.GetDomainCache(), h.GetLogger(), h.GetMetricsClient(), h.config, ) h.queueTaskProcessor, err = task.NewProcessor( taskPriorityAssigner, h.config, h.GetLogger(), h.GetMetricsClient(), ) if err != nil { h.GetLogger().Fatal("Creating priority task processor failed", tag.Error(err)) } h.queueTaskProcessor.Start() h.controller = shard.NewShardController( h.Resource, h, h.config, ) h.historyEventNotifier = events.NewNotifier(h.GetTimeSource(), h.GetMetricsClient(), h.config.GetShardID) // events notifier must starts before controller h.historyEventNotifier.Start() h.failoverCoordinator = failover.NewCoordinator( h.GetDomainManager(), h.GetHistoryClient(), h.GetTimeSource(), h.GetDomainCache(), h.config, h.GetMetricsClient(), h.GetLogger(), ) if h.config.EnableGracefulFailover() { h.failoverCoordinator.Start() } h.controller.Start() h.startWG.Done() } // Stop stops the handler func (h *handlerImpl) Stop() { h.prepareToShutDown() h.crossClusterTaskFetchers.Stop() h.replicationTaskFetchers.Stop() h.queueTaskProcessor.Stop() h.controller.Stop() h.historyEventNotifier.Stop() h.failoverCoordinator.Stop() } // PrepareToStop starts graceful traffic drain in preparation for shutdown func (h *handlerImpl) PrepareToStop(remainingTime time.Duration) time.Duration { h.GetLogger().Info("ShutdownHandler: Initiating shardController shutdown") h.controller.PrepareToStop() h.GetLogger().Info("ShutdownHandler: Waiting for traffic to drain") remainingTime = common.SleepWithMinDuration(shardOwnershipTransferDelay, remainingTime) h.GetLogger().Info("ShutdownHandler: No longer taking rpc requests") h.prepareToShutDown() return remainingTime } func (h *handlerImpl) prepareToShutDown() { atomic.StoreInt32(&h.shuttingDown, 1) } func (h *handlerImpl) isShuttingDown() bool { return atomic.LoadInt32(&h.shuttingDown) != 0 } // CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard func (h *handlerImpl) CreateEngine( shardContext shard.Context, ) engine.Engine { return engineimpl.NewEngineWithShardContext( shardContext, h.GetVisibilityManager(), h.GetMatchingClient(), h.GetSDKClient(), h.historyEventNotifier, h.config, h.crossClusterTaskFetchers, h.replicationTaskFetchers, h.GetMatchingRawClient(), h.queueTaskProcessor, h.failoverCoordinator, h.workflowIDCache, h.ratelimitInternalPerWorkflowID, ) } // Health is for health check func (h *handlerImpl) Health(ctx context.Context) (*types.HealthStatus, error) { h.startWG.Wait() h.GetLogger().Debug("History health check endpoint reached.") hs := &types.HealthStatus{Ok: true, Msg: "OK"} return hs, nil } // RecordActivityTaskHeartbeat - Record Activity Task Heart beat. func (h *handlerImpl) RecordActivityTaskHeartbeat( ctx context.Context, wrappedRequest *types.HistoryRecordActivityTaskHeartbeatRequest, ) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordActivityTaskHeartbeatScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } heartbeatRequest := wrappedRequest.HeartbeatRequest token, err0 := h.tokenSerializer.Deserialize(heartbeatRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} return nil, h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { return nil, h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, "") } response, err2 := engine.RecordActivityTaskHeartbeat(ctx, wrappedRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, "") } return response, nil } // RecordActivityTaskStarted - Record Activity Task started. func (h *handlerImpl) RecordActivityTaskStarted( ctx context.Context, recordRequest *types.RecordActivityTaskStartedRequest, ) (resp *types.RecordActivityTaskStartedResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordActivityTaskStartedScope) defer sw.Stop() domainID := recordRequest.GetDomainUUID() workflowExecution := recordRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() h.emitInfoOrDebugLog( domainID, "RecordActivityTaskStarted", tag.WorkflowDomainID(domainID), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(recordRequest.WorkflowExecution.RunID), tag.WorkflowScheduleID(recordRequest.GetScheduleID()), ) if recordRequest.GetDomainUUID() == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, workflowID, "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, workflowID, "") } engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, "") } response, err2 := engine.RecordActivityTaskStarted(ctx, recordRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, "") } return response, nil } // RecordDecisionTaskStarted - Record Decision Task started. func (h *handlerImpl) RecordDecisionTaskStarted( ctx context.Context, recordRequest *types.RecordDecisionTaskStartedRequest, ) (resp *types.RecordDecisionTaskStartedResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordDecisionTaskStartedScope) defer sw.Stop() domainID := recordRequest.GetDomainUUID() workflowExecution := recordRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() h.emitInfoOrDebugLog( domainID, "RecordDecisionTaskStarted", tag.WorkflowDomainID(domainID), tag.WorkflowID(workflowExecution.GetWorkflowID()), tag.WorkflowRunID(recordRequest.WorkflowExecution.RunID), tag.WorkflowScheduleID(recordRequest.GetScheduleID()), ) if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, workflowID, runID) } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, workflowID, runID) } if recordRequest.PollRequest == nil || recordRequest.PollRequest.TaskList.GetName() == "" { return nil, h.error(constants.ErrTaskListNotSet, scope, domainID, workflowID, runID) } engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { h.GetLogger().Error("RecordDecisionTaskStarted failed.", tag.Error(err1), tag.WorkflowID(recordRequest.WorkflowExecution.GetWorkflowID()), tag.WorkflowRunID(runID), tag.WorkflowRunID(recordRequest.WorkflowExecution.GetRunID()), tag.WorkflowScheduleID(recordRequest.GetScheduleID()), ) return nil, h.error(err1, scope, domainID, workflowID, runID) } response, err2 := engine.RecordDecisionTaskStarted(ctx, recordRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return response, nil } // RespondActivityTaskCompleted - records completion of an activity task func (h *handlerImpl) RespondActivityTaskCompleted( ctx context.Context, wrappedRequest *types.HistoryRespondActivityTaskCompletedRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskCompletedScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } completeRequest := wrappedRequest.CompleteRequest token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} return h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondActivityTaskCompleted(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // RespondActivityTaskFailed - records failure of an activity task func (h *handlerImpl) RespondActivityTaskFailed( ctx context.Context, wrappedRequest *types.HistoryRespondActivityTaskFailedRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskFailedScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } failRequest := wrappedRequest.FailedRequest token, err0 := h.tokenSerializer.Deserialize(failRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} return h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondActivityTaskFailed(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // RespondActivityTaskCanceled - records failure of an activity task func (h *handlerImpl) RespondActivityTaskCanceled( ctx context.Context, wrappedRequest *types.HistoryRespondActivityTaskCanceledRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondActivityTaskCanceledScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } cancelRequest := wrappedRequest.CancelRequest token, err0 := h.tokenSerializer.Deserialize(cancelRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} return h.error(err0, scope, domainID, "", "") } err0 = validateTaskToken(token) if err0 != nil { return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondActivityTaskCanceled(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // RespondDecisionTaskCompleted - records completion of a decision task func (h *handlerImpl) RespondDecisionTaskCompleted( ctx context.Context, wrappedRequest *types.HistoryRespondDecisionTaskCompletedRequest, ) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondDecisionTaskCompletedScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } completeRequest := wrappedRequest.CompleteRequest if len(completeRequest.Decisions) == 0 { scope.IncCounter(metrics.EmptyCompletionDecisionsCounter) } token, err0 := h.tokenSerializer.Deserialize(completeRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} return nil, h.error(err0, scope, domainID, "", "") } h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskCompleted. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v", token.DomainID, token.WorkflowID, token.RunID, token.ScheduleID)) err0 = validateTaskToken(token) if err0 != nil { return nil, h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } response, err2 := engine.RespondDecisionTaskCompleted(ctx, wrappedRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return response, nil } // RespondDecisionTaskFailed - failed response to decision task func (h *handlerImpl) RespondDecisionTaskFailed( ctx context.Context, wrappedRequest *types.HistoryRespondDecisionTaskFailedRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondDecisionTaskFailedScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } failedRequest := wrappedRequest.FailedRequest token, err0 := h.tokenSerializer.Deserialize(failedRequest.TaskToken) if err0 != nil { err0 = &types.BadRequestError{Message: fmt.Sprintf("Error deserializing task token. Error: %v", err0)} return h.error(err0, scope, domainID, "", "") } h.GetLogger().Debug(fmt.Sprintf("RespondDecisionTaskFailed. DomainID: %v, WorkflowID: %v, RunID: %v, ScheduleID: %v", token.DomainID, token.WorkflowID, token.RunID, token.ScheduleID)) if failedRequest != nil && failedRequest.GetCause() == types.DecisionTaskFailedCauseUnhandledDecision { h.GetLogger().Info("Non-Deterministic Error", tag.WorkflowDomainID(token.DomainID), tag.WorkflowID(token.WorkflowID), tag.WorkflowRunID(token.RunID)) domainName, err := h.GetDomainCache().GetDomainName(token.DomainID) var domainTag metrics.Tag if err == nil { domainTag = metrics.DomainTag(domainName) } else { domainTag = metrics.DomainUnknownTag() } scope.Tagged(domainTag).IncCounter(metrics.CadenceErrNonDeterministicCounter) } err0 = validateTaskToken(token) if err0 != nil { return h.error(err0, scope, domainID, "", "") } workflowID := token.WorkflowID runID := token.RunID engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RespondDecisionTaskFailed(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // StartWorkflowExecution - creates a new workflow execution func (h *handlerImpl) StartWorkflowExecution( ctx context.Context, wrappedRequest *types.HistoryStartWorkflowExecutionRequest, ) (resp *types.StartWorkflowExecutionResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryStartWorkflowExecutionScope) defer sw.Stop() domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } startRequest := wrappedRequest.StartRequest workflowID := startRequest.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, "") } response, err2 := engine.StartWorkflowExecution(ctx, wrappedRequest) runID := response.GetRunID() if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return response, nil } // DescribeHistoryHost returns information about the internal states of a history host func (h *handlerImpl) DescribeHistoryHost( ctx context.Context, request *types.DescribeHistoryHostRequest, ) (resp *types.DescribeHistoryHostResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() numOfItemsInCacheByID, numOfItemsInCacheByName := h.GetDomainCache().GetCacheSize() status := "" switch h.controller.Status() { case common.DaemonStatusInitialized: status = "initialized" case common.DaemonStatusStarted: status = "started" case common.DaemonStatusStopped: status = "stopped" } resp = &types.DescribeHistoryHostResponse{ NumberOfShards: int32(h.controller.NumShards()), ShardIDs: h.controller.ShardIDs(), DomainCache: &types.DomainCacheInfo{ NumOfItemsInCacheByID: numOfItemsInCacheByID, NumOfItemsInCacheByName: numOfItemsInCacheByName, }, ShardControllerStatus: status, Address: h.GetHostInfo().GetAddress(), } return resp, nil } // RemoveTask returns information about the internal states of a history host func (h *handlerImpl) RemoveTask( ctx context.Context, request *types.RemoveTaskRequest, ) (retError error) { executionMgr, err := h.GetExecutionManager(int(request.GetShardID())) if err != nil { return err } switch taskType := common.TaskType(request.GetType()); taskType { case common.TaskTypeTransfer: return executionMgr.CompleteTransferTask(ctx, &persistence.CompleteTransferTaskRequest{ TaskID: request.GetTaskID(), }) case common.TaskTypeTimer: return executionMgr.CompleteTimerTask(ctx, &persistence.CompleteTimerTaskRequest{ VisibilityTimestamp: time.Unix(0, request.GetVisibilityTimestamp()), TaskID: request.GetTaskID(), }) case common.TaskTypeReplication: return executionMgr.CompleteReplicationTask(ctx, &persistence.CompleteReplicationTaskRequest{ TaskID: request.GetTaskID(), }) case common.TaskTypeCrossCluster: return executionMgr.CompleteCrossClusterTask(ctx, &persistence.CompleteCrossClusterTaskRequest{ TargetCluster: request.GetClusterName(), TaskID: request.GetTaskID(), }) default: return constants.ErrInvalidTaskType } } // CloseShard closes a shard hosted by this instance func (h *handlerImpl) CloseShard( ctx context.Context, request *types.CloseShardRequest, ) (retError error) { h.controller.RemoveEngineForShard(int(request.GetShardID())) return nil } // ResetQueue resets processing queue states func (h *handlerImpl) ResetQueue( ctx context.Context, request *types.ResetQueueRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetQueueScope) defer sw.Stop() engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { return h.error(err, scope, "", "", "") } switch taskType := common.TaskType(request.GetType()); taskType { case common.TaskTypeTransfer: err = engine.ResetTransferQueue(ctx, request.GetClusterName()) case common.TaskTypeTimer: err = engine.ResetTimerQueue(ctx, request.GetClusterName()) case common.TaskTypeCrossCluster: err = engine.ResetCrossClusterQueue(ctx, request.GetClusterName()) default: err = constants.ErrInvalidTaskType } if err != nil { return h.error(err, scope, "", "", "") } return nil } // DescribeQueue describes processing queue states func (h *handlerImpl) DescribeQueue( ctx context.Context, request *types.DescribeQueueRequest, ) (resp *types.DescribeQueueResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeQueueScope) defer sw.Stop() engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { return nil, h.error(err, scope, "", "", "") } switch taskType := common.TaskType(request.GetType()); taskType { case common.TaskTypeTransfer: resp, err = engine.DescribeTransferQueue(ctx, request.GetClusterName()) case common.TaskTypeTimer: resp, err = engine.DescribeTimerQueue(ctx, request.GetClusterName()) case common.TaskTypeCrossCluster: resp, err = engine.DescribeCrossClusterQueue(ctx, request.GetClusterName()) default: err = constants.ErrInvalidTaskType } if err != nil { return nil, h.error(err, scope, "", "", "") } return resp, nil } // DescribeMutableState - returns the internal analysis of workflow execution state func (h *handlerImpl) DescribeMutableState( ctx context.Context, request *types.DescribeMutableStateRequest, ) (resp *types.DescribeMutableStateResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeMutabelStateScope) defer sw.Stop() domainID := request.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } workflowExecution := request.Execution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.DescribeMutableState(ctx, request) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } // GetMutableState - returns the id of the next event in the execution's history func (h *handlerImpl) GetMutableState( ctx context.Context, getRequest *types.GetMutableStateRequest, ) (resp *types.GetMutableStateResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryGetMutableStateScope) defer sw.Stop() domainID := getRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := getRequest.Execution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.GetMutableState(ctx, getRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } // PollMutableState - returns the id of the next event in the execution's history func (h *handlerImpl) PollMutableState( ctx context.Context, getRequest *types.PollMutableStateRequest, ) (resp *types.PollMutableStateResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryPollMutableStateScope) defer sw.Stop() domainID := getRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := getRequest.Execution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.PollMutableState(ctx, getRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } // DescribeWorkflowExecution returns information about the specified workflow execution. func (h *handlerImpl) DescribeWorkflowExecution( ctx context.Context, request *types.HistoryDescribeWorkflowExecutionRequest, ) (resp *types.DescribeWorkflowExecutionResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryDescribeWorkflowExecutionScope) defer sw.Stop() domainID := request.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := request.Request.Execution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.DescribeWorkflowExecution(ctx, request) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } // RequestCancelWorkflowExecution - requests cancellation of a workflow func (h *handlerImpl) RequestCancelWorkflowExecution( ctx context.Context, request *types.HistoryRequestCancelWorkflowExecutionRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRequestCancelWorkflowExecutionScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" || request.CancelRequest.GetDomain() == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } cancelRequest := request.CancelRequest h.GetLogger().Debug(fmt.Sprintf("RequestCancelWorkflowExecution. DomainID: %v/%v, WorkflowID: %v, RunID: %v.", cancelRequest.GetDomain(), request.GetDomainUUID(), cancelRequest.WorkflowExecution.GetWorkflowID(), cancelRequest.WorkflowExecution.GetRunID())) workflowID := cancelRequest.WorkflowExecution.GetWorkflowID() runID := cancelRequest.WorkflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RequestCancelWorkflowExecution(ctx, request) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in // WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution. func (h *handlerImpl) SignalWorkflowExecution( ctx context.Context, wrappedRequest *types.HistorySignalWorkflowExecutionRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistorySignalWorkflowExecutionScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.SignalRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.SignalWorkflowExecution(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution. // If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history // and a decision task being created for the execution. // If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled // event recorded in history, and a decision task being created for the execution func (h *handlerImpl) SignalWithStartWorkflowExecution( ctx context.Context, wrappedRequest *types.HistorySignalWithStartWorkflowExecutionRequest, ) (resp *types.StartWorkflowExecutionResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistorySignalWithStartWorkflowExecutionScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } signalWithStartRequest := wrappedRequest.SignalWithStartRequest workflowID := signalWithStartRequest.GetWorkflowID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, "") } resp, err2 := engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest) if err2 == nil { return resp, nil } // Two simultaneous SignalWithStart requests might try to start a workflow at the same time. // This can result in one of the requests failing with one of two possible errors: // 1) If it is a brand new WF ID, one of the requests can fail with WorkflowExecutionAlreadyStartedError // (createMode is persistence.CreateWorkflowModeBrandNew) // 2) If it an already existing WF ID, one of the requests can fail with a CurrentWorkflowConditionFailedError // (createMode is persisetence.CreateWorkflowModeWorkflowIDReuse) // If either error occurs, just go ahead and retry. It should succeed on the subsequent attempt. var e1 *persistence.WorkflowExecutionAlreadyStartedError var e2 *persistence.CurrentWorkflowConditionFailedError if !errors.As(err2, &e1) && !errors.As(err2, &e2) { return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID()) } resp, err2 = engine.SignalWithStartWorkflowExecution(ctx, wrappedRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, resp.GetRunID()) } return resp, nil } // RemoveSignalMutableState is used to remove a signal request ID that was previously recorded. This is currently // used to clean execution info when signal decision finished. func (h *handlerImpl) RemoveSignalMutableState( ctx context.Context, wrappedRequest *types.RemoveSignalMutableStateRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRemoveSignalMutableStateScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RemoveSignalMutableState(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event // in the history and immediately terminating the execution instance. func (h *handlerImpl) TerminateWorkflowExecution( ctx context.Context, wrappedRequest *types.HistoryTerminateWorkflowExecutionRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryTerminateWorkflowExecutionScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.TerminateRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.TerminateWorkflowExecution(ctx, wrappedRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // ResetWorkflowExecution reset an existing workflow execution // in the history and immediately terminating the execution instance. func (h *handlerImpl) ResetWorkflowExecution( ctx context.Context, wrappedRequest *types.HistoryResetWorkflowExecutionRequest, ) (resp *types.ResetWorkflowExecutionResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetWorkflowExecutionScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } domainID := wrappedRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := wrappedRequest.ResetRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.ResetWorkflowExecution(ctx, wrappedRequest) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } // QueryWorkflow queries a types. func (h *handlerImpl) QueryWorkflow( ctx context.Context, request *types.HistoryQueryWorkflowRequest, ) (resp *types.HistoryQueryWorkflowResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryQueryWorkflowScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowID := request.GetRequest().GetExecution().GetWorkflowID() runID := request.GetRequest().GetExecution().GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return nil, h.error(err1, scope, domainID, workflowID, runID) } resp, err2 := engine.QueryWorkflow(ctx, request) if err2 != nil { return nil, h.error(err2, scope, domainID, workflowID, runID) } return resp, nil } // ScheduleDecisionTask is used for creating a decision task for already started workflow execution. This is mainly // used by transfer queue processor during the processing of StartChildWorkflowExecution task, where it first starts // child execution without creating the decision task and then calls this API after updating the mutable state of // parent execution. func (h *handlerImpl) ScheduleDecisionTask( ctx context.Context, request *types.ScheduleDecisionTaskRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryScheduleDecisionTaskScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } if request.WorkflowExecution == nil { return h.error(constants.ErrWorkflowExecutionNotSet, scope, domainID, "", "") } workflowExecution := request.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.ScheduleDecisionTask(ctx, request) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // RecordChildExecutionCompleted is used for reporting the completion of child workflow execution to parent. // This is mainly called by transfer queue processor during the processing of DeleteExecution task. func (h *handlerImpl) RecordChildExecutionCompleted( ctx context.Context, request *types.RecordChildExecutionCompletedRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRecordChildExecutionCompletedScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := request.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } if request.WorkflowExecution == nil { return h.error(constants.ErrWorkflowExecutionNotSet, scope, domainID, "", "") } workflowExecution := request.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.RecordChildExecutionCompleted(ctx, request) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // ResetStickyTaskList reset the volatile information in mutable state of a given types. // Volatile information are the information related to client, such as: // 1. StickyTaskList // 2. StickyScheduleToStartTimeout // 3. ClientLibraryVersion // 4. ClientFeatureVersion // 5. ClientImpl func (h *handlerImpl) ResetStickyTaskList( ctx context.Context, resetRequest *types.HistoryResetStickyTaskListRequest, ) (resp *types.HistoryResetStickyTaskListResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryResetStickyTaskListScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } domainID := resetRequest.GetDomainUUID() if domainID == "" { return nil, h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return nil, h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowID := resetRequest.Execution.GetWorkflowID() runID := resetRequest.Execution.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { return nil, h.error(err, scope, domainID, workflowID, runID) } resp, err = engine.ResetStickyTaskList(ctx, resetRequest) if err != nil { return nil, h.error(err, scope, domainID, workflowID, runID) } return resp, nil } // ReplicateEventsV2 is called by processor to replicate history events for passive domains func (h *handlerImpl) ReplicateEventsV2( ctx context.Context, replicateRequest *types.ReplicateEventsV2Request, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() if h.isShuttingDown() { return constants.ErrShuttingDown } scope, sw := h.startRequestProfile(ctx, metrics.HistoryReplicateEventsV2Scope) defer sw.Stop() domainID := replicateRequest.GetDomainUUID() if domainID == "" { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } workflowExecution := replicateRequest.WorkflowExecution workflowID := workflowExecution.GetWorkflowID() runID := workflowExecution.GetRunID() engine, err1 := h.controller.GetEngine(workflowID) if err1 != nil { return h.error(err1, scope, domainID, workflowID, runID) } err2 := engine.ReplicateEventsV2(ctx, replicateRequest) if err2 != nil { return h.error(err2, scope, domainID, workflowID, runID) } return nil } // SyncShardStatus is called by processor to sync history shard information from another cluster func (h *handlerImpl) SyncShardStatus( ctx context.Context, syncShardStatusRequest *types.SyncShardStatusRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistorySyncShardStatusScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, "", "", "") } if syncShardStatusRequest.SourceCluster == "" { return h.error(constants.ErrSourceClusterNotSet, scope, "", "", "") } if syncShardStatusRequest.Timestamp == nil { return h.error(constants.ErrTimestampNotSet, scope, "", "", "") } // shard ID is already provided in the request engine, err := h.controller.GetEngineForShard(int(syncShardStatusRequest.GetShardID())) if err != nil { return h.error(err, scope, "", "", "") } err = engine.SyncShardStatus(ctx, syncShardStatusRequest) if err != nil { return h.error(err, scope, "", "", "") } return nil } // SyncActivity is called by processor to sync activity func (h *handlerImpl) SyncActivity( ctx context.Context, syncActivityRequest *types.SyncActivityRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistorySyncActivityScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := syncActivityRequest.GetDomainID() if syncActivityRequest.DomainID == "" || uuid.Parse(syncActivityRequest.GetDomainID()) == nil { return h.error(constants.ErrDomainNotSet, scope, domainID, "", "") } if ok := h.rateLimiter.Allow(); !ok { return h.error(constants.ErrHistoryHostThrottle, scope, domainID, "", "") } if syncActivityRequest.WorkflowID == "" { return h.error(constants.ErrWorkflowIDNotSet, scope, domainID, "", "") } if syncActivityRequest.RunID == "" || uuid.Parse(syncActivityRequest.GetRunID()) == nil { return h.error(constants.ErrRunIDNotValid, scope, domainID, "", "") } workflowID := syncActivityRequest.GetWorkflowID() runID := syncActivityRequest.GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { return h.error(err, scope, domainID, workflowID, runID) } err = engine.SyncActivity(ctx, syncActivityRequest) if err != nil { return h.error(err, scope, domainID, workflowID, runID) } return nil } // GetReplicationMessages is called by remote peers to get replicated messages for cross DC replication func (h *handlerImpl) GetReplicationMessages( ctx context.Context, request *types.GetReplicationMessagesRequest, ) (resp *types.GetReplicationMessagesResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() h.GetLogger().Debug("Received GetReplicationMessages call.") _, sw := h.startRequestProfile(ctx, metrics.HistoryGetReplicationMessagesScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } var wg sync.WaitGroup wg.Add(len(request.Tokens)) result := new(sync.Map) for _, token := range request.Tokens { go func(token *types.ReplicationToken) { defer wg.Done() engine, err := h.controller.GetEngineForShard(int(token.GetShardID())) if err != nil { h.GetLogger().Warn("History engine not found for shard", tag.Error(err)) return } tasks, err := engine.GetReplicationMessages( ctx, request.GetClusterName(), token.GetLastRetrievedMessageID(), ) if err != nil { h.GetLogger().Warn("Failed to get replication tasks for shard", tag.Error(err)) return } result.Store(token.GetShardID(), tasks) }(token) } wg.Wait() responseSize := 0 maxResponseSize := h.config.MaxResponseSize messagesByShard := make(map[int32]*types.ReplicationMessages) result.Range(func(key, value interface{}) bool { shardID := key.(int32) tasks := value.(*types.ReplicationMessages) size := proto.FromReplicationMessages(tasks).Size() if (responseSize + size) >= maxResponseSize { // Log shards that did not fit for debugging purposes h.GetLogger().Warn("Replication messages did not fit in the response (history host)", tag.ShardID(int(shardID)), tag.ResponseSize(size), tag.ResponseTotalSize(responseSize), tag.ResponseMaxSize(maxResponseSize), ) } else { responseSize += size messagesByShard[shardID] = tasks } return true }) h.GetLogger().Debug("GetReplicationMessages succeeded.") return &types.GetReplicationMessagesResponse{MessagesByShard: messagesByShard}, nil } // GetDLQReplicationMessages is called by remote peers to get replicated messages for DLQ merging func (h *handlerImpl) GetDLQReplicationMessages( ctx context.Context, request *types.GetDLQReplicationMessagesRequest, ) (resp *types.GetDLQReplicationMessagesResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() _, sw := h.startRequestProfile(ctx, metrics.HistoryGetDLQReplicationMessagesScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } taskInfoPerExecution := map[definition.WorkflowIdentifier][]*types.ReplicationTaskInfo{} // do batch based on workflow ID and run ID for _, taskInfo := range request.GetTaskInfos() { identity := definition.NewWorkflowIdentifier( taskInfo.GetDomainID(), taskInfo.GetWorkflowID(), taskInfo.GetRunID(), ) if _, ok := taskInfoPerExecution[identity]; !ok { taskInfoPerExecution[identity] = []*types.ReplicationTaskInfo{} } taskInfoPerExecution[identity] = append(taskInfoPerExecution[identity], taskInfo) } var wg sync.WaitGroup wg.Add(len(taskInfoPerExecution)) tasksChan := make(chan *types.ReplicationTask, len(request.GetTaskInfos())) handleTaskInfoPerExecution := func(taskInfos []*types.ReplicationTaskInfo) { defer wg.Done() if len(taskInfos) == 0 { return } engine, err := h.controller.GetEngine( taskInfos[0].GetWorkflowID(), ) if err != nil { h.GetLogger().Warn("History engine not found for workflow ID.", tag.Error(err)) return } tasks, err := engine.GetDLQReplicationMessages( ctx, taskInfos, ) if err != nil { h.GetLogger().Error("Failed to get dlq replication tasks.", tag.Error(err)) return } for _, task := range tasks { tasksChan <- task } } for _, replicationTaskInfos := range taskInfoPerExecution { go handleTaskInfoPerExecution(replicationTaskInfos) } wg.Wait() close(tasksChan) replicationTasks := make([]*types.ReplicationTask, 0, len(tasksChan)) for task := range tasksChan { replicationTasks = append(replicationTasks, task) } return &types.GetDLQReplicationMessagesResponse{ ReplicationTasks: replicationTasks, }, nil } // ReapplyEvents applies stale events to the current workflow and the current run func (h *handlerImpl) ReapplyEvents( ctx context.Context, request *types.HistoryReapplyEventsRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryReapplyEventsScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := request.GetDomainUUID() workflowID := request.GetRequest().GetWorkflowExecution().GetWorkflowID() runID := request.GetRequest().GetWorkflowExecution().GetRunID() engine, err := h.controller.GetEngine(workflowID) if err != nil { return h.error(err, scope, domainID, workflowID, runID) } // deserialize history event object historyEvents, err := h.GetPayloadSerializer().DeserializeBatchEvents(&persistence.DataBlob{ Encoding: common.EncodingTypeThriftRW, Data: request.GetRequest().GetEvents().GetData(), }) if err != nil { return h.error(err, scope, domainID, workflowID, runID) } execution := request.GetRequest().GetWorkflowExecution() if err := engine.ReapplyEvents( ctx, request.GetDomainUUID(), execution.GetWorkflowID(), execution.GetRunID(), historyEvents, ); err != nil { return h.error(err, scope, domainID, workflowID, runID) } return nil } func (h *handlerImpl) CountDLQMessages( ctx context.Context, request *types.CountDLQMessagesRequest, ) (resp *types.HistoryCountDLQMessagesResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryCountDLQMessagesScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } g := &errgroup.Group{} var mu sync.Mutex entries := map[types.HistoryDLQCountKey]int64{} for _, shardID := range h.controller.ShardIDs() { shardID := shardID g.Go(func() (e error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &e) }() engine, err := h.controller.GetEngineForShard(int(shardID)) if err != nil { return fmt.Errorf("dlq count for shard %d: %w", shardID, err) } counts, err := engine.CountDLQMessages(ctx, request.ForceFetch) if err != nil { return fmt.Errorf("dlq count for shard %d: %w", shardID, err) } mu.Lock() defer mu.Unlock() for sourceCluster, count := range counts { key := types.HistoryDLQCountKey{ShardID: shardID, SourceCluster: sourceCluster} entries[key] = count } return nil }) } err := g.Wait() return &types.HistoryCountDLQMessagesResponse{Entries: entries}, h.error(err, scope, "", "", "") } // ReadDLQMessages reads replication DLQ messages func (h *handlerImpl) ReadDLQMessages( ctx context.Context, request *types.ReadDLQMessagesRequest, ) (resp *types.ReadDLQMessagesResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryReadDLQMessagesScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { return nil, h.error(err, scope, "", "", "") } return engine.ReadDLQMessages(ctx, request) } // PurgeDLQMessages deletes replication DLQ messages func (h *handlerImpl) PurgeDLQMessages( ctx context.Context, request *types.PurgeDLQMessagesRequest, ) (retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryPurgeDLQMessagesScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { return h.error(err, scope, "", "", "") } return engine.PurgeDLQMessages(ctx, request) } // MergeDLQMessages reads and applies replication DLQ messages func (h *handlerImpl) MergeDLQMessages( ctx context.Context, request *types.MergeDLQMessagesRequest, ) (resp *types.MergeDLQMessagesResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } scope, sw := h.startRequestProfile(ctx, metrics.HistoryMergeDLQMessagesScope) defer sw.Stop() engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { return nil, h.error(err, scope, "", "", "") } return engine.MergeDLQMessages(ctx, request) } // RefreshWorkflowTasks refreshes all the tasks of a workflow func (h *handlerImpl) RefreshWorkflowTasks( ctx context.Context, request *types.HistoryRefreshWorkflowTasksRequest) (retError error) { scope, sw := h.startRequestProfile(ctx, metrics.HistoryRefreshWorkflowTasksScope) defer sw.Stop() if h.isShuttingDown() { return constants.ErrShuttingDown } domainID := request.DomainUIID execution := request.GetRequest().GetExecution() workflowID := execution.GetWorkflowID() runID := execution.GetWorkflowID() engine, err := h.controller.GetEngine(workflowID) if err != nil { return h.error(err, scope, domainID, workflowID, runID) } err = engine.RefreshWorkflowTasks( ctx, domainID, types.WorkflowExecution{ WorkflowID: execution.WorkflowID, RunID: execution.RunID, }, ) if err != nil { return h.error(err, scope, domainID, workflowID, runID) } return nil } // NotifyFailoverMarkers sends the failover markers to failover coordinator. // The coordinator decides when the failover finishes based on received failover marker. func (h *handlerImpl) NotifyFailoverMarkers( ctx context.Context, request *types.NotifyFailoverMarkersRequest, ) (retError error) { _, sw := h.startRequestProfile(ctx, metrics.HistoryNotifyFailoverMarkersScope) defer sw.Stop() for _, token := range request.GetFailoverMarkerTokens() { marker := token.GetFailoverMarker() h.GetLogger().Debug("Handling failover maker", tag.WorkflowDomainID(marker.GetDomainID())) h.failoverCoordinator.ReceiveFailoverMarkers(token.GetShardIDs(), token.GetFailoverMarker()) } return nil } func (h *handlerImpl) GetCrossClusterTasks( ctx context.Context, request *types.GetCrossClusterTasksRequest, ) (resp *types.GetCrossClusterTasksResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() _, sw := h.startRequestProfile(ctx, metrics.HistoryGetCrossClusterTasksScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } ctx, cancel := common.CreateChildContext(ctx, 0.05) defer cancel() futureByShardID := make(map[int32]future.Future, len(request.ShardIDs)) for _, shardID := range request.ShardIDs { future, settable := future.NewFuture() futureByShardID[shardID] = future go func(shardID int32) { logger := h.GetLogger().WithTags(tag.ShardID(int(shardID))) engine, err := h.controller.GetEngineForShard(int(shardID)) if err != nil { logger.Error("History engine not found for shard", tag.Error(err)) var owner membership.HostInfo if info, err := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(int(shardID))); err == nil { owner = info } settable.Set(nil, shard.CreateShardOwnershipLostError(h.GetHostInfo(), owner)) return } if tasks, err := engine.GetCrossClusterTasks(ctx, request.TargetCluster); err != nil { logger.Error("Failed to get cross cluster tasks", tag.Error(err)) settable.Set(nil, h.convertError(err)) } else { settable.Set(tasks, nil) } }(shardID) } response := &types.GetCrossClusterTasksResponse{ TasksByShard: make(map[int32][]*types.CrossClusterTaskRequest), FailedCauseByShard: make(map[int32]types.GetTaskFailedCause), } for shardID, future := range futureByShardID { var taskRequests []*types.CrossClusterTaskRequest if futureErr := future.Get(ctx, &taskRequests); futureErr != nil { response.FailedCauseByShard[shardID] = common.ConvertErrToGetTaskFailedCause(futureErr) } else { response.TasksByShard[shardID] = taskRequests } } // not using a waitGroup for created goroutines here // as once all futures are unblocked, // those goroutines will eventually be completed return response, nil } func (h *handlerImpl) RespondCrossClusterTasksCompleted( ctx context.Context, request *types.RespondCrossClusterTasksCompletedRequest, ) (resp *types.RespondCrossClusterTasksCompletedResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryRespondCrossClusterTasksCompletedScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } engine, err := h.controller.GetEngineForShard(int(request.GetShardID())) if err != nil { return nil, h.error(err, scope, "", "", "") } err = engine.RespondCrossClusterTasksCompleted(ctx, request.TargetCluster, request.TaskResponses) if err != nil { return nil, h.error(err, scope, "", "", "") } response := &types.RespondCrossClusterTasksCompletedResponse{} if request.FetchNewTasks { fetchTaskCtx, cancel := common.CreateChildContext(ctx, 0.05) defer cancel() response.Tasks, err = engine.GetCrossClusterTasks(fetchTaskCtx, request.TargetCluster) if err != nil { return nil, h.error(err, scope, "", "", "") } } return response, nil } func (h *handlerImpl) GetFailoverInfo( ctx context.Context, request *types.GetFailoverInfoRequest, ) (resp *types.GetFailoverInfoResponse, retError error) { defer func() { log.CapturePanic(recover(), h.GetLogger(), &retError) }() h.startWG.Wait() scope, sw := h.startRequestProfile(ctx, metrics.HistoryGetFailoverInfoScope) defer sw.Stop() if h.isShuttingDown() { return nil, constants.ErrShuttingDown } resp, err := h.failoverCoordinator.GetFailoverInfo(request.GetDomainID()) if err != nil { return nil, h.error(err, scope, request.GetDomainID(), "", "") } return resp, nil } // convertError is a helper method to convert ShardOwnershipLostError from persistence layer returned by various // HistoryEngine API calls to ShardOwnershipLost error return by HistoryService for client to be redirected to the // correct shard. func (h *handlerImpl) convertError(err error) error { switch err := err.(type) { case *persistence.ShardOwnershipLostError: info, err2 := h.GetMembershipResolver().Lookup(service.History, strconv.Itoa(err.ShardID)) if err2 != nil { return shard.CreateShardOwnershipLostError(h.GetHostInfo(), membership.HostInfo{}) } return shard.CreateShardOwnershipLostError(h.GetHostInfo(), info) case *persistence.WorkflowExecutionAlreadyStartedError: return &types.InternalServiceError{Message: err.Msg} case *persistence.CurrentWorkflowConditionFailedError: return &types.InternalServiceError{Message: err.Msg} case *persistence.TransactionSizeLimitError: return &types.BadRequestError{Message: err.Msg} } return err } func (h *handlerImpl) updateErrorMetric( scope metrics.Scope, domainID string, workflowID string, runID string, err error, ) { var yarpcE *yarpcerrors.Status var shardOwnershipLostError *types.ShardOwnershipLostError var eventAlreadyStartedError *types.EventAlreadyStartedError var badRequestError *types.BadRequestError var domainNotActiveError *types.DomainNotActiveError var workflowExecutionAlreadyStartedError *types.WorkflowExecutionAlreadyStartedError var entityNotExistsError *types.EntityNotExistsError var workflowExecutionAlreadyCompletedError *types.WorkflowExecutionAlreadyCompletedError var cancellationAlreadyRequestedError *types.CancellationAlreadyRequestedError var limitExceededError *types.LimitExceededError var retryTaskV2Error *types.RetryTaskV2Error var serviceBusyError *types.ServiceBusyError var internalServiceError *types.InternalServiceError if err == context.DeadlineExceeded || err == context.Canceled { scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) return } if errors.As(err, &shardOwnershipLostError) { scope.IncCounter(metrics.CadenceErrShardOwnershipLostCounter) } else if errors.As(err, &eventAlreadyStartedError) { scope.IncCounter(metrics.CadenceErrEventAlreadyStartedCounter) } else if errors.As(err, &badRequestError) { scope.IncCounter(metrics.CadenceErrBadRequestCounter) } else if errors.As(err, &domainNotActiveError) { scope.IncCounter(metrics.CadenceErrDomainNotActiveCounter) } else if errors.As(err, &workflowExecutionAlreadyStartedError) { scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter) } else if errors.As(err, &entityNotExistsError) { scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter) } else if errors.As(err, &workflowExecutionAlreadyCompletedError) { scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter) } else if errors.As(err, &cancellationAlreadyRequestedError) { scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter) } else if errors.As(err, &limitExceededError) { scope.IncCounter(metrics.CadenceErrLimitExceededCounter) } else if errors.As(err, &retryTaskV2Error) { scope.IncCounter(metrics.CadenceErrRetryTaskCounter) } else if errors.As(err, &serviceBusyError) { scope.IncCounter(metrics.CadenceErrServiceBusyCounter) } else if errors.As(err, &yarpcE) { if yarpcE.Code() == yarpcerrors.CodeDeadlineExceeded { scope.IncCounter(metrics.CadenceErrContextTimeoutCounter) } scope.IncCounter(metrics.CadenceFailures) } else if errors.As(err, &internalServiceError) { scope.IncCounter(metrics.CadenceFailures) h.GetLogger().Error("Internal service error", tag.Error(err), tag.WorkflowID(workflowID), tag.WorkflowRunID(runID), tag.WorkflowDomainID(domainID)) } else { // Default / unknown error fallback scope.IncCounter(metrics.CadenceFailures) h.GetLogger().Error("Uncategorized error", tag.Error(err), tag.WorkflowID(workflowID), tag.WorkflowRunID(runID), tag.WorkflowDomainID(domainID)) } } func (h *handlerImpl) error( err error, scope metrics.Scope, domainID string, workflowID string, runID string, ) error { err = h.convertError(err) h.updateErrorMetric(scope, domainID, workflowID, runID, err) return err } func (h *handlerImpl) emitInfoOrDebugLog( domainID string, msg string, tags ...tag.Tag, ) { if h.config.EnableDebugMode && h.config.EnableTaskInfoLogByDomainID(domainID) { h.GetLogger().Info(msg, tags...) } else { h.GetLogger().Debug(msg, tags...) } } func (h *handlerImpl) startRequestProfile(ctx context.Context, scope int) (metrics.Scope, metrics.Stopwatch) { metricsScope := h.GetMetricsClient().Scope(scope, metrics.GetContextTags(ctx)...) metricsScope.IncCounter(metrics.CadenceRequests) sw := metricsScope.StartTimer(metrics.CadenceLatency) return metricsScope, sw } func validateTaskToken(token *common.TaskToken) error { if token.WorkflowID == "" { return constants.ErrWorkflowIDNotSet } if token.RunID != "" && uuid.Parse(token.RunID) == nil { return constants.ErrRunIDNotValid } return nil }