func()

in service/history/engine/engineimpl/historyEngine.go [1181:1343]


// QueryWorkflow answers a workflow query, either by dispatching it directly
// through matching or by buffering it in the workflow's query registry and
// waiting for it to terminate.
//
// The method proceeds in the following stages:
//  1. If strong consistency is requested but consistent query is not enabled
//     (globally and for the domain), it returns workflow.ErrConsistentQueryNotEnabled.
//  2. If the workflow is not running and the request carries a reject condition
//     that matches the close status, it returns a response with QueryRejected set.
//  3. It polls the mutable state until a decision task has been started
//     (PreviousStartedEventID > 0) or the wait budget is exhausted, in which case
//     it returns workflow.ErrQueryWorkflowBeforeFirstDecision.
//  4. It loads the workflow execution, rejects the query with an
//     EntityNotExistsError when checkForHistoryCorruptions reports corruption,
//     and decides whether the query may be dispatched directly through matching.
//  5. Otherwise the query is buffered and the call blocks until the query's
//     termination channel fires or ctx is done.
//
// The named result retErr is read by the deferred release call, so every return
// path reports its error to the workflow execution release function.
func (e *historyEngineImpl) QueryWorkflow(
	ctx context.Context,
	request *types.HistoryQueryWorkflowRequest,
) (retResp *types.HistoryQueryWorkflowResponse, retErr error) {

	// scope is tagged by domain; shardMetricScope is tagged by shard ID and is only
	// used for the per-shard consistent query counter below.
	scope := e.metricsClient.Scope(metrics.HistoryQueryWorkflowScope).Tagged(metrics.DomainTag(request.GetRequest().GetDomain()))
	shardMetricScope := e.metricsClient.Scope(metrics.HistoryQueryWorkflowScope, metrics.ShardIDTag(e.shard.GetShardID()))

	// Consistent query must be enabled both globally and for the requested domain.
	consistentQueryEnabled := e.config.EnableConsistentQuery() && e.config.EnableConsistentQueryByDomain(request.GetRequest().GetDomain())
	if request.GetRequest().GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
		if !consistentQueryEnabled {
			return nil, workflow.ErrConsistentQueryNotEnabled
		}
		shardMetricScope.IncCounter(metrics.ConsistentQueryPerShard)
		e.logger.SampleInfo("History QueryWorkflow called with QueryConsistencyLevelStrong", e.config.SampleLoggingRate(), tag.ShardID(e.shard.GetShardID()), tag.WorkflowID(request.GetRequest().Execution.WorkflowID), tag.WorkflowDomainName(request.GetRequest().Domain))
	}

	// NOTE(review): this dereferences the result of GetExecution(); it would panic if
	// that is nil — presumably callers validate the request first, confirm.
	execution := *request.GetRequest().GetExecution()

	mutableStateResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution)
	if err != nil {
		return nil, err
	}
	req := request.GetRequest()
	// Reject conditions only apply to workflows that are no longer running.
	if !mutableStateResp.GetIsWorkflowRunning() && req.QueryRejectCondition != nil {
		notOpenReject := req.GetQueryRejectCondition() == types.QueryRejectConditionNotOpen
		closeStatus := mutableStateResp.GetWorkflowCloseState()
		notCompletedCleanlyReject := req.GetQueryRejectCondition() == types.QueryRejectConditionNotCompletedCleanly && closeStatus != persistence.WorkflowCloseStatusCompleted
		if notOpenReject || notCompletedCleanlyReject {
			// A rejection is reported in the response, not as an error.
			return &types.HistoryQueryWorkflowResponse{
				Response: &types.QueryWorkflowResponse{
					QueryRejected: &types.QueryRejected{
						CloseStatus: persistence.ToInternalWorkflowExecutionCloseStatus(int(closeStatus)),
					},
				},
			}, nil
		}
	}

	// A query cannot be processed until a decision task has been started
	// (PreviousStartedEventID > 0). If that has not happened yet, poll for it.
	// The wait budget starts at defaultQueryFirstDecisionTaskWaitTime and is extended
	// to the time remaining until the context deadline minus one second when that is longer.
	queryFirstDecisionTaskWaitTime := defaultQueryFirstDecisionTaskWaitTime
	ctxDeadline, ok := ctx.Deadline()
	if ok {
		ctxWaitTime := time.Until(ctxDeadline) - time.Second
		if ctxWaitTime > queryFirstDecisionTaskWaitTime {
			queryFirstDecisionTaskWaitTime = ctxWaitTime
		}
	}
	deadline := time.Now().Add(queryFirstDecisionTaskWaitTime)
	for mutableStateResp.GetPreviousStartedEventID() <= 0 && time.Now().Before(deadline) {
		// NOTE(review): this sleep does not observe ctx.Done(); cancellation is only
		// noticed on the next getMutableState call, if that call honors ctx — confirm.
		<-time.After(queryFirstDecisionTaskCheckInterval)
		mutableStateResp, err = e.getMutableState(ctx, request.GetDomainUUID(), execution)
		if err != nil {
			return nil, err
		}
	}

	// Still no started decision task after the wait budget: give up.
	if mutableStateResp.GetPreviousStartedEventID() <= 0 {
		scope.IncCounter(metrics.QueryBeforeFirstDecisionCount)
		return nil, workflow.ErrQueryWorkflowBeforeFirstDecision
	}

	de, err := e.shard.GetDomainCache().GetDomainByID(request.GetDomainUUID())
	if err != nil {
		return nil, err
	}

	wfContext, release, err := e.executionCache.GetOrCreateWorkflowExecution(ctx, request.GetDomainUUID(), execution)
	if err != nil {
		return nil, err
	}
	// The deferred release reports the final error of this call via the named result.
	// NOTE(review): release is also called explicitly below, so on those paths it runs
	// twice — this presumably relies on release being idempotent, confirm.
	defer func() { release(retErr) }()
	mutableState, err := wfContext.LoadWorkflowExecution(ctx)
	if err != nil {
		return nil, err
	}
	// If history is corrupted, query will be rejected
	if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
		return nil, err
	} else if corrupted {
		return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."}
	}

	// There are two ways in which queries get dispatched to decider. First, queries can be dispatched on decision tasks.
	// These decision tasks potentially contain new events and queries. The events are treated as coming before the query in time.
	// The second way in which queries are dispatched to decider is directly through matching; in this approach queries can be
	// dispatched to decider immediately even if there are outstanding events that came before the query. The following logic
	// is used to determine if a query can be safely dispatched directly through matching or if given the desired consistency
	// level must be dispatched on a decision task. There are four cases in which a query can be dispatched directly through
	// matching safely, without violating the desired consistency level:
	// 1. the domain is not active, in this case history is immutable so a query dispatched at any time is consistent
	// 2. the workflow is not running, whenever a workflow is not running dispatching query directly is consistent
	// 3. the client requested eventual consistency, in this case there are no consistency requirements so dispatching directly through matching is safe
	// 4. if there is no pending or started decision it means no events came before query arrived, so it's safe to dispatch directly
	// NOTE(review): the second return value of IsActiveIn is discarded here — confirm
	// that ignoring it is intended.
	isActive, _ := de.IsActiveIn(e.clusterMetadata.GetCurrentClusterName())
	safeToDispatchDirectly := !isActive ||
		!mutableState.IsWorkflowExecutionRunning() ||
		req.GetQueryConsistencyLevel() == types.QueryConsistencyLevelEventual ||
		(!mutableState.HasPendingDecision() && !mutableState.HasInFlightDecision())
	if safeToDispatchDirectly {
		// Release the workflow execution before the calls below.
		release(nil)
		msResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution)
		if err != nil {
			return nil, err
		}
		// Overwrite the run ID on the query request with the one from the freshly fetched mutable state.
		req.Execution.RunID = msResp.Execution.RunID
		return e.queryDirectlyThroughMatching(ctx, msResp, request.GetDomainUUID(), req, scope)
	}

	// If we get here it means query could not be dispatched through matching directly, so it must block
	// until either a result has been obtained on a decision task response or until it is safe to dispatch directly through matching.
	// The timer covers everything from here until this method returns.
	sw := scope.StartTimer(metrics.DecisionTaskQueryLatency)
	defer sw.Stop()
	queryReg := mutableState.GetQueryRegistry()
	// Refuse to buffer another query once the configured limit has been reached.
	if len(queryReg.GetBufferedIDs()) >= e.config.MaxBufferedQueryCount() {
		scope.IncCounter(metrics.QueryBufferExceededCount)
		return nil, workflow.ErrConsistentQueryBufferExceeded
	}
	queryID, termCh := queryReg.BufferQuery(req.GetQuery())
	// Remove the buffered query from the registry on every return path below.
	defer queryReg.RemoveQuery(queryID)
	// Release the workflow execution before blocking on the query outcome
	// (presumably so it is not held for the duration of the wait — confirm).
	release(nil)
	select {
	case <-termCh:
		// The query reached a terminal state; inspect how it terminated.
		state, err := queryReg.GetTerminationState(queryID)
		if err != nil {
			scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			return nil, err
		}
		switch state.TerminationType {
		case query.TerminationTypeCompleted:
			// Completed: the result is either an answer or a failure message.
			result := state.QueryResult
			switch result.GetResultType() {
			case types.QueryResultTypeAnswered:
				return &types.HistoryQueryWorkflowResponse{
					Response: &types.QueryWorkflowResponse{
						QueryResult: result.GetAnswer(),
					},
				}, nil
			case types.QueryResultTypeFailed:
				return nil, &types.QueryFailedError{Message: result.GetErrorMessage()}
			default:
				scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
				return nil, workflow.ErrQueryEnteredInvalidState
			}
		case query.TerminationTypeUnblocked:
			// Unblocked: fall back to the direct-through-matching path with a fresh mutable state.
			msResp, err := e.getMutableState(ctx, request.GetDomainUUID(), execution)
			if err != nil {
				return nil, err
			}
			req.Execution.RunID = msResp.Execution.RunID
			return e.queryDirectlyThroughMatching(ctx, msResp, request.GetDomainUUID(), req, scope)
		case query.TerminationTypeFailed:
			// Failed: surface the failure recorded in the termination state.
			return nil, state.Failure
		default:
			scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			return nil, workflow.ErrQueryEnteredInvalidState
		}
	case <-ctx.Done():
		// The context ended before the query terminated.
		scope.IncCounter(metrics.ConsistentQueryTimeoutCount)
		return nil, ctx.Err()
	}
}