in service/history/engine/engineimpl/historyEngine.go [1181:1343]
// QueryWorkflow runs the query carried in request against the target workflow execution.
//
// The method proceeds in stages:
//  1. A strongly consistent query is refused with workflow.ErrConsistentQueryNotEnabled
//     unless consistent query is enabled both globally and for the request's domain.
//  2. If the workflow is no longer running and the request carries a QueryRejectCondition
//     that matches the close status, a QueryRejected response is returned (with a nil error).
//  3. The method polls mutable state until PreviousStartedEventID is positive, and returns
//     workflow.ErrQueryWorkflowBeforeFirstDecision if the wait budget runs out first.
//  4. The query is either dispatched directly through matching, or buffered in the
//     workflow's query registry until the registry terminates it or ctx is done.
//
// The named result retErr is read by the deferred release of the cached workflow context.
// When ctx is cancelled or expires while waiting (in stage 3 or stage 4), ctx.Err() is returned.
func (e *historyEngineImpl) QueryWorkflow(
	ctx context.Context,
	request *types.HistoryQueryWorkflowRequest,
) (retResp *types.HistoryQueryWorkflowResponse, retErr error) {
	scope := e.metricsClient.Scope(metrics.HistoryQueryWorkflowScope).Tagged(metrics.DomainTag(request.GetRequest().GetDomain()))
	shardMetricScope := e.metricsClient.Scope(metrics.HistoryQueryWorkflowScope, metrics.ShardIDTag(e.shard.GetShardID()))

	// Strong consistency requires both the global and the per-domain switch to be on.
	consistentQueryEnabled := e.config.EnableConsistentQuery() && e.config.EnableConsistentQueryByDomain(request.GetRequest().GetDomain())
	if request.GetRequest().GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
		if !consistentQueryEnabled {
			return nil, workflow.ErrConsistentQueryNotEnabled
		}
		shardMetricScope.IncCounter(metrics.ConsistentQueryPerShard)
		e.logger.SampleInfo("History QueryWorkflow called with QueryConsistencyLevelStrong", e.config.SampleLoggingRate(), tag.ShardID(e.shard.GetShardID()), tag.WorkflowID(request.GetRequest().Execution.WorkflowID), tag.WorkflowDomainName(request.GetRequest().Domain))
	}

	// NOTE(review): this dereference assumes the request always carries an execution;
	// presumably callers validate that before reaching here - confirm.
	execution := *request.GetRequest().GetExecution()

	mutableStateResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution)
	if err != nil {
		return nil, err
	}

	// Apply the caller's reject condition; it is only evaluated for workflows that are not running.
	req := request.GetRequest()
	if !mutableStateResp.GetIsWorkflowRunning() && req.QueryRejectCondition != nil {
		notOpenReject := req.GetQueryRejectCondition() == types.QueryRejectConditionNotOpen
		closeStatus := mutableStateResp.GetWorkflowCloseState()
		notCompletedCleanlyReject := req.GetQueryRejectCondition() == types.QueryRejectConditionNotCompletedCleanly && closeStatus != persistence.WorkflowCloseStatusCompleted
		if notOpenReject || notCompletedCleanlyReject {
			return &types.HistoryQueryWorkflowResponse{
				Response: &types.QueryWorkflowResponse{
					QueryRejected: &types.QueryRejected{
						CloseStatus: persistence.ToInternalWorkflowExecutionCloseStatus(int(closeStatus)),
					},
				},
			}, nil
		}
	}

	// query cannot be processed unless at least one decision task has finished
	// if first decision task has not finished, wait for it to complete. The wait budget is
	// defaultQueryFirstDecisionTaskWaitTime, or the time remaining until one second before the
	// context deadline when that is longer.
	queryFirstDecisionTaskWaitTime := defaultQueryFirstDecisionTaskWaitTime
	ctxDeadline, ok := ctx.Deadline()
	if ok {
		ctxWaitTime := time.Until(ctxDeadline) - time.Second
		if ctxWaitTime > queryFirstDecisionTaskWaitTime {
			queryFirstDecisionTaskWaitTime = ctxWaitTime
		}
	}
	deadline := time.Now().Add(queryFirstDecisionTaskWaitTime)
	for mutableStateResp.GetPreviousStartedEventID() <= 0 && time.Now().Before(deadline) {
		// Sleep for one check interval, but give up immediately if the caller has gone away
		// instead of sleeping and then issuing another mutable state read.
		timer := time.NewTimer(queryFirstDecisionTaskCheckInterval)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return nil, ctx.Err()
		}
		mutableStateResp, err = e.getMutableState(ctx, request.GetDomainUUID(), execution)
		if err != nil {
			return nil, err
		}
	}

	if mutableStateResp.GetPreviousStartedEventID() <= 0 {
		scope.IncCounter(metrics.QueryBeforeFirstDecisionCount)
		return nil, workflow.ErrQueryWorkflowBeforeFirstDecision
	}

	de, err := e.shard.GetDomainCache().GetDomainByID(request.GetDomainUUID())
	if err != nil {
		return nil, err
	}

	wfContext, release, err := e.executionCache.GetOrCreateWorkflowExecution(ctx, request.GetDomainUUID(), execution)
	if err != nil {
		return nil, err
	}
	// release is also called explicitly further down, before the calls that may block.
	// NOTE(review): this relies on release tolerating more than one invocation - confirm.
	defer func() { release(retErr) }()

	mutableState, err := wfContext.LoadWorkflowExecution(ctx)
	if err != nil {
		return nil, err
	}

	// If history is corrupted, query will be rejected
	if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
		return nil, err
	} else if corrupted {
		return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."}
	}

	// There are two ways in which queries get dispatched to decider. First, queries can be dispatched on decision tasks.
	// These decision tasks potentially contain new events and queries. The events are treated as coming before the query in time.
	// The second way in which queries are dispatched to decider is directly through matching; in this approach queries can be
	// dispatched to decider immediately even if there are outstanding events that came before the query. The following logic
	// is used to determine if a query can be safely dispatched directly through matching or if given the desired consistency
	// level must be dispatched on a decision task. There are four cases in which a query can be dispatched directly through
	// matching safely, without violating the desired consistency level:
	// 1. the domain is not active, in this case history is immutable so a query dispatched at any time is consistent
	// 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent
	// 3. the client requested eventual consistency, in this case there are no consistency requirements so dispatching directly through matching is safe
	// 4. if there is no pending or started decision it means no events came before query arrived, so it's safe to dispatch directly
	//
	// The second return value of IsActiveIn is deliberately discarded.
	// NOTE(review): presumably it only explains why the domain is not active - confirm before handling it.
	isActive, _ := de.IsActiveIn(e.clusterMetadata.GetCurrentClusterName())
	safeToDispatchDirectly := !isActive ||
		!mutableState.IsWorkflowExecutionRunning() ||
		req.GetQueryConsistencyLevel() == types.QueryConsistencyLevelEventual ||
		(!mutableState.HasPendingDecision() && !mutableState.HasInFlightDecision())
	if safeToDispatchDirectly {
		// Give the cached workflow context back before going through matching.
		release(nil)
		msResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution)
		if err != nil {
			return nil, err
		}
		// Pin the query to the run ID reported by the latest mutable state.
		req.Execution.RunID = msResp.Execution.RunID
		return e.queryDirectlyThroughMatching(ctx, msResp, request.GetDomainUUID(), req, scope)
	}

	// If we get here it means query could not be dispatched through matching directly, so it must block
	// until either a result has been obtained on a decision task response or until it is safe to dispatch directly through matching.
	sw := scope.StartTimer(metrics.DecisionTaskQueryLatency)
	defer sw.Stop()
	queryReg := mutableState.GetQueryRegistry()
	if len(queryReg.GetBufferedIDs()) >= e.config.MaxBufferedQueryCount() {
		scope.IncCounter(metrics.QueryBufferExceededCount)
		return nil, workflow.ErrConsistentQueryBufferExceeded
	}
	queryID, termCh := queryReg.BufferQuery(req.GetQuery())
	// Always drop the query from the registry on the way out, whatever the outcome.
	defer queryReg.RemoveQuery(queryID)
	// Give the cached workflow context back before blocking on the termination channel.
	release(nil)
	select {
	case <-termCh:
		state, err := queryReg.GetTerminationState(queryID)
		if err != nil {
			scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			return nil, err
		}
		switch state.TerminationType {
		case query.TerminationTypeCompleted:
			result := state.QueryResult
			switch result.GetResultType() {
			case types.QueryResultTypeAnswered:
				return &types.HistoryQueryWorkflowResponse{
					Response: &types.QueryWorkflowResponse{
						QueryResult: result.GetAnswer(),
					},
				}, nil
			case types.QueryResultTypeFailed:
				return nil, &types.QueryFailedError{Message: result.GetErrorMessage()}
			default:
				scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
				return nil, workflow.ErrQueryEnteredInvalidState
			}
		case query.TerminationTypeUnblocked:
			// The query was unblocked rather than answered: fall back to dispatching it
			// through matching against freshly read mutable state.
			msResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution)
			if err != nil {
				return nil, err
			}
			req.Execution.RunID = msResp.Execution.RunID
			return e.queryDirectlyThroughMatching(ctx, msResp, request.GetDomainUUID(), req, scope)
		case query.TerminationTypeFailed:
			return nil, state.Failure
		default:
			scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			return nil, workflow.ErrQueryEnteredInvalidState
		}
	case <-ctx.Done():
		scope.IncCounter(metrics.ConsistentQueryTimeoutCount)
		return nil, ctx.Err()
	}
}