in activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java [263:407]
protected void doMessages(AjaxWebClient client, HttpServletRequest request, HttpServletResponse response) throws JMSException, IOException {
int messages = 0;
// This is a poll for any messages
long timeout = getReadTimeout(request);
if (LOG.isDebugEnabled()) {
LOG.debug("doMessage timeout=" + timeout);
}
// this is non-null if we're resuming the asyncRequest.
// attributes set in AjaxListener
UndeliveredAjaxMessage undelivered_message = null;
Message message = null;
undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
if( undelivered_message != null ) {
message = undelivered_message.getMessage();
}
synchronized (client) {
List<MessageConsumer> consumers = client.getConsumers();
MessageAvailableConsumer consumer = null;
if( undelivered_message != null ) {
consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
}
if (message == null) {
// Look for a message that is ready to go
for (int i = 0; message == null && i < consumers.size(); i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
if (consumer.getAvailableListener() == null) {
continue;
}
// Look for any available messages
message = consumer.receive(10);
if (LOG.isDebugEnabled()) {
LOG.debug("received " + message + " from " + consumer);
}
}
}
// prepare the response
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
final AsyncServletRequest asyncRequest = AsyncServletRequest.getAsyncRequest(request);
if (asyncRequest.isExpired()) {
response.setStatus(HttpServletResponse.SC_OK);
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
writer.println("<ajax-response>");
writer.print("</ajax-response>");
writer.flush();
String m = swriter.toString();
response.getWriter().println(m);
return;
}
asyncRequest.setTimeoutMs(timeout);
asyncRequest.startAsync();
LOG.debug("Suspending asyncRequest " + asyncRequest);
// Fetch the listeners
AjaxListener listener = client.getListener();
listener.access();
// register this asyncRequest with our listener.
listener.setAsyncRequest(asyncRequest);
return;
}
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
response.setStatus(HttpServletResponse.SC_OK);
writer.println("<ajax-response>");
// Send any message we already have
if (message != null) {
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending pre-existing message" );
writeMessageResponse(writer, message, id, destinationName);
messages++;
}
// send messages buffered while asyncRequest was unavailable.
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
synchronized( undeliveredMessages ) {
for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
messages++;
UndeliveredAjaxMessage undelivered = it.next();
Message msg = undelivered.getMessage();
consumer = (MessageAvailableConsumer)undelivered.getConsumer();
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending undelivered/buffered messages" );
LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
writeMessageResponse(writer, msg, id, destinationName);
it.remove();
if (messages >= maximumMessages) {
break;
}
}
}
// Send the rest of the messages
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
if (consumer.getAvailableListener() == null) {
continue;
}
// Look for any available messages
while (messages < maximumMessages) {
message = consumer.receiveNoWait();
if (message == null) {
break;
}
messages++;
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending final available messages" );
writeMessageResponse(writer, message, id, destinationName);
}
}
writer.print("</ajax-response>");
writer.flush();
String m = swriter.toString();
response.getWriter().println(m);
}
}