in core/src/main/java/flex/messaging/client/FlexClient.java [779:898]
/**
 * Polls the outbound queue registered for an endpoint, blocking the calling thread
 * until messages arrive (or the wait interval elapses) when nothing can be flushed
 * immediately.
 * <p>
 * Flow:
 * <ol>
 *   <li>Under the FlexClient lock, look up the endpoint's queue and try
 *       {@code internalPoll}; a non-null result is returned right away.</li>
 *   <li>Register the queue in {@code session.waitMonitor} under the endpoint id. If an
 *       entry for this endpoint already exists and belongs to this FlexClient, the
 *       request is answered with a {@code PollFlushResult} that has client processing
 *       suppressed (a no-op poll) instead of starting a second wait.</li>
 *   <li>Wait on the queue's monitor if its message list is still empty, then remove the
 *       {@code waitMonitor} entry and flush the queue under the FlexClient lock.</li>
 * </ol>
 * All wait-state bookkeeping (the {@code waitPoll} flag, the temporary thread name, the
 * listener's {@code waitEnd} callback, and the {@code waitMonitor} entry) is undone in
 * {@code finally} blocks so that an interrupted or otherwise failed wait cannot leave the
 * session permanently marked as "already waiting".
 *
 * @param endpointId the id of the endpoint whose outbound queue is polled
 * @param session the session issuing the poll; also used as the lock guarding
 *        {@code session.waitMonitor}
 * @param listener optional callback notified immediately before and after the wait;
 *        may be {@code null}
 * @param waitIntervalMillis the maximum time to wait in milliseconds; {@code -1} is mapped
 *        to {@code 0}, which {@link Object#wait(long)} treats as "wait until notified"
 * @return the result of the immediate poll or of the flush performed after the wait
 *         (re-wrapped in a {@code PollFlushResult} flagged to avoid busy polling when the
 *         queue requested it), a suppressed no-op {@code PollFlushResult} for a duplicate
 *         waited poll, or {@code null} if the wait was interrupted
 */
public FlushResult pollWithWait(String endpointId, FlexSession session, PollWaitListener listener, long waitIntervalMillis) {
    EndpointQueue queue;
    synchronized (lock) {
        checkValid();
        queue = outboundQueues.get(endpointId);
        // If the queue exists and can be flushed right now there's no reason to wait.
        if (queue != null) {
            FlushResult flushResult = internalPoll(queue);
            if (flushResult != null)
                return flushResult;
        }
    }

    if (queue == null) {
        // The queue was null; let the client know that there are no active subscriptions.
        // NOTE(review): the helper is expected to throw; the return below only runs if it does not.
        throwNotSubscribedException(endpointId);
        return null;
    }

    // The queue exists but nothing could be flushed; we can try to wait for messages.
    synchronized (session) {
        // Set up the waitMonitor on the session; this maps the endpoint id to the queue that the
        // current poll request targets, and that queue is used as the wait/notify monitor.
        // This also lets us prevent busy polling cycles from a single client. If we already have a
        // waited poll request, a subsequent poll request is treated as a no-op.
        if (session.waitMonitor != null) {
            final EndpointQueue waitingQueue = session.waitMonitor.get(endpointId);
            // If the poll is from the same client and the same endpoint, treat it as a no-op poll.
            if (waitingQueue != null && waitingQueue.flexClient.equals(this)) {
                PollFlushResult result = new PollFlushResult();
                result.setClientProcessingSuppressed(true);
                return result;
            }
        } else {
            session.waitMonitor = new HashMap<String, EndpointQueue>();
        }
        // Register the queue for this poll request before releasing the session lock.
        session.waitMonitor.put(endpointId, queue);
    }

    // Now that the session references the wait monitor this thread will use to wait we can enter
    // the wait state.
    // -1 wait-interval actually means wait until notified.
    waitIntervalMillis = (waitIntervalMillis == -1) ? 0 : waitIntervalMillis;

    String threadName = Thread.currentThread().getName();
    try {
        boolean didWait = false;
        boolean avoidBusyPolling = false;
        try {
            synchronized (queue) {
                // If the message queue is still empty, wait for a message to be added before invoking flush.
                if (queue.messages.isEmpty()) {
                    reportStatusIfDebug("waiting for new messages to arrive");

                    didWait = true;

                    // Tag thread name during the wait.
                    Thread currentThread = Thread.currentThread();
                    currentThread.setName(threadName + POLL_WAIT_THREAD_NAME_EXTENSION);

                    // Becomes true once the listener (if any) has accepted waitStart; only then
                    // do waitPoll and waitEnd need to be undone.
                    boolean waitStarted = false;
                    try {
                        if (listener != null)
                            listener.waitStart(queue);
                        waitStarted = true;

                        queue.waitPoll = true; // Mark the queue as waiting.

                        queue.wait(waitIntervalMillis);
                    } finally {
                        // Runs on normal wake-up, timeout, interrupt, or any other failure.
                        // The queue monitor is held here: Object.wait re-acquires it before
                        // returning or throwing.
                        if (waitStarted)
                            queue.waitPoll = false; // Unmark the queue as waiting.

                        // Reset thread name now that the wait is over.
                        currentThread.setName(threadName);

                        if (waitStarted && listener != null)
                            listener.waitEnd(queue);
                    }

                    if (queue.avoidBusyPolling) {
                        avoidBusyPolling = true;
                        queue.avoidBusyPolling = false;
                    }
                }
            }
        } finally {
            // Always drop the wait registration; otherwise later polls from this session for
            // this endpoint would be answered as suppressed no-ops indefinitely.
            synchronized (session) {
                if (session.waitMonitor != null) {
                    session.waitMonitor.remove(endpointId);
                }
            }
        }

        if (Log.isDebug()) {
            if (didWait)
                reportStatusIfDebug("done waiting for new messages to arrive and is flushing the outbound queue");
            else
                reportStatusIfDebug("didn't need to wait and is flushing the outbound queue");
        }

        // We need to hold the FlexClient lock to invoke flush.
        FlushResult result;
        synchronized (lock) {
            result = internalFlush(queue);
        }

        if (avoidBusyPolling) {
            // Re-wrap the flush output so the avoid-busy-polling flag can be set on the result.
            PollFlushResult swappedPollResult = new PollFlushResult();
            if (result != null) {
                swappedPollResult.setMessages(result.getMessages());
                swappedPollResult.setNextFlushWaitTimeMillis(result.getNextFlushWaitTimeMillis());
            }
            swappedPollResult.setAvoidBusyPolling(true);
            result = swappedPollResult;
        }
        return result;
    } catch (InterruptedException e) {
        if (Log.isWarn())
            Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).warn("Poll wait thread '" + threadName + "' for FlexClient with id '" + this.id +
                    "' could not finish waiting for new messages to arrive " +
                    "because it was interrupted: " + e.toString());
        // Preserve the interrupt for code further up the stack; catching the exception cleared it.
        Thread.currentThread().interrupt();
    }
    return null;
}