public void onMessage()

in src/main/java/org/apache/sling/jms/impl/JMSQueueManager.java [262:314]


        /**
         * Handles a message delivered to this listener.
         *
         * <p>The message is handed to {@code queueReader} only when all of the following hold:
         * its JMS destination is a {@link Queue}, its JMS type resolves to
         * {@code JMSMessageTypes.JSON}, the queue name equals {@code name}, and
         * {@code messageFilter} accepts the decoded payload. When the reader returns normally the
         * session is committed and this method returns.
         *
         * <p>When a {@code RequeueMessageException} is raised and {@code retryByRequeue} is set,
         * a copy of the payload with its {@code NRETRIES} counter incremented is sent through
         * {@code queueProducer} and the session is committed, provided the counter is still
         * below {@code maxRetries}.
         *
         * <p>In every other case (message not accepted, retries exhausted, JMS failure) the
         * session is rolled back and an exception is thrown.
         *
         * @param message the delivered JMS message
         * @throws IllegalArgumentException whenever the message was not committed; if a
         *         {@link JMSException} caused the failure it is attached as the cause
         */
        public void onMessage(Message message) {
            boolean committed = false;
            TextMessage textMessage = null;
            // Remembered so the exception thrown at the end can carry the underlying cause.
            JMSException failure = null;
            try {
                try {
                    LOGGER.info("Got from {} message {} ", name, message);
                    Destination destination = message.getJMSDestination();
                    if (destination instanceof Queue) {
                        Queue queue = (Queue) destination;
                        if ( JMSMessageTypes.JSON.equals(JMSMessageTypes.valueOf(message.getJMSType()))) {
                            textMessage = (TextMessage) message;
                            final Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
                            Types.QueueName queueName = Types.queueName(queue.getQueueName());
                            if (queueName.equals(name) && messageFilter.accept(queueName, mapMessage)) {
                                queueReader.onMessage(queueName, filter(mapMessage));
                                session.commit();
                                // all ok.
                                committed = true;
                                return;
                            }
                        }
                    }
                } catch (RequeueMessageException e) {
                    LOGGER.info("QueueReader requested requeue of message ", e);
                    if (retryByRequeue && textMessage != null) {
                        // Re-decode the payload so the retry copy starts from the message as received.
                        Map<String, Object> mapMessage = Json.toMap(textMessage.getText());
                        // Read the counter as a Number: a missing counter or one decoded as a
                        // non-Long numeric type would otherwise fail the unboxing cast with a
                        // NullPointerException / ClassCastException and bypass the retry path.
                        final Object rawRetries = mapMessage.get(NRETRIES);
                        final long retries = rawRetries instanceof Number
                                ? ((Number) rawRetries).longValue()
                                : 0L;
                        if (retries < maxRetries) {
                            mapMessage.put(NRETRIES, retries + 1);
                            TextMessage retryMessage = session.createTextMessage(Json.toJson(mapMessage));
                            retryMessage.setJMSType(JMSMessageTypes.JSON.toString());
                            LOGGER.info("Retrying message Sending to {} message {} ", name, textMessage);
                            queueProducer.send(retryMessage);
                            // Commit covers both the send of the retry copy and the receipt of the original.
                            session.commit();
                            committed = true;
                            return;
                        }
                    }
                }
            } catch (JMSException e) {
                failure = e;
                LOGGER.info("Receive failed leaving to provider to requeue if required. ", e);
            } finally {
                try {
                    // Runs on every exit path, including the early returns above, so only
                    // roll back when nothing was committed.
                    if (!committed) {
                        session.rollback();
                    }
                } catch (JMSException e) {
                    LOGGER.info("QueueReader rollback failed. ",e);
                }
            }
            // If onMessage throws an exception JMS will put the message back onto the queue.
            // the delay before it gets retried is a JMS server configuration.
            throw new IllegalArgumentException("Unable to process message, requeue", failure);
        }