func()

in service/history/decision/handler.go [702:838]


// handleBufferedQueries resolves the queries currently buffered in the mutable
// state's query registry. Metrics are emitted under the
// HistoryRespondDecisionTaskCompleted scope with the "ConsistentQuery" decision
// type tag.
//
// The steps, in order:
//  1. If the registry has no buffered query, nothing is done.
//  2. If the version checker rejects clientImpl/clientFeatureVersion for
//     consistent query, every buffered query is failed with a BadRequestError
//     carrying the version error message, and the function returns.
//  3. If decisionHeartbeating is set, no query is resolved.
//  4. Every entry of queryResults is marked completed, unless its answer
//     violates the domain's blob size limit, in which case it is marked failed
//     with the size-limit error.
//  5. If createNewDecisionTask is false, all queries that are still buffered
//     are marked unblocked.
//
// Errors returned by the registry while setting a termination state are logged
// and counted via QueryRegistryInvalidStateCount; they are never returned to
// the caller.
func (handler *handlerImpl) handleBufferedQueries(
	msBuilder execution.MutableState,
	clientImpl string,
	clientFeatureVersion string,
	queryResults map[string]*types.WorkflowQueryResult,
	createNewDecisionTask bool,
	domainEntry *cache.DomainCacheEntry,
	decisionHeartbeating bool,
) {
	registry := msBuilder.GetQueryRegistry()
	if !registry.HasBufferedQuery() {
		return
	}

	domainID := domainEntry.GetInfo().ID
	domainName := domainEntry.GetInfo().Name
	wfID := msBuilder.GetExecutionInfo().WorkflowID
	wfRunID := msBuilder.GetExecutionInfo().RunID

	scope := handler.metricsClient.Scope(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DomainTag(domainEntry.GetInfo().Name),
		metrics.DecisionTypeTag("ConsistentQuery"),
	)

	// terminate moves a single query into the given termination state. A
	// rejection by the registry is logged with failureMsg and counted, but
	// deliberately not propagated so the remaining queries are still handled.
	terminate := func(queryID string, state *query.TerminationState, failureMsg string) {
		setErr := registry.SetTerminationState(queryID, state)
		if setErr == nil {
			return
		}
		handler.logger.Error(
			failureMsg,
			tag.WorkflowDomainName(domainName),
			tag.WorkflowID(wfID),
			tag.WorkflowRunID(wfRunID),
			tag.QueryID(queryID),
			tag.Error(setErr))
		scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
	}

	// Consistent query needs support on both the server and the client worker.
	// Buffered queries exist, so a consistent query was requested; if the
	// worker cannot serve it, every buffered query has to be failed.
	versionErr := handler.versionChecker.SupportsConsistentQuery(clientImpl, clientFeatureVersion)
	if versionErr != nil {
		scope.IncCounter(metrics.WorkerNotSupportsConsistentQueryCount)
		// A single failure state is shared by all buffered queries.
		unsupportedState := &query.TerminationState{
			TerminationType: query.TerminationTypeFailed,
			Failure:         &types.BadRequestError{Message: versionErr.Error()},
		}
		pendingIDs := registry.GetBufferedIDs()
		handler.logger.Info(
			"failing query because worker does not support consistent query",
			tag.ClientImpl(clientImpl),
			tag.ClientFeatureVersion(clientFeatureVersion),
			tag.WorkflowDomainName(domainName),
			tag.WorkflowID(wfID),
			tag.WorkflowRunID(wfRunID),
			tag.Error(versionErr))
		for _, queryID := range pendingIDs {
			terminate(queryID, unsupportedState, "failed to set query termination state to failed")
		}
		return
	}

	// A heartbeat decision means local activities may still be running on the
	// worker, started by an external event that happened before the query, so
	// the queries are left buffered.
	if decisionHeartbeating {
		return
	}

	errorLimit := handler.config.BlobSizeLimitError(domainName)
	warnLimit := handler.config.BlobSizeLimitWarn(domainName)

	// Complete, or fail, every query the worker returned a result for.
	for queryID, queryResult := range queryResults {
		sizeErr := common.CheckEventBlobSizeLimit(
			len(queryResult.GetAnswer()),
			warnLimit,
			errorLimit,
			domainID,
			wfID,
			wfRunID,
			scope,
			handler.throttledLogger,
			tag.BlobSizeViolationOperation("ConsistentQuery"),
		)
		if sizeErr == nil {
			terminate(
				queryID,
				&query.TerminationState{
					TerminationType: query.TerminationTypeCompleted,
					QueryResult:     queryResult,
				},
				"failed to set query termination state to completed",
			)
			continue
		}

		handler.logger.Info("failing query because query result size is too large",
			tag.WorkflowDomainName(domainName),
			tag.WorkflowID(wfID),
			tag.WorkflowRunID(wfRunID),
			tag.QueryID(queryID),
			tag.Error(sizeErr))
		terminate(
			queryID,
			&query.TerminationState{
				TerminationType: query.TerminationTypeFailed,
				Failure:         sizeErr,
			},
			"failed to set query termination state to failed",
		)
	}

	// A new decision task means buffered events arrived while this decision
	// task was being handled; the remaining queries stay buffered for it.
	if createNewDecisionTask {
		return
	}

	// No new decision task was created, so no buffered events came in and all
	// unanswered buffered queries can be dispatched directly through matching.
	for _, queryID := range registry.GetBufferedIDs() {
		terminate(
			queryID,
			&query.TerminationState{TerminationType: query.TerminationTypeUnblocked},
			"failed to set query termination state to unblocked",
		)
	}
}