protected FlushResult handleFlexClientPoll()

in core/src/main/java/flex/messaging/endpoints/BasePollingHTTPEndpoint.java [440:550]


    /**
     * Handles a poll request for a FlexClient, attempting a poll-with-wait when both an
     * endpoint-wide wait slot and a per-session connection slot can be reserved, and otherwise
     * delegating to the superclass's poll handling.
     *
     * <p>Two counters are maintained while a poll waits:
     * <ul>
     *   <li>{@code waitingPollRequestsCount}, guarded by {@code lock}, bounded by
     *       {@code maxWaitingPollRequests}; {@code canWait} is flipped to {@code false} when the
     *       last slot is taken and back to {@code true} when the count drops below the limit.</li>
     *   <li>{@code session.streamingConnectionsCount}, guarded by the session's monitor, bounded by
     *       {@code session.maxConnectionsPerSession} unless that value equals
     *       {@code FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED}.</li>
     * </ul>
     * Both reservations are released in a {@code finally} block around the waited poll.
     *
     * @param flexClient  the client whose queued messages are being polled
     * @param pollCommand the poll command; if it carries
     *                    {@code CommandMessage.SUPPRESS_POLL_WAIT_HEADER} no wait is attempted
     * @return the result of the waited poll if one was performed and produced a result; otherwise
     *         the superclass's result, with its next-flush wait time forced to
     *         {@code DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS} when {@code waitEnabled} is set and
     *         {@code pollingIntervalMillis} is below that default. May be {@code null} if the
     *         superclass returns {@code null} and that override does not apply.
     */
    protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) {
        FlushResult flushResult = null;
        // canWait is read here without holding the lock; the authoritative check is redone under the lock below.
        if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER)) {
            FlexSession session = FlexContext.getFlexSession();
            // If canWait is true it means we currently have less than the max number of allowed waiting threads.

            // We need to protect writes/reads to the wait count with the endpoint's lock.
            // Also, we have to be careful to handle the case where two threads get to this point when only
            // one wait spot remains; one thread will win and the other needs to revert to a non-waitable poll.
            boolean thisThreadCanWait;
            synchronized (lock) {
                // Optimistically reserve an endpoint-wide wait slot, then decide whether we may keep it.
                // NOTE(review): this reservation is only released by the rollback paths and the finally block
                // below; an exception thrown between here and the try block would leave the count
                // incremented. Confirm that nothing in between can throw.
                ++waitingPollRequestsCount;
                if (waitingPollRequestsCount == maxWaitingPollRequests) {
                    thisThreadCanWait = true; // This thread got the last wait spot.
                    canWait = false;
                } else if (waitingPollRequestsCount > maxWaitingPollRequests) {
                    thisThreadCanWait = false; // This thread was beaten out for the last spot.
                    --waitingPollRequestsCount; // Decrement the count because we're not going to try a poll with wait.
                    canWait = false; // All the wait spots are currently occupied so prevent further attempts for now.
                } else {
                    // We haven't hit the limit yet, allow this thread to wait.
                    thisThreadCanWait = true;
                }
            }

            // Check the max waiting connections per session count
            if (thisThreadCanWait) {
                // Look up per-user-agent settings; a non-null match overrides the session's connection limit.
                String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
                UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
                synchronized (session) {
                    if (agentSettings != null)
                        session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();

                    // Optimistically reserve a per-session slot, rolling it back below if the limit is exceeded.
                    ++session.streamingConnectionsCount;
                    if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
                            || session.streamingConnectionsCount <= session.maxConnectionsPerSession) {
                        thisThreadCanWait = true; // We haven't hit the limit yet, allow the wait.
                    } else // (session.streamingConnectionsCount > session.maxConnectionsPerSession)
                    {
                        thisThreadCanWait = false; // no more from this client
                        --session.streamingConnectionsCount;
                    }
                }

                if (!thisThreadCanWait) {
                    // Decrement the waiting poll count, since this poll isn't going to wait.
                    // This gives back the endpoint-wide slot reserved above and re-opens waiting if room exists.
                    synchronized (lock) {
                        --waitingPollRequestsCount;
                        if (waitingPollRequestsCount < maxWaitingPollRequests)
                            canWait = true;
                    }
                    if (Log.isDebug()) {
                        log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession + ") has been reached, this poll won't wait.");
                    }
                }

            }

            // At this point thisThreadCanWait is true only if both the endpoint slot and the session slot are held.
            if (thisThreadCanWait) {
                // The count is read here without the lock, so the logged value is informational only.
                if (Log.isDebug())
                    log.debug("Number of waiting threads for endpoint with id '" + getId() + "' is " + waitingPollRequestsCount + ".");

                try {
                    // Presumably blocks for up to waitInterval until messages are available - verify against FlexClient.
                    flushResult = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(), this, waitInterval);
                    if (flushResult != null) {
                        // Prevent busy-polling due to multiple clients sharing a session and swapping each other out too quickly.
                        if ((flushResult instanceof PollFlushResult) && ((PollFlushResult) flushResult).isAvoidBusyPolling() && (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS)) {
                            // Force the client polling interval to match the default defined in the client PollingChannel.
                            flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
                        } else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis() == 0)) {
                            // If the FlushResult doesn't specify its own flush wait time, use the configured clientWaitInterval if defined.
                            flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
                        }
                    }
                } finally {
                    // We're done waiting so decrement the count of waiting threads and update the canWait flag if necessary
                    synchronized (lock) {
                        --waitingPollRequestsCount;
                        if (waitingPollRequestsCount < maxWaitingPollRequests)
                            canWait = true;
                    }
                    // Release the per-session slot reserved before the wait; the two monitors are never held together.
                    synchronized (session) {
                        --session.streamingConnectionsCount;
                    }

                    if (Log.isDebug())
                        log.debug("Number of waiting threads for endpoint with id '" + getId() + "' is " + waitingPollRequestsCount + ".");
                }
            }
        } else if (Log.isDebug() && waitEnabled) {
            // No wait was attempted: log which of the two conditions in the outer if caused that.
            if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
                log.debug("Suppressing poll wait for this request because it is part of a batch of messages to process.");
            else
                log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. FlexClient with id '" + flexClient.getId() + "' will poll with no wait.");
        }

        // If we weren't able to do a poll with wait above for any reason just run the base poll handling logic.
        // This also covers a waited poll that completed but returned null.
        if (flushResult == null) {
            flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
            // If this is an excess poll request that we couldn't wait on, make sure the client doesn't poll the endpoint too aggressively.
            // In this case, force a client wait to match the default polling interval defined in the client PollingChannel.
            if (waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS)) {
                // The superclass may return null; create an empty result to carry the forced wait time.
                if (flushResult == null) {
                    flushResult = new FlushResult();
                }
                flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
            }
        }

        return flushResult;
    }