protected void doMessages()

in activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java [263:407]


    /**
     * Handles a poll for messages on behalf of an ajax client and writes the result as an
     * {@code <ajax-response>} XML document.
     * <p>
     * The whole exchange runs while holding the monitor of {@code client}. The method:
     * <ol>
     * <li>picks up a message handed over through the {@code "undelivered_message"} request
     * attribute, if the request carries one;</li>
     * <li>otherwise polls each consumer that has an available listener, waiting up to 10ms
     * per consumer, until one yields a message;</li>
     * <li>if there is still no message and the client listener has nothing buffered, either
     * answers with an empty {@code <ajax-response>} (async request already expired) or
     * suspends the request and registers it with the client listener;</li>
     * <li>otherwise writes the message in hand, then the buffered undelivered messages, then
     * whatever else the consumers can deliver without waiting. The {@code maximumMessages}
     * limit is checked after each buffered message is written and before each further
     * receive.</li>
     * </ol>
     *
     * @param client the ajax client whose consumers and listener are polled; also used as the lock
     * @param request the current request; may carry an {@code "undelivered_message"} attribute
     * @param response the response that receives the {@code text/xml} document
     * @throws JMSException declared for the JMS receive calls made while polling the consumers
     * @throws IOException declared for writing to the response
     */
    protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {

        int messages = 0;
        // This is a poll for any messages

        long timeout = getReadTimeout(request);
        if (LOG.isDebugEnabled()) {
            LOG.debug("doMessage timeout=" + timeout);
        }

        // this is non-null if we're resuming the asyncRequest.
        // attributes set in AjaxListener
        UndeliveredAjaxMessage undeliveredMessage = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
        Message message = null;
        if (undeliveredMessage != null) {
            message = undeliveredMessage.getMessage();
        }

        synchronized (client) {

            List<MessageConsumer> consumers = client.getConsumers();
            MessageAvailableConsumer consumer = null;
            if (undeliveredMessage != null) {
                consumer = (MessageAvailableConsumer)undeliveredMessage.getConsumer();
            }

            if (message == null) {
                // Look for a message that is ready to go
                for (int i = 0; message == null && i < consumers.size(); i++) {
                    consumer = (MessageAvailableConsumer)consumers.get(i);
                    if (consumer.getAvailableListener() == null) {
                        continue;
                    }

                    // Look for any available messages
                    message = consumer.receive(10);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("received " + message + " from " + consumer);
                    }
                }
            }

            // prepare the response
            response.setContentType("text/xml");
            response.setHeader("Cache-Control", "no-cache");

            if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
                final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request);

                if (asyncRequest.isExpired()) {
                    // Nothing arrived before the async request expired: answer with an empty document.
                    response.setStatus(HttpServletResponse.SC_OK);
                    StringWriter swriter = new StringWriter();
                    PrintWriter writer = new PrintWriter(swriter);
                    writer.println("<ajax-response>");
                    writer.print("</ajax-response>");

                    writer.flush();
                    String m = swriter.toString();
                    response.getWriter().println(m);

                    return;
                }

                asyncRequest.setTimeoutMs(timeout);
                asyncRequest.startAsync();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Suspending asyncRequest " + asyncRequest);
                }

                // Fetch the listeners
                AjaxListener listener = client.getListener();
                listener.access();

                // register this asyncRequest with our listener.
                listener.setAsyncRequest(asyncRequest);

                return;
            }

            StringWriter swriter = new StringWriter();
            PrintWriter writer = new PrintWriter(swriter);

            Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
            Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
            response.setStatus(HttpServletResponse.SC_OK);
            writer.println("<ajax-response>");

            // Send any message we already have
            if (message != null) {
                String id = consumerIdMap.get(consumer);
                String destinationName = consumerDestinationNameMap.get(consumer);
                LOG.debug( "sending pre-existing message" );
                writeMessageResponse(writer, message, id, destinationName);

                messages++;
            }

            // send messages buffered while asyncRequest was unavailable.
            // 'consumer' can be null here (no consumers and no resumed message), or it can be the
            // last consumer polled, which may have no available listener. Dereferencing it blindly
            // would throw a NullPointerException, so fall back to the client's own listener - the
            // one whose buffer was consulted above when deciding not to suspend.
            Object availableListener = (consumer == null) ? null : consumer.getAvailableListener();
            AjaxListener bufferingListener = (availableListener != null) ? (AjaxListener)availableListener : client.getListener();
            LinkedList<UndeliveredAjaxMessage> undeliveredMessages = bufferingListener.getUndeliveredMessages();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
            }
            synchronized( undeliveredMessages ) {
                for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
                    messages++;
                    UndeliveredAjaxMessage undelivered = it.next();
                    Message msg = undelivered.getMessage();
                    consumer = (MessageAvailableConsumer)undelivered.getConsumer();
                    String id = consumerIdMap.get(consumer);
                    String destinationName = consumerDestinationNameMap.get(consumer);
                    LOG.debug( "sending undelivered/buffered messages" );
                    if (LOG.isDebugEnabled()) {
                        LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
                    }
                    writeMessageResponse(writer, msg, id, destinationName);
                    // written to the response, so drop it from the buffer
                    it.remove();
                    if (messages >= maximumMessages) {
                        break;
                    }
                }
            }

            // Send the rest of the messages
            for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
                consumer = (MessageAvailableConsumer)consumers.get(i);
                if (consumer.getAvailableListener() == null) {
                    continue;
                }

                // Look for any available messages
                while (messages < maximumMessages) {
                    message = consumer.receiveNoWait();
                    if (message == null) {
                        break;
                    }
                    messages++;
                    String id = consumerIdMap.get(consumer);
                    String destinationName = consumerDestinationNameMap.get(consumer);
                    LOG.debug( "sending final available messages" );
                    writeMessageResponse(writer, message, id, destinationName);
                }
            }

            writer.print("</ajax-response>");

            writer.flush();
            String m = swriter.toString();
            response.getWriter().println(m);
        }
    }