public synchronized MessageListenerBenchmark start()

in artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/perf/MessageListenerBenchmark.java [148:267]


   /**
    * Opens the configured number of JMS connections, starts them and attaches one
    * {@code RecordingMessageListener} to a freshly created consumer for every
    * destination / subscription / consumer combination.
    * <p>
    * Calling this method while the benchmark is already started is a no-op.
    * <p>
    * Connections are handed out to consumers round-robin. For shared subscriptions with a
    * configured client ID, all consumers of one subscription use the same connection.
    * <p>
    * If opening or configuring any connection fails, every connection opened by this call is
    * closed before the failure is rethrown. Note that {@code started} is set before any work is
    * attempted and is not reset on failure.
    *
    * @return this benchmark instance
    * @throws RuntimeException wrapping the {@code JMSException} raised while creating, configuring or
    *                          starting connections, sessions or consumers
    */
   public synchronized MessageListenerBenchmark start() {
      if (started) {
         return this;
      }
      started = true;
      closed = false;
      final AtomicLong consumerId = new AtomicLong(1);
      // flipped by the exception listener of any connection; published through fatalException
      final AtomicBoolean signalBrokenConnection = new AtomicBoolean(false);
      fatalException = signalBrokenConnection;
      // create connections upfront and register failure listener
      final Connection[] jmsConnections = new Connection[connections];
      try {
         for (int i = 0; i < connections; i++) {
            final Connection connection = factory.createConnection();
            // record it right away so that the failure path below is able to close it
            jmsConnections[i] = connection;
            if (clientID != null) {
               // client IDs must differ between connections, hence the index suffix when there are several
               if (connections > 1) {
                  connection.setClientID(clientID + i);
               } else {
                  connection.setClientID(clientID);
               }
            }
            connection.setExceptionListener(ignore -> {
               signalBrokenConnection.set(true);
            });
         }
      } catch (JMSException | RuntimeException e) {
         final RuntimeException failure = e instanceof RuntimeException ? (RuntimeException) e : new RuntimeException(e);
         // these connections are not yet tracked by this.jmsConnections: close them here or they leak
         for (Connection opened : jmsConnections) {
            if (opened == null) {
               continue;
            }
            try {
               opened.close();
            } catch (JMSException | RuntimeException closeFailure) {
               failure.addSuppressed(closeFailure);
            }
         }
         throw failure;
      }
      this.jmsConnections.addAll(Arrays.asList(jmsConnections));
      // start connections
      this.jmsConnections.forEach(connection -> {
         try {
            connection.start();
         } catch (JMSException e) {
            throw new RuntimeException(e);
         }
      });
      // round-robin cursor over jmsConnections
      int connectionSequence = 0;
      final int totalListeners = consumers * destinations.length * Math.max(sharedSubscription, 1);
      this.listeners = new ArrayList<>(totalListeners);
      if (messageCount > 0) {
         msgCountLimiter = new MessageCountLimiter().setMessageLimit(messageCount);
      } else if (canDelaySetMessageCount) {
         // no limit yet: the limiter is created without one
         msgCountLimiter = new MessageCountLimiter();
      }
      // create consumers per destination
      if (durableSubscription) {
         silentUnsubscribe = new ArrayList<>();
      }
      for (int i = 0; i < destinations.length; i++) {
         final Destination destination = destinations[i];
         if (sharedSubscription == 0) {
            final Queue<RecordingMessageListener> destinationListeners = new ArrayDeque<>(consumers);
            createListeners(destinationListeners, consumerId, destination, consumers);
            listeners.addAll(destinationListeners);
            try {
               for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
                  final Connection connection = jmsConnections[connectionSequence % connections];
                  connectionSequence++;
                  final Session session = connection.createSession(transaction ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                  final MessageConsumer consumer;
                  if (durableSubscription) {
                     final Topic topic = (Topic) destination;
                     // NOTE(review): unlike the shared durable branch below, no unsubscribe action is
                     // registered in silentUnsubscribe here - confirm whether that is intentional
                     consumer = session.createDurableConsumer(topic, topic.getTopicName() + consumerIndex);
                  } else {
                     consumer = session.createConsumer(destination);
                  }
                  consumer.setMessageListener(destinationListeners.remove());
               }
            } catch (JMSException e) {
               throw new RuntimeException(e);
            }
         } else {
            final int listenersPerDestination = sharedSubscription * consumers;
            final Queue<RecordingMessageListener> destinationListeners = new ArrayDeque<>(listenersPerDestination);
            createListeners(destinationListeners, consumerId, destination, listenersPerDestination);
            listeners.addAll(destinationListeners);
            try {
               final Topic topic = (Topic) destination;
               final String topicName = topic.getTopicName();
               for (int subscriptionIndex = 0; subscriptionIndex < sharedSubscription; subscriptionIndex++) {
                  final String subscriptionName = topicName + subscriptionIndex;
                  // with a client ID every consumer of this subscription uses one and the same connection
                  // (presumably because the client ID is part of a shared subscription's identity - verify);
                  // without one, each consumer below picks the next connection in sequence
                  final Connection subscriptionConnection;
                  if (clientID != null) {
                     subscriptionConnection = jmsConnections[connectionSequence % connections];
                     assert subscriptionConnection.getClientID() != null;
                     connectionSequence++;
                  } else {
                     subscriptionConnection = null;
                  }
                  for (int consumerIndex = 0; consumerIndex < consumers; consumerIndex++) {
                     final Connection connection;
                     if (subscriptionConnection != null) {
                        connection = subscriptionConnection;
                     } else {
                        connection = jmsConnections[connectionSequence % connections];
                        connectionSequence++;
                     }
                     final Session session = connection.createSession(transaction ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                     final MessageConsumer consumer;
                     if (durableSubscription) {
                        consumer = session.createSharedDurableConsumer(topic, subscriptionName);
                        // one unsubscribe action per consumer session, stored for later use
                        silentUnsubscribe.add(() -> {
                           try {
                              session.unsubscribe(subscriptionName);
                           } catch (JMSException e) {
                              throw new RuntimeException(e);
                           }
                        });
                     } else {
                        consumer = session.createSharedConsumer(topic, subscriptionName);
                     }
                     consumer.setMessageListener(destinationListeners.remove());
                  }
               }
            } catch (JMSException fatal) {
               throw new RuntimeException(fatal);
            }
         }
      }
      return this;
   }