in artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/perf/MessageListenerBenchmark.java [148:267]
/**
 * Starts the benchmark consumers; calling it again while already started is a no-op.
 * <p>
 * The method creates {@code connections} JMS connections (assigning the client ID, suffixed with the
 * connection index when there is more than one connection), registers an exception listener on each
 * that flips the flag published through {@code fatalException}, starts the connections and finally
 * creates one session, consumer and message listener per requested consumer. Connections are handed
 * out to consumers in round-robin order.
 * <p>
 * When {@code sharedSubscription} is {@code 0}, {@code consumers} plain (or durable) consumers are created
 * per destination. Otherwise {@code sharedSubscription} shared subscriptions are created per destination,
 * each with {@code consumers} consumers; if a client ID is configured every consumer of a subscription
 * uses the same connection, otherwise each consumer takes the next connection in sequence.
 * Durable and shared modes cast every destination to {@code Topic}.
 *
 * @return this benchmark instance
 * @throws RuntimeException wrapping any {@code JMSException} raised while creating or starting
 *                          connections, sessions or consumers
 */
public synchronized MessageListenerBenchmark start() {
   if (started) {
      return this;
   }
   started = true;
   closed = false;
   final AtomicLong consumerId = new AtomicLong(1);
   // setup connection failure listeners
   final AtomicBoolean signalBrokenConnection = new AtomicBoolean(false);
   fatalException = signalBrokenConnection;
   // create connections upfront and register failure listener
   final Connection[] jmsConnections = new Connection[connections];
   for (int i = 0; i < connections; i++) {
      Connection connection = null;
      try {
         connection = factory.createConnection();
         if (clientID != null) {
            // several connections cannot use the very same client ID: make it unique per connection
            connection.setClientID(connections > 1 ? clientID + i : clientID);
         }
         connection.setExceptionListener(ignore -> signalBrokenConnection.set(true));
         jmsConnections[i] = connection;
      } catch (JMSException e) {
         // Nothing has been stored into this.jmsConnections yet, so the connections created so far
         // are reachable only from here: close them now or they would leak.
         final RuntimeException failure = new RuntimeException(e);
         if (connection != null) {
            try {
               connection.close();
            } catch (JMSException closeFailure) {
               failure.addSuppressed(closeFailure);
            }
         }
         for (int created = 0; created < i; created++) {
            try {
               jmsConnections[created].close();
            } catch (JMSException closeFailure) {
               failure.addSuppressed(closeFailure);
            }
         }
         throw failure;
      }
   }
   this.jmsConnections.addAll(Arrays.asList(jmsConnections));
   // start connections
   this.jmsConnections.forEach(connection -> {
      try {
         connection.start();
      } catch (JMSException e) {
         throw new RuntimeException(e);
      }
   });
   // round-robin cursor over the connections created above
   int connectionSequence = 0;
   final int sessionMode = transaction ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE;
   final int totalListeners = consumers * destinations.length * Math.max(sharedSubscription, 1);
   this.listeners = new ArrayList<>(totalListeners);
   if (messageCount > 0) {
      msgCountLimiter = new MessageCountLimiter().setMessageLimit(messageCount);
   } else if (canDelaySetMessageCount) {
      // no limit known yet: the limiter is created without one
      msgCountLimiter = new MessageCountLimiter();
   }
   // create consumers per destination
   if (durableSubscription) {
      silentUnsubscribe = new ArrayList<>();
   }
   for (int i = 0; i < destinations.length; i++) {
      final Destination destination = destinations[i];
      if (sharedSubscription == 0) {
         final Queue<RecordingMessageListener> destinationListeners = new ArrayDeque<>(consumers);
         createListeners(destinationListeners, consumerId, destination, consumers);
         listeners.addAll(destinationListeners);
         try {
            for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
               final Connection connection = jmsConnections[connectionSequence % connections];
               connectionSequence++;
               final Session session = connection.createSession(sessionMode);
               final MessageConsumer consumer;
               if (durableSubscription) {
                  final Topic topic = (Topic) destination;
                  // one durable subscription per consumer, named after the topic and the consumer index
                  consumer = session.createDurableConsumer(topic, topic.getTopicName() + consumerIndex);
               } else {
                  consumer = session.createConsumer(destination);
               }
               consumer.setMessageListener(destinationListeners.remove());
            }
         } catch (JMSException e) {
            throw new RuntimeException(e);
         }
      } else {
         final int listenersPerDestination = sharedSubscription * consumers;
         final Queue<RecordingMessageListener> destinationListeners = new ArrayDeque<>(listenersPerDestination);
         createListeners(destinationListeners, consumerId, destination, listenersPerDestination);
         listeners.addAll(destinationListeners);
         try {
            final Topic topic = (Topic) destination;
            final String topicName = topic.getTopicName();
            for (int subscriptionIndex = 0; subscriptionIndex < sharedSubscription; subscriptionIndex++) {
               final String subscriptionName = topicName + subscriptionIndex;
               // with a client ID every consumer of this subscription uses one and the same connection
               final Connection subscriptionConnection;
               if (clientID != null) {
                  subscriptionConnection = jmsConnections[connectionSequence % connections];
                  assert subscriptionConnection.getClientID() != null;
                  connectionSequence++;
               } else {
                  subscriptionConnection = null;
               }
               for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
                  final Connection connection;
                  if (subscriptionConnection != null) {
                     connection = subscriptionConnection;
                  } else {
                     // without a client ID each consumer takes the next connection in sequence
                     connection = jmsConnections[connectionSequence % connections];
                     connectionSequence++;
                  }
                  final Session session = connection.createSession(sessionMode);
                  final MessageConsumer consumer;
                  if (durableSubscription) {
                     consumer = session.createSharedDurableConsumer(topic, subscriptionName);
                     // remember how to drop the durable subscription later on
                     silentUnsubscribe.add(() -> {
                        try {
                           session.unsubscribe(subscriptionName);
                        } catch (JMSException e) {
                           throw new RuntimeException(e);
                        }
                     });
                  } else {
                     consumer = session.createSharedConsumer(topic, subscriptionName);
                  }
                  consumer.setMessageListener(destinationListeners.remove());
               }
            }
         } catch (JMSException fatal) {
            throw new RuntimeException(fatal);
         }
      }
   }
   return this;
}