in core/src/main/java/flex/messaging/endpoints/BasePollingHTTPEndpoint.java [440:550]
protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) {
FlushResult flushResult = null;
if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER)) {
FlexSession session = FlexContext.getFlexSession();
// If canWait is true it means we currently have less than the max number of allowed waiting threads.
// We need to protect writes/reads to the wait count with the endpoint's lock.
// Also, we have to be careful to handle the case where two threads get to this point when only
// one wait spot remains; one thread will win and the other needs to revert to a non-waitable poll.
boolean thisThreadCanWait;
synchronized (lock) {
++waitingPollRequestsCount;
if (waitingPollRequestsCount == maxWaitingPollRequests) {
thisThreadCanWait = true; // This thread got the last wait spot.
canWait = false;
} else if (waitingPollRequestsCount > maxWaitingPollRequests) {
thisThreadCanWait = false; // This thread was beaten out for the last spot.
--waitingPollRequestsCount; // Decrement the count because we're not going to try a poll with wait.
canWait = false; // All the wait spots are currently occupied so prevent further attempts for now.
} else {
// We haven't hit the limit yet, allow this thread to wait.
thisThreadCanWait = true;
}
}
// Check the max waiting connections per session count
if (thisThreadCanWait) {
String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
synchronized (session) {
if (agentSettings != null)
session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
++session.streamingConnectionsCount;
if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
|| session.streamingConnectionsCount <= session.maxConnectionsPerSession) {
thisThreadCanWait = true; // We haven't hit the limit yet, allow the wait.
} else // (session.streamingConnectionsCount > session.maxConnectionsPerSession)
{
thisThreadCanWait = false; // no more from this client
--session.streamingConnectionsCount;
}
}
if (!thisThreadCanWait) {
// Decrement the waiting poll count, since this poll isn't going to wait.
synchronized (lock) {
--waitingPollRequestsCount;
if (waitingPollRequestsCount < maxWaitingPollRequests)
canWait = true;
}
if (Log.isDebug()) {
log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession + ") has been reached, this poll won't wait.");
}
}
}
if (thisThreadCanWait) {
if (Log.isDebug())
log.debug("Number of waiting threads for endpoint with id '" + getId() + "' is " + waitingPollRequestsCount + ".");
try {
flushResult = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(), this, waitInterval);
if (flushResult != null) {
// Prevent busy-polling due to multiple clients sharing a session and swapping each other out too quickly.
if ((flushResult instanceof PollFlushResult) && ((PollFlushResult) flushResult).isAvoidBusyPolling() && (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS)) {
// Force the client polling interval to match the default defined in the client PollingChannel.
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
} else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis() == 0)) {
// If the FlushResult doesn't specify it's own flush wait time, use the configured clientWaitInterval if defined.
flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
}
}
} finally {
// We're done waiting so decrement the count of waiting threads and update the canWait flag if necessary
synchronized (lock) {
--waitingPollRequestsCount;
if (waitingPollRequestsCount < maxWaitingPollRequests)
canWait = true;
}
synchronized (session) {
--session.streamingConnectionsCount;
}
if (Log.isDebug())
log.debug("Number of waiting threads for endpoint with id '" + getId() + "' is " + waitingPollRequestsCount + ".");
}
}
} else if (Log.isDebug() && waitEnabled) {
if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
log.debug("Suppressing poll wait for this request because it is part of a batch of messages to process.");
else
log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. FlexClient with id '" + flexClient.getId() + "' will poll with no wait.");
}
// If we weren't able to do a poll with wait above for any reason just run the base poll handling logic.
if (flushResult == null) {
flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
// If this is an excess poll request that we couldn't wait on, make sure the client doesn't poll the endpoint too aggressively.
// In this case, force a client wait to match the default polling interval defined in the client PollingChannel.
if (waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS)) {
if (flushResult == null) {
flushResult = new FlushResult();
}
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
}
}
return flushResult;
}