in src/main/java/org/apache/sling/jms/impl/JMSQueueManager.java [262:314]
public void onMessage(Message message) {
boolean committed = false;
TextMessage textMessage = null;
try {
try {
LOGGER.info("Got from {} message {} ", name, message);
Destination destination = message.getJMSDestination();
if (destination instanceof Queue) {
Queue queue = (Queue) destination;
if ( JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(message.getJMSType()))) {
textMessage = (TextMessage) message;
final Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
Types.QueueName queueName = Types.queueName(queue.getQueueName());
if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) {
queueReader.onMessage(queueName, filter(mapMessage));
session.commit();
// all ok.
committed = true;
return;
}
}
}
} catch (RequeueMessageException e) {
LOGGER.info("QueueReader requested requeue of message ", e);
if (retryByRequeue && textMessage != null) {
Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
if ((long)mapMessage.get(NRETRIES) < maxRetries) {
mapMessage.put(NRETRIES, ((long) mapMessage.get(NRETRIES)) + 1);
TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage));
retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
LOGGER.info("Retrying message Sending to {} message {} ", name, textMessage);
queueProducer.send(retryMessage);
session.commit();
committed = true;
return;
}
}
}
} catch (JMSException e) {
LOGGER.info("Receive failed leaving to provider to require if required. ", e);
} finally {
try {
if (!committed) {
session.rollback();
}
} catch (JMSException e) {
LOGGER.info("QueueReader rollback failed. ",e);
}
}
// If onMessage throws an exception JMS will put the message back onto the queue.
// the delay before it gets retried is a JMS server configuration.
throw new IllegalArgumentException("Unable to process message, requeue");
}