public FlushResult pollWithWait()

in core/src/main/java/flex/messaging/client/FlexClient.java [779:898]


    public FlushResult pollWithWait(String endpointId, FlexSession session, PollWaitListener listener, long waitIntervalMillis) {
        EndpointQueue queue;
        synchronized (lock) {
            checkValid();

            queue = outboundQueues.get(endpointId);

            // If the queue exists and is not empty there's no reason to wait; flush immediately.
            if (queue != null) {
                FlushResult flushResult = internalPoll(queue);
                if (flushResult != null)
                    return flushResult;
            }
        }

        // The queue exists but it was empty; we can try to wait for messages.
        if (queue != null) {
            synchronized (session) {
                // Set up the waitMonitor on the session; this is a reference to the queue that the
                // current poll request targets and we use it as a wait/notify monitor.
                // This also lets us prevent busy polling cycles from a single client. If we already have a waited
                // poll request a subsequent poll request is treated as a no-op.
                if (session.waitMonitor != null) {
                    final EndpointQueue waitingQueue = session.waitMonitor.get(endpointId);
                    // If the poll is from the same client swf, and the same endpoint, treat it as a no-op poll.
                    if (waitingQueue != null && waitingQueue.flexClient.equals(this)) {
                        PollFlushResult result = new PollFlushResult();
                        result.setClientProcessingSuppressed(true);
                        return result;
                    }
                } else {
                    session.waitMonitor = new HashMap<String, EndpointQueue>();
                }

                // Set the waitMonitor for the session to the queue
                // for this poll request before releasing the lock.
                session.waitMonitor.put(endpointId, queue);
            }

            // Now that the session references the wait monitor this thread will use to wait we can enter
            // the wait state.
            // -1 wait-interval actually means wait until notified.
            waitIntervalMillis = (waitIntervalMillis == -1) ? 0 : waitIntervalMillis;
            String threadName = Thread.currentThread().getName();
            try {
                boolean didWait = false;
                boolean avoidBusyPolling = false;
                synchronized (queue) {
                    // If the message queue is still empty, wait for a message to be added before invoking flush.
                    if (queue.messages.isEmpty()) {
                        reportStatusIfDebug("waiting for new messages to arrive");

                        didWait = true;

                        // Tag thread name during the wait.
                        Thread currentThread = Thread.currentThread();
                        currentThread.setName(threadName + POLL_WAIT_THREAD_NAME_EXTENSION);

                        if (listener != null)
                            listener.waitStart(queue);

                        queue.waitPoll = true; // Mark the queue as waiting.

                        queue.wait(waitIntervalMillis);

                        queue.waitPoll = false; // Unmark the queue as waiting.

                        // Reset thread name now that the wait is over.
                        currentThread.setName(threadName);

                        if (listener != null)
                            listener.waitEnd(queue);

                        if (queue.avoidBusyPolling) {
                            avoidBusyPolling = true;
                            queue.avoidBusyPolling = false;
                        }
                    }
                }

                synchronized (session) {
                    if (session.waitMonitor != null) {
                        session.waitMonitor.remove(endpointId);
                    }
                }

                if (Log.isDebug()) {
                    if (didWait)
                        reportStatusIfDebug("done waiting for new messages to arrive and is flushing the outbound queue");
                    else
                        reportStatusIfDebug("didn't need to wait and is flushing the outbound queue");
                }

                // We need to hold the FlexClient lock to invoke flush.
                FlushResult result;
                synchronized (lock) {
                    result = internalFlush(queue);
                }
                if (avoidBusyPolling) {
                    PollFlushResult swappedPollResult = new PollFlushResult();
                    if (result != null) {
                        swappedPollResult.setMessages(result.getMessages());
                        swappedPollResult.setNextFlushWaitTimeMillis(result.getNextFlushWaitTimeMillis());
                    }
                    swappedPollResult.setAvoidBusyPolling(true);
                    result = swappedPollResult;
                }
                return result;
            } catch (InterruptedException e) {
                if (Log.isWarn())
                    Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).warn("Poll wait thread '" + threadName + "' for FlexClient with id '" + this.id +
                            "' could not finish waiting for new messages to arrive " +
                            "because it was interrupted: " + e.toString());
            }
        } else {
            // The queue was null; let the client know that there are no active subscriptions.
            throwNotSubscribedException(endpointId);
        }
        return null;
    }