in activemq-client/src/main/java/org/apache/activemq/util/ConsumerThread.java [50:111]
public void run() {
running = true;
MessageConsumer consumer = null;
String threadName = Thread.currentThread().getName();
LOG.info(threadName + " wait until " + messageCount + " messages are consumed");
try {
if (durable && destination instanceof Topic) {
consumer = session.createDurableSubscriber((Topic) destination, getName());
} else {
consumer = session.createConsumer(destination);
}
while (running && received < messageCount) {
Message msg = consumer.receive(receiveTimeOut);
if (msg != null) {
LOG.info(threadName + " Received " + (msg instanceof TextMessage ? ((TextMessage) msg).getText() : msg.getJMSMessageID()));
if (bytesAsText && (msg instanceof BytesMessage)) {
long length = ((BytesMessage) msg).getBodyLength();
byte[] bytes = new byte[(int) length];
((BytesMessage) msg).readBytes(bytes);
LOG.info("BytesMessage as text string: " + new String(bytes));
}
received++;
} else {
if (breakOnNull) {
break;
}
}
if (session.getTransacted()) {
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
LOG.info(threadName + " Committing transaction: " + transactions++);
session.commit();
}
} else if (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
if (batchSize > 0 && received > 0 && received % batchSize == 0) {
LOG.info("Acknowledging last " + batchSize + " messages; messages so far = " + received);
msg.acknowledge();
}
}
if (sleep > 0) {
Thread.sleep(sleep);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (finished != null) {
finished.countDown();
}
if (consumer != null) {
LOG.info(threadName + " Consumed: " + this.getReceived() + " messages");
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
LOG.info(threadName + " Consumer thread finished");
}