service/matching/matchingEngine.go (928 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies Inc. // Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. package matching import ( "bytes" "context" "errors" "fmt" "math" "sync" "time" "github.com/pborman/uuid" "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/membership" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/partition" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service" "github.com/uber/cadence/common/types" ) // If sticky poller is not seem in last 10s, we treat it as sticky worker unavailable // This seems aggressive, but the default sticky schedule_to_start timeout is 5s, so 10s seems reasonable. const _stickyPollerUnavailableWindow = 10 * time.Second // Implements matching.Engine // TODO: Switch implementation from lock/channel based to a partitioned agent // to simplify code and reduce possibility of synchronization errors. type ( pollerIDCtxKey string identityCtxKey string isolationGroupCtxKey string queryResult struct { workerResponse *types.MatchingRespondQueryTaskCompletedRequest internalError error } // lockableQueryTaskMap maps query TaskID (which is a UUID generated in QueryWorkflow() call) to a channel // that QueryWorkflow() will block on. The channel is unblocked either by worker sending response through // RespondQueryTaskCompleted() or through an internal service error causing cadence to be unable to dispatch // query task to workflow worker. lockableQueryTaskMap struct { sync.RWMutex queryTaskMap map[string]chan *queryResult } matchingEngineImpl struct { taskManager persistence.TaskManager clusterMetadata cluster.Metadata historyService history.Client matchingClient matching.Client tokenSerializer common.TaskTokenSerializer logger log.Logger metricsClient metrics.Client taskListsLock sync.RWMutex // locks mutation of taskLists taskLists map[taskListID]taskListManager // Convert to LRU cache config *Config lockableQueryTaskMap lockableQueryTaskMap domainCache cache.DomainCache versionChecker client.VersionChecker membershipResolver membership.Resolver partitioner partition.Partitioner } // HistoryInfo consists of two integer regarding the history size and history count // HistoryInfo struct { // historySize int64 // historyCount int64 // } ) var ( // EmptyPollForDecisionTaskResponse is the response when there are no decision tasks to hand out emptyPollForDecisionTaskResponse = &types.MatchingPollForDecisionTaskResponse{} // EmptyPollForActivityTaskResponse is the response when there are no activity tasks to hand out emptyPollForActivityTaskResponse = &types.PollForActivityTaskResponse{} persistenceOperationRetryPolicy = common.CreatePersistenceRetryPolicy() historyServiceOperationRetryPolicy = common.CreateHistoryServiceRetryPolicy() // ErrNoTasks is exported temporarily for integration test ErrNoTasks = errors.New("no tasks") errPumpClosed = errors.New("task list pump closed its channel") pollerIDKey pollerIDCtxKey = "pollerID" identityKey identityCtxKey = "identity" _isolationGroupKey isolationGroupCtxKey = "isolationGroup" _stickyPollerUnavailableError = &types.StickyWorkerUnavailableError{Message: "sticky worker is unavailable, please use non-sticky task list."} ) var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented // NewEngine creates an instance of matching engine func NewEngine(taskManager persistence.TaskManager, clusterMetadata cluster.Metadata, historyService history.Client, matchingClient matching.Client, config *Config, logger log.Logger, metricsClient metrics.Client, domainCache cache.DomainCache, resolver membership.Resolver, partitioner partition.Partitioner, ) Engine { return &matchingEngineImpl{ taskManager: taskManager, clusterMetadata: clusterMetadata, historyService: historyService, tokenSerializer: common.NewJSONTaskTokenSerializer(), taskLists: make(map[taskListID]taskListManager), logger: logger.WithTags(tag.ComponentMatchingEngine), metricsClient: metricsClient, matchingClient: matchingClient, config: config, lockableQueryTaskMap: lockableQueryTaskMap{queryTaskMap: make(map[string]chan *queryResult)}, domainCache: domainCache, versionChecker: client.NewVersionChecker(), membershipResolver: resolver, partitioner: partitioner, } } func (e *matchingEngineImpl) Start() { // As task lists are initialized lazily nothing is done on startup at this point. } func (e *matchingEngineImpl) Stop() { // Executes Stop() on each task list outside of lock for _, l := range e.getTaskLists(math.MaxInt32) { l.Stop() } } func (e *matchingEngineImpl) getTaskLists(maxCount int) []taskListManager { e.taskListsLock.RLock() defer e.taskListsLock.RUnlock() lists := make([]taskListManager, 0, len(e.taskLists)) count := 0 for _, tlMgr := range e.taskLists { lists = append(lists, tlMgr) count++ if count >= maxCount { break } } return lists } func (e *matchingEngineImpl) String() string { // Executes taskList.String() on each task list outside of lock buf := new(bytes.Buffer) for _, l := range e.getTaskLists(1000) { fmt.Fprintf(buf, "\n%s", l.String()) } return buf.String() } // Returns taskListManager for a task list. If not already cached gets new range from DB and // if successful creates one. func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID, taskListKind *types.TaskListKind) (taskListManager, error) { // The first check is an optimization so almost all requests will have a task list manager // and return avoiding the write lock e.taskListsLock.RLock() if result, ok := e.taskLists[*taskList]; ok { e.taskListsLock.RUnlock() return result, nil } e.taskListsLock.RUnlock() // If it gets here, write lock and check again in case a task list is created between the two locks e.taskListsLock.Lock() if result, ok := e.taskLists[*taskList]; ok { e.taskListsLock.Unlock() return result, nil } // common tagged logger logger := e.logger.WithTags( tag.WorkflowTaskListName(taskList.name), tag.WorkflowTaskListType(taskList.taskType), tag.WorkflowDomainID(taskList.domainID), ) logger.Info("Task list manager state changed", tag.LifeCycleStarting) mgr, err := newTaskListManager(e, taskList, taskListKind, e.config, time.Now()) if err != nil { e.taskListsLock.Unlock() logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err)) return nil, err } e.taskLists[*taskList] = mgr e.metricsClient.Scope(metrics.MatchingTaskListMgrScope).UpdateGauge( metrics.TaskListManagersGauge, float64(len(e.taskLists)), ) e.taskListsLock.Unlock() err = mgr.Start() if err != nil { logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err)) return nil, err } logger.Info("Task list manager state changed", tag.LifeCycleStarted) return mgr, nil } func (e *matchingEngineImpl) getTaskListByDomainLocked(domainID string) *types.GetTaskListsByDomainResponse { decisionTaskListMap := make(map[string]*types.DescribeTaskListResponse) activityTaskListMap := make(map[string]*types.DescribeTaskListResponse) for tl, tlm := range e.taskLists { if tlm.GetTaskListKind() == types.TaskListKindNormal && tl.domainID == domainID { if types.TaskListType(tl.taskType) == types.TaskListTypeDecision { decisionTaskListMap[tl.baseName] = tlm.DescribeTaskList(false) } activityTaskListMap[tl.baseName] = tlm.DescribeTaskList(false) } } return &types.GetTaskListsByDomainResponse{ DecisionTaskListMap: decisionTaskListMap, ActivityTaskListMap: activityTaskListMap, } } // For use in tests func (e *matchingEngineImpl) updateTaskList(taskList *taskListID, mgr taskListManager) { e.taskListsLock.Lock() defer e.taskListsLock.Unlock() e.taskLists[*taskList] = mgr } func (e *matchingEngineImpl) removeTaskListManager(tlMgr taskListManager) { id := tlMgr.TaskListID() e.taskListsLock.Lock() defer e.taskListsLock.Unlock() currentTlMgr, ok := e.taskLists[*id] if ok && tlMgr == currentTlMgr { delete(e.taskLists, *id) } e.metricsClient.Scope(metrics.MatchingTaskListMgrScope).UpdateGauge( metrics.TaskListManagersGauge, float64(len(e.taskLists)), ) } // AddDecisionTask either delivers task directly to waiting poller or save it into task list persistence. func (e *matchingEngineImpl) AddDecisionTask( hCtx *handlerContext, request *types.AddDecisionTaskRequest, ) (bool, error) { domainID := request.GetDomainUUID() taskListName := request.GetTaskList().GetName() taskListKind := request.GetTaskList().Kind taskListType := persistence.TaskListTypeDecision e.emitInfoOrDebugLog( domainID, "Received AddDecisionTask", tag.WorkflowTaskListName(request.TaskList.GetName()), tag.WorkflowID(request.Execution.GetWorkflowID()), tag.WorkflowRunID(request.Execution.GetRunID()), tag.WorkflowDomainID(domainID), tag.WorkflowTaskListType(taskListType), tag.WorkflowScheduleID(request.GetScheduleID()), tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())), ) taskList, err := newTaskListID(domainID, taskListName, taskListType) if err != nil { return false, err } // get the domainName domainName, err := e.domainCache.GetDomainName(domainID) if err != nil { return false, err } // Only emit traffic metrics if the tasklist is not sticky and is not forwarded if int32(request.GetTaskList().GetKind()) == 0 && request.ForwardedFrom == "" { e.metricsClient.Scope(metrics.MatchingAddTaskScope).Tagged(metrics.DomainTag(domainName), metrics.TaskListTag(taskListName), metrics.TaskListTypeTag("decision_task"), metrics.MatchingHostTag(e.config.HostName)).IncCounter(metrics.CadenceTasklistRequests) e.emitInfoOrDebugLog(domainID, "Emitting tasklist counter on decision task", tag.Dynamic("tasklistName", taskListName), tag.Dynamic("taskListBaseName", taskList.baseName)) } tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return false, err } if taskListKind != nil && *taskListKind == types.TaskListKindSticky { // check if the sticky worker is still available, if not, fail this request early if !tlMgr.HasPollerAfter(time.Now().Add(-_stickyPollerUnavailableWindow)) { return false, _stickyPollerUnavailableError } } taskInfo := &persistence.TaskInfo{ DomainID: domainID, RunID: request.Execution.GetRunID(), WorkflowID: request.Execution.GetWorkflowID(), ScheduleID: request.GetScheduleID(), ScheduleToStartTimeout: request.GetScheduleToStartTimeoutSeconds(), CreatedTime: time.Now(), PartitionConfig: request.GetPartitionConfig(), } return tlMgr.AddTask(hCtx.Context, addTaskParams{ execution: request.Execution, taskInfo: taskInfo, source: request.GetSource(), forwardedFrom: request.GetForwardedFrom(), }) } // AddActivityTask either delivers task directly to waiting poller or save it into task list persistence. func (e *matchingEngineImpl) AddActivityTask( hCtx *handlerContext, request *types.AddActivityTaskRequest, ) (bool, error) { domainID := request.GetDomainUUID() taskListName := request.GetTaskList().GetName() taskListKind := request.GetTaskList().Kind taskListType := persistence.TaskListTypeActivity e.emitInfoOrDebugLog( domainID, "Received AddActivityTask", tag.WorkflowTaskListName(taskListName), tag.WorkflowID(request.Execution.GetWorkflowID()), tag.WorkflowRunID(request.Execution.GetRunID()), tag.WorkflowDomainID(domainID), tag.WorkflowTaskListType(taskListType), tag.WorkflowScheduleID(request.GetScheduleID()), tag.WorkflowTaskListKind(int32(request.GetTaskList().GetKind())), ) taskList, err := newTaskListID(domainID, taskListName, taskListType) if err != nil { return false, err } // get the domainName domainName, err := e.domainCache.GetDomainName(domainID) if err != nil { return false, err } // Only emit traffic metrics if the tasklist is not sticky and is not forwarded if int32(request.GetTaskList().GetKind()) == 0 && request.ForwardedFrom == "" { e.metricsClient.Scope(metrics.MatchingAddTaskScope).Tagged(metrics.DomainTag(domainName), metrics.TaskListTag(taskListName), metrics.TaskListTypeTag("activity_task"), metrics.MatchingHostTag(e.config.HostName)).IncCounter(metrics.CadenceTasklistRequests) e.emitInfoOrDebugLog(domainID, "Emitting tasklist counter on activity task", tag.Dynamic("tasklistName", taskListName), tag.Dynamic("taskListBaseName", taskList.baseName)) } tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return false, err } taskInfo := &persistence.TaskInfo{ DomainID: request.GetSourceDomainUUID(), RunID: request.Execution.GetRunID(), WorkflowID: request.Execution.GetWorkflowID(), ScheduleID: request.GetScheduleID(), ScheduleToStartTimeout: request.GetScheduleToStartTimeoutSeconds(), CreatedTime: time.Now(), PartitionConfig: request.GetPartitionConfig(), } return tlMgr.AddTask(hCtx.Context, addTaskParams{ execution: request.Execution, taskInfo: taskInfo, source: request.GetSource(), forwardedFrom: request.GetForwardedFrom(), activityTaskDispatchInfo: request.ActivityTaskDispatchInfo, }) } // PollForDecisionTask tries to get the decision task using exponential backoff. func (e *matchingEngineImpl) PollForDecisionTask( hCtx *handlerContext, req *types.MatchingPollForDecisionTaskRequest, ) (*types.MatchingPollForDecisionTaskResponse, error) { domainID := req.GetDomainUUID() pollerID := req.GetPollerID() request := req.PollRequest taskListName := request.GetTaskList().GetName() taskListKind := request.GetTaskList().Kind e.logger.Debug("Received PollForDecisionTask for taskList", tag.WorkflowTaskListName(taskListName), tag.WorkflowDomainID(domainID), ) pollLoop: for { if err := common.IsValidContext(hCtx.Context); err != nil { return nil, err } taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) if err != nil { return nil, fmt.Errorf("couldn't create new decision tasklist %w", err) } // Add frontend generated pollerID to context so tasklistMgr can support cancellation of // long-poll when frontend calls CancelOutstandingPoll API pollerCtx := context.WithValue(hCtx.Context, pollerIDKey, pollerID) pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity()) pollerCtx = context.WithValue(pollerCtx, _isolationGroupKey, req.GetIsolationGroup()) task, err := e.getTask(pollerCtx, taskList, nil, taskListKind) if err != nil { // TODO: Is empty poll the best reply for errPumpClosed? if errors.Is(err, ErrNoTasks) || errors.Is(err, errPumpClosed) { e.logger.Debug("no decision tasks", tag.WorkflowTaskListName(taskListName), tag.WorkflowDomainID(domainID), tag.Error(err), ) return emptyPollForDecisionTaskResponse, nil } return nil, fmt.Errorf("couldn't get task: %w", err) } e.emitForwardedFromStats(hCtx.scope, task.isForwarded(), req.GetForwardedFrom()) if task.isStarted() { return task.pollForDecisionResponse(), nil // TODO: Maybe add history expose here? } if task.isQuery() { task.finish(nil) // this only means query task sync match succeed. // for query task, we don't need to update history to record decision task started. but we need to know // the NextEventID so front end knows what are the history events to load for this decision task. mutableStateResp, err := e.historyService.GetMutableState(hCtx.Context, &types.GetMutableStateRequest{ DomainUUID: req.DomainUUID, Execution: task.workflowExecution(), }) if err != nil { // will notify query client that the query task failed e.deliverQueryResult(task.query.taskID, &queryResult{internalError: err}) //nolint:errcheck return emptyPollForDecisionTaskResponse, nil } isStickyEnabled := false supportsSticky := client.NewVersionChecker().SupportsStickyQuery(mutableStateResp.GetClientImpl(), mutableStateResp.GetClientFeatureVersion()) == nil if len(mutableStateResp.StickyTaskList.GetName()) != 0 && supportsSticky { isStickyEnabled = true } resp := &types.RecordDecisionTaskStartedResponse{ PreviousStartedEventID: mutableStateResp.PreviousStartedEventID, NextEventID: mutableStateResp.NextEventID, WorkflowType: mutableStateResp.WorkflowType, StickyExecutionEnabled: isStickyEnabled, WorkflowExecutionTaskList: mutableStateResp.TaskList, BranchToken: mutableStateResp.CurrentBranchToken, HistorySize: mutableStateResp.HistorySize, } return e.createPollForDecisionTaskResponse(task, resp, hCtx.scope), nil } e.emitTaskIsolationMetrics(hCtx.scope, task.event.PartitionConfig, req.GetIsolationGroup()) resp, err := e.recordDecisionTaskStarted(hCtx.Context, request, task) if err != nil { switch err.(type) { case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError, *types.EventAlreadyStartedError: e.emitInfoOrDebugLog( task.event.DomainID, "Duplicated decision task", tag.WorkflowDomainID(domainID), tag.WorkflowID(task.event.WorkflowID), tag.WorkflowRunID(task.event.RunID), tag.WorkflowTaskListName(taskListName), tag.WorkflowScheduleID(task.event.ScheduleID), tag.TaskID(task.event.TaskID), tag.Error(err), ) task.finish(nil) default: e.emitInfoOrDebugLog( task.event.DomainID, "unknown error recording task started", tag.WorkflowDomainID(domainID), tag.Error(err), tag.WorkflowTaskListName(taskListName), ) task.finish(err) } continue pollLoop } task.finish(nil) return e.createPollForDecisionTaskResponse(task, resp, hCtx.scope), nil } } // pollForActivityTaskOperation takes one task from the task manager, update workflow execution history, mark task as // completed and return it to user. If a task from task manager is already started, return an empty response, without // error. Timeouts handled by the timer queue. func (e *matchingEngineImpl) PollForActivityTask( hCtx *handlerContext, req *types.MatchingPollForActivityTaskRequest, ) (*types.PollForActivityTaskResponse, error) { domainID := req.GetDomainUUID() pollerID := req.GetPollerID() request := req.PollRequest taskListName := request.GetTaskList().GetName() e.logger.Debug("Received PollForActivityTask", tag.WorkflowTaskListName(taskListName), tag.WorkflowDomainID(domainID), ) pollLoop: for { err := common.IsValidContext(hCtx.Context) if err != nil { return nil, err } taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeActivity) if err != nil { return nil, err } var maxDispatch *float64 if request.TaskListMetadata != nil { maxDispatch = request.TaskListMetadata.MaxTasksPerSecond } // Add frontend generated pollerID to context so tasklistMgr can support cancellation of // long-poll when frontend calls CancelOutstandingPoll API pollerCtx := context.WithValue(hCtx.Context, pollerIDKey, pollerID) pollerCtx = context.WithValue(pollerCtx, identityKey, request.GetIdentity()) pollerCtx = context.WithValue(pollerCtx, _isolationGroupKey, req.GetIsolationGroup()) taskListKind := request.TaskList.Kind task, err := e.getTask(pollerCtx, taskList, maxDispatch, taskListKind) if err != nil { // TODO: Is empty poll the best reply for errPumpClosed? if errors.Is(err, ErrNoTasks) || errors.Is(err, errPumpClosed) { return emptyPollForActivityTaskResponse, nil } e.logger.Error("Received unexpected err while getting task", tag.WorkflowTaskListName(taskListName), tag.WorkflowDomainID(domainID), tag.Error(err), ) return nil, err } e.emitForwardedFromStats(hCtx.scope, task.isForwarded(), req.GetForwardedFrom()) if task.isStarted() { // tasks received from remote are already started. So, simply forward the response return task.pollForActivityResponse(), nil } if task.activityTaskDispatchInfo != nil { task.finish(nil) return e.createSyncMatchPollForActivityTaskResponse(task, task.activityTaskDispatchInfo), nil } e.emitTaskIsolationMetrics(hCtx.scope, task.event.PartitionConfig, req.GetIsolationGroup()) resp, err := e.recordActivityTaskStarted(hCtx.Context, request, task) if err != nil { switch err.(type) { case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError, *types.EventAlreadyStartedError: e.emitInfoOrDebugLog( task.event.DomainID, "Duplicated activity task", tag.WorkflowDomainID(domainID), tag.WorkflowID(task.event.WorkflowID), tag.WorkflowRunID(task.event.RunID), tag.WorkflowTaskListName(taskListName), tag.WorkflowScheduleID(task.event.ScheduleID), tag.TaskID(task.event.TaskID), ) task.finish(nil) default: task.finish(err) } continue pollLoop } task.finish(nil) return e.createPollForActivityTaskResponse(task, resp, hCtx.scope), nil } } func (e *matchingEngineImpl) createSyncMatchPollForActivityTaskResponse( task *InternalTask, activityTaskDispatchInfo *types.ActivityTaskDispatchInfo, ) *types.PollForActivityTaskResponse { scheduledEvent := activityTaskDispatchInfo.ScheduledEvent attributes := scheduledEvent.ActivityTaskScheduledEventAttributes response := &types.PollForActivityTaskResponse{} response.ActivityID = attributes.ActivityID response.ActivityType = attributes.ActivityType response.Header = attributes.Header response.Input = attributes.Input response.WorkflowExecution = task.workflowExecution() response.ScheduledTimestampOfThisAttempt = activityTaskDispatchInfo.ScheduledTimestampOfThisAttempt response.ScheduledTimestamp = scheduledEvent.Timestamp response.ScheduleToCloseTimeoutSeconds = attributes.ScheduleToCloseTimeoutSeconds response.StartedTimestamp = activityTaskDispatchInfo.StartedTimestamp response.StartToCloseTimeoutSeconds = attributes.StartToCloseTimeoutSeconds response.HeartbeatTimeoutSeconds = attributes.HeartbeatTimeoutSeconds token := &common.TaskToken{ DomainID: task.event.DomainID, WorkflowID: task.event.WorkflowID, WorkflowType: activityTaskDispatchInfo.WorkflowType.GetName(), RunID: task.event.RunID, ScheduleID: task.event.ScheduleID, ScheduleAttempt: common.Int64Default(activityTaskDispatchInfo.Attempt), ActivityID: attributes.GetActivityID(), ActivityType: attributes.GetActivityType().GetName(), } response.TaskToken, _ = e.tokenSerializer.Serialize(token) response.Attempt = int32(token.ScheduleAttempt) response.HeartbeatDetails = activityTaskDispatchInfo.HeartbeatDetails response.WorkflowType = activityTaskDispatchInfo.WorkflowType response.WorkflowDomain = activityTaskDispatchInfo.WorkflowDomain return response } // QueryWorkflow creates a DecisionTask with query data, send it through sync match channel, wait for that DecisionTask // to be processed by worker, and then return the query result. func (e *matchingEngineImpl) QueryWorkflow( hCtx *handlerContext, queryRequest *types.MatchingQueryWorkflowRequest, ) (*types.QueryWorkflowResponse, error) { domainID := queryRequest.GetDomainUUID() taskListName := queryRequest.GetTaskList().GetName() taskListKind := queryRequest.GetTaskList().Kind taskList, err := newTaskListID(domainID, taskListName, persistence.TaskListTypeDecision) if err != nil { return nil, err } tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return nil, err } if taskListKind != nil && *taskListKind == types.TaskListKindSticky { // check if the sticky worker is still available, if not, fail this request early if !tlMgr.HasPollerAfter(time.Now().Add(-_stickyPollerUnavailableWindow)) { return nil, _stickyPollerUnavailableError } } taskID := uuid.New() resp, err := tlMgr.DispatchQueryTask(hCtx.Context, taskID, queryRequest) // if get response or error it means that query task was handled by forwarding to another matching host // this remote host's result can be returned directly if resp != nil || err != nil { return resp, err } // if get here it means that dispatch of query task has occurred locally // must wait on result channel to get query result queryResultCh := make(chan *queryResult, 1) e.lockableQueryTaskMap.put(taskID, queryResultCh) defer e.lockableQueryTaskMap.delete(taskID) select { case result := <-queryResultCh: if result.internalError != nil { return nil, result.internalError } workerResponse := result.workerResponse // if query was intended as consistent query check to see if worker supports consistent query if queryRequest.GetQueryRequest().GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong { if err := e.versionChecker.SupportsConsistentQuery( workerResponse.GetCompletedRequest().GetWorkerVersionInfo().GetImpl(), workerResponse.GetCompletedRequest().GetWorkerVersionInfo().GetFeatureVersion()); err != nil { return nil, err } } switch workerResponse.GetCompletedRequest().GetCompletedType() { case types.QueryTaskCompletedTypeCompleted: return &types.QueryWorkflowResponse{QueryResult: workerResponse.GetCompletedRequest().GetQueryResult()}, nil case types.QueryTaskCompletedTypeFailed: return nil, &types.QueryFailedError{Message: workerResponse.GetCompletedRequest().GetErrorMessage()} default: return nil, &types.InternalServiceError{Message: "unknown query completed type"} } case <-hCtx.Done(): return nil, hCtx.Err() } } func (e *matchingEngineImpl) RespondQueryTaskCompleted(hCtx *handlerContext, request *types.MatchingRespondQueryTaskCompletedRequest) error { if err := e.deliverQueryResult(request.GetTaskID(), &queryResult{workerResponse: request}); err != nil { hCtx.scope.IncCounter(metrics.RespondQueryTaskFailedPerTaskListCounter) return err } return nil } func (e *matchingEngineImpl) deliverQueryResult(taskID string, queryResult *queryResult) error { queryResultCh, ok := e.lockableQueryTaskMap.get(taskID) if !ok { return &types.InternalServiceError{Message: "query task not found, or already expired"} } queryResultCh <- queryResult return nil } func (e *matchingEngineImpl) CancelOutstandingPoll( hCtx *handlerContext, request *types.CancelOutstandingPollRequest, ) error { domainID := request.GetDomainUUID() taskListType := int(request.GetTaskListType()) taskListName := request.GetTaskList().GetName() taskListKind := request.GetTaskList().Kind pollerID := request.GetPollerID() taskList, err := newTaskListID(domainID, taskListName, taskListType) if err != nil { return err } tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return err } tlMgr.CancelPoller(pollerID) return nil } func (e *matchingEngineImpl) DescribeTaskList( hCtx *handlerContext, request *types.MatchingDescribeTaskListRequest, ) (*types.DescribeTaskListResponse, error) { domainID := request.GetDomainUUID() taskListType := persistence.TaskListTypeDecision if request.DescRequest.GetTaskListType() == types.TaskListTypeActivity { taskListType = persistence.TaskListTypeActivity } taskListName := request.GetDescRequest().GetTaskList().GetName() taskListKind := request.GetDescRequest().GetTaskList().Kind taskList, err := newTaskListID(domainID, taskListName, taskListType) if err != nil { return nil, err } tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return nil, err } return tlMgr.DescribeTaskList(request.DescRequest.GetIncludeTaskListStatus()), nil } func (e *matchingEngineImpl) ListTaskListPartitions( hCtx *handlerContext, request *types.MatchingListTaskListPartitionsRequest, ) (*types.ListTaskListPartitionsResponse, error) { activityTaskListInfo, err := e.listTaskListPartitions(request, persistence.TaskListTypeActivity) if err != nil { return nil, err } decisionTaskListInfo, err := e.listTaskListPartitions(request, persistence.TaskListTypeDecision) if err != nil { return nil, err } resp := &types.ListTaskListPartitionsResponse{ ActivityTaskListPartitions: activityTaskListInfo, DecisionTaskListPartitions: decisionTaskListInfo, } return resp, nil } func (e *matchingEngineImpl) listTaskListPartitions( request *types.MatchingListTaskListPartitionsRequest, taskListType int, ) ([]*types.TaskListPartitionMetadata, error) { partitions, err := e.getAllPartitions( request, taskListType, ) if err != nil { return nil, err } var partitionHostInfo []*types.TaskListPartitionMetadata for _, partition := range partitions { host, _ := e.getHostInfo(partition) partitionHostInfo = append(partitionHostInfo, &types.TaskListPartitionMetadata{ Key: partition, OwnerHostName: host, }) } return partitionHostInfo, nil } func (e *matchingEngineImpl) GetTaskListsByDomain( hCtx *handlerContext, request *types.GetTaskListsByDomainRequest, ) (*types.GetTaskListsByDomainResponse, error) { domainID, err := e.domainCache.GetDomainID(request.GetDomain()) if err != nil { return nil, err } e.taskListsLock.RLock() defer e.taskListsLock.RUnlock() return e.getTaskListByDomainLocked(domainID), nil } func (e *matchingEngineImpl) getHostInfo(partitionKey string) (string, error) { host, err := e.membershipResolver.Lookup(service.Matching, partitionKey) if err != nil { return "", err } return host.GetAddress(), nil } func (e *matchingEngineImpl) getAllPartitions( request *types.MatchingListTaskListPartitionsRequest, taskListType int, ) ([]string, error) { var partitionKeys []string domainID, err := e.domainCache.GetDomainID(request.GetDomain()) if err != nil { return partitionKeys, err } taskList := request.GetTaskList() taskListID, err := newTaskListID(domainID, taskList.GetName(), taskListType) if err != nil { return partitionKeys, err } rootPartition := taskListID.GetRoot() partitionKeys = append(partitionKeys, rootPartition) nWritePartitions := e.config.NumTasklistWritePartitions n := nWritePartitions(request.GetDomain(), rootPartition, taskListType) if n <= 0 { return partitionKeys, nil } for i := 1; i < n; i++ { partitionKeys = append(partitionKeys, fmt.Sprintf("%v%v/%v", common.ReservedTaskListPrefix, rootPartition, i)) } return partitionKeys, nil } // Loads a task from persistence and wraps it in a task context func (e *matchingEngineImpl) getTask(ctx context.Context, taskList *taskListID, maxDispatchPerSecond *float64, taskListKind *types.TaskListKind) (*InternalTask, error) { tlMgr, err := e.getTaskListManager(taskList, taskListKind) if err != nil { return nil, fmt.Errorf("couldn't load tasklist namanger: %w", err) } return tlMgr.GetTask(ctx, maxDispatchPerSecond) } func (e *matchingEngineImpl) unloadTaskList(tlMgr taskListManager) { id := tlMgr.TaskListID() e.taskListsLock.Lock() currentTlMgr, ok := e.taskLists[*id] if !ok || tlMgr != currentTlMgr { e.taskListsLock.Unlock() return } delete(e.taskLists, *id) e.taskListsLock.Unlock() tlMgr.Stop() } // Populate the decision task response based on context and scheduled/started events. func (e *matchingEngineImpl) createPollForDecisionTaskResponse( task *InternalTask, historyResponse *types.RecordDecisionTaskStartedResponse, scope metrics.Scope, ) *types.MatchingPollForDecisionTaskResponse { var token []byte if task.isQuery() { // for a query task queryRequest := task.query.request taskToken := &common.QueryTaskToken{ DomainID: queryRequest.DomainUUID, TaskList: queryRequest.TaskList.Name, TaskID: task.query.taskID, } token, _ = e.tokenSerializer.SerializeQueryTaskToken(taskToken) } else { taskToken := &common.TaskToken{ DomainID: task.event.DomainID, WorkflowID: task.event.WorkflowID, RunID: task.event.RunID, ScheduleID: historyResponse.GetScheduledEventID(), ScheduleAttempt: historyResponse.GetAttempt(), } token, _ = e.tokenSerializer.Serialize(taskToken) if task.responseC == nil { scope.RecordTimer(metrics.AsyncMatchLatencyPerTaskList, time.Since(task.event.CreatedTime)) } } response := common.CreateMatchingPollForDecisionTaskResponse(historyResponse, task.workflowExecution(), token) if task.query != nil { response.Query = task.query.request.QueryRequest.Query } response.BacklogCountHint = task.backlogCountHint return response } // Populate the activity task response based on context and scheduled/started events. func (e *matchingEngineImpl) createPollForActivityTaskResponse( task *InternalTask, historyResponse *types.RecordActivityTaskStartedResponse, scope metrics.Scope, ) *types.PollForActivityTaskResponse { scheduledEvent := historyResponse.ScheduledEvent if scheduledEvent.ActivityTaskScheduledEventAttributes == nil { panic("GetActivityTaskScheduledEventAttributes is not set") } attributes := scheduledEvent.ActivityTaskScheduledEventAttributes if attributes.ActivityID == "" { panic("ActivityTaskScheduledEventAttributes.ActivityID is not set") } if task.responseC == nil { scope.RecordTimer(metrics.AsyncMatchLatencyPerTaskList, time.Since(task.event.CreatedTime)) } response := &types.PollForActivityTaskResponse{} response.ActivityID = attributes.ActivityID response.ActivityType = attributes.ActivityType response.Header = attributes.Header response.Input = attributes.Input response.WorkflowExecution = task.workflowExecution() response.ScheduledTimestampOfThisAttempt = historyResponse.ScheduledTimestampOfThisAttempt response.ScheduledTimestamp = scheduledEvent.Timestamp response.ScheduleToCloseTimeoutSeconds = attributes.ScheduleToCloseTimeoutSeconds response.StartedTimestamp = historyResponse.StartedTimestamp response.StartToCloseTimeoutSeconds = attributes.StartToCloseTimeoutSeconds response.HeartbeatTimeoutSeconds = attributes.HeartbeatTimeoutSeconds token := &common.TaskToken{ DomainID: task.event.DomainID, WorkflowID: task.event.WorkflowID, WorkflowType: historyResponse.WorkflowType.GetName(), RunID: task.event.RunID, ScheduleID: task.event.ScheduleID, ScheduleAttempt: historyResponse.GetAttempt(), ActivityID: attributes.GetActivityID(), ActivityType: attributes.GetActivityType().GetName(), } response.TaskToken, _ = e.tokenSerializer.Serialize(token) response.Attempt = int32(token.ScheduleAttempt) response.HeartbeatDetails = historyResponse.HeartbeatDetails response.WorkflowType = historyResponse.WorkflowType response.WorkflowDomain = historyResponse.WorkflowDomain return response } func (e *matchingEngineImpl) recordDecisionTaskStarted( ctx context.Context, pollReq *types.PollForDecisionTaskRequest, task *InternalTask, ) (*types.RecordDecisionTaskStartedResponse, error) { request := &types.RecordDecisionTaskStartedRequest{ DomainUUID: task.event.DomainID, WorkflowExecution: task.workflowExecution(), ScheduleID: task.event.ScheduleID, TaskID: task.event.TaskID, RequestID: uuid.New(), PollRequest: pollReq, } var resp *types.RecordDecisionTaskStartedResponse op := func() error { var err error resp, err = e.historyService.RecordDecisionTaskStarted(ctx, request) return err } throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(historyServiceOperationRetryPolicy), backoff.WithRetryableError(isMatchingRetryableError), ) err := throttleRetry.Do(ctx, op) return resp, err } func (e *matchingEngineImpl) recordActivityTaskStarted( ctx context.Context, pollReq *types.PollForActivityTaskRequest, task *InternalTask, ) (*types.RecordActivityTaskStartedResponse, error) { request := &types.RecordActivityTaskStartedRequest{ DomainUUID: task.event.DomainID, WorkflowExecution: task.workflowExecution(), ScheduleID: task.event.ScheduleID, TaskID: task.event.TaskID, RequestID: uuid.New(), PollRequest: pollReq, } var resp *types.RecordActivityTaskStartedResponse op := func() error { var err error resp, err = e.historyService.RecordActivityTaskStarted(ctx, request) return err } throttleRetry := backoff.NewThrottleRetry( backoff.WithRetryPolicy(historyServiceOperationRetryPolicy), backoff.WithRetryableError(isMatchingRetryableError), ) err := throttleRetry.Do(ctx, op) return resp, err } func (e *matchingEngineImpl) emitForwardedFromStats( scope metrics.Scope, isTaskForwarded bool, pollForwardedFrom string, ) { isPollForwarded := len(pollForwardedFrom) > 0 switch { case isTaskForwarded && isPollForwarded: scope.IncCounter(metrics.RemoteToRemoteMatchPerTaskListCounter) case isTaskForwarded: scope.IncCounter(metrics.RemoteToLocalMatchPerTaskListCounter) case isPollForwarded: scope.IncCounter(metrics.LocalToRemoteMatchPerTaskListCounter) default: scope.IncCounter(metrics.LocalToLocalMatchPerTaskListCounter) } } func (e *matchingEngineImpl) emitTaskIsolationMetrics( scope metrics.Scope, partitionConfig map[string]string, pollerIsolationGroup string, ) { if len(partitionConfig) > 0 { scope.Tagged(metrics.PartitionConfigTags(partitionConfig)...).Tagged(metrics.PollerIsolationGroupTag(pollerIsolationGroup)).IncCounter(metrics.IsolationTaskMatchPerTaskListCounter) } } func (e *matchingEngineImpl) emitInfoOrDebugLog( domainID string, msg string, tags ...tag.Tag, ) { if e.config.EnableDebugMode && e.config.EnableTaskInfoLogByDomainID(domainID) { e.logger.Info(msg, tags...) } else { e.logger.Debug(msg, tags...) } } func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) { m.Lock() defer m.Unlock() m.queryTaskMap[key] = value } func (m *lockableQueryTaskMap) get(key string) (chan *queryResult, bool) { m.RLock() defer m.RUnlock() result, ok := m.queryTaskMap[key] return result, ok } func (m *lockableQueryTaskMap) delete(key string) { m.Lock() defer m.Unlock() delete(m.queryTaskMap, key) } func isMatchingRetryableError(err error) bool { switch err.(type) { case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError, *types.EventAlreadyStartedError: return false } return true }