in service/history/decision/handler.go [702:838]
func (handler *handlerImpl) handleBufferedQueries(
msBuilder execution.MutableState,
clientImpl string,
clientFeatureVersion string,
queryResults map[string]*types.WorkflowQueryResult,
createNewDecisionTask bool,
domainEntry *cache.DomainCacheEntry,
decisionHeartbeating bool,
) {
queryRegistry := msBuilder.GetQueryRegistry()
if !queryRegistry.HasBufferedQuery() {
return
}
domainID := domainEntry.GetInfo().ID
domain := domainEntry.GetInfo().Name
workflowID := msBuilder.GetExecutionInfo().WorkflowID
runID := msBuilder.GetExecutionInfo().RunID
scope := handler.metricsClient.Scope(
metrics.HistoryRespondDecisionTaskCompletedScope,
metrics.DomainTag(domainEntry.GetInfo().Name),
metrics.DecisionTypeTag("ConsistentQuery"))
// Consistent query requires both server and client worker support. If a consistent query was requested (meaning there are
// buffered queries) but worker does not support consistent query then all buffered queries should be failed.
if versionErr := handler.versionChecker.SupportsConsistentQuery(clientImpl, clientFeatureVersion); versionErr != nil {
scope.IncCounter(metrics.WorkerNotSupportsConsistentQueryCount)
failedTerminationState := &query.TerminationState{
TerminationType: query.TerminationTypeFailed,
Failure: &types.BadRequestError{Message: versionErr.Error()},
}
buffered := queryRegistry.GetBufferedIDs()
handler.logger.Info(
"failing query because worker does not support consistent query",
tag.ClientImpl(clientImpl),
tag.ClientFeatureVersion(clientFeatureVersion),
tag.WorkflowDomainName(domain),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.Error(versionErr))
for _, id := range buffered {
if err := queryRegistry.SetTerminationState(id, failedTerminationState); err != nil {
handler.logger.Error(
"failed to set query termination state to failed",
tag.WorkflowDomainName(domain),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.QueryID(id),
tag.Error(err))
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
}
}
return
}
// if its a heartbeat decision it means local activities may still be running on the worker
// which were started by an external event which happened before the query
if decisionHeartbeating {
return
}
sizeLimitError := handler.config.BlobSizeLimitError(domain)
sizeLimitWarn := handler.config.BlobSizeLimitWarn(domain)
// Complete or fail all queries we have results for
for id, result := range queryResults {
if err := common.CheckEventBlobSizeLimit(
len(result.GetAnswer()),
sizeLimitWarn,
sizeLimitError,
domainID,
workflowID,
runID,
scope,
handler.throttledLogger,
tag.BlobSizeViolationOperation("ConsistentQuery"),
); err != nil {
handler.logger.Info("failing query because query result size is too large",
tag.WorkflowDomainName(domain),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.QueryID(id),
tag.Error(err))
failedTerminationState := &query.TerminationState{
TerminationType: query.TerminationTypeFailed,
Failure: err,
}
if err := queryRegistry.SetTerminationState(id, failedTerminationState); err != nil {
handler.logger.Error(
"failed to set query termination state to failed",
tag.WorkflowDomainName(domain),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.QueryID(id),
tag.Error(err))
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
}
} else {
completedTerminationState := &query.TerminationState{
TerminationType: query.TerminationTypeCompleted,
QueryResult: result,
}
if err := queryRegistry.SetTerminationState(id, completedTerminationState); err != nil {
handler.logger.Error(
"failed to set query termination state to completed",
tag.WorkflowDomainName(domain),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.QueryID(id),
tag.Error(err))
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
}
}
}
// If no decision task was created then it means no buffered events came in during this decision task's handling.
// This means all unanswered buffered queries can be dispatched directly through matching at this point.
if !createNewDecisionTask {
buffered := queryRegistry.GetBufferedIDs()
for _, id := range buffered {
unblockTerminationState := &query.TerminationState{
TerminationType: query.TerminationTypeUnblocked,
}
if err := queryRegistry.SetTerminationState(id, unblockTerminationState); err != nil {
handler.logger.Error(
"failed to set query termination state to unblocked",
tag.WorkflowDomainName(domain),
tag.WorkflowID(workflowID),
tag.WorkflowRunID(runID),
tag.QueryID(id),
tag.Error(err))
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
}
}
}
}