private void runTest()

in tools/src/main/java/org/apache/qpid/tools/StressTestClient.java [185:555]


    private void runTest(Map<String,String> options)
    {
        String jndiProperties = options.get(JNDI_PROPERTIES_ARG);
        int numConnections = Integer.parseInt(options.get(CONNECTIONS_ARG));
        int numSessions = Integer.parseInt(options.get(SESSIONS_ARG));
        int numProducers = Integer.parseInt(options.get(PRODUCERS_ARG));
        int numConsumers = Integer.parseInt(options.get(CONSUMERS_ARG));
        boolean closeConsumers = Boolean.valueOf(options.get(CLOSE_CONSUMERS_ARG));
        boolean disableMessageTimestamp = Boolean.valueOf(options.get(DISABLE_MESSAGE_TIMESTAMP_ARG));
        boolean disableMessageID = Boolean.valueOf(options.get(DISABLE_MESSAGE_ID_ARG));
        boolean consumeImmediately = Boolean.valueOf(options.get(CONSUME_IMMEDIATELY_ARG));
        int numMessage = Integer.parseInt(options.get(MESSAGE_COUNT_ARG));
        int messageSize = Integer.parseInt(options.get(MESSAGE_SIZE_ARG));
        int repetitions = Integer.parseInt(options.get(REPETITIONS_ARG));
        String destinationString = options.get(JNDI_DESTINATION_ARG);
        String connectionFactoryString = options.get(JNDI_CONNECTION_FACTORY_ARG);
        int deliveryMode = Boolean.valueOf(options.get(PERSISTENT_ARG)) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
        boolean random = Boolean.valueOf(options.get(RANDOM_ARG));
        long recieveTimeout = Long.parseLong(options.get(TIMEOUT_ARG));
        long delayClose = Long.parseLong(options.get(DELAYCLOSE_ARG));
        int reportingMod = Integer.parseInt(options.get(REPORT_MOD_ARG));
        boolean transacted = Boolean.valueOf(options.get(TRANSACTED_ARG));
        int txBatch = Integer.parseInt(options.get(TX_BATCH_ARG));
        boolean closeSession = Boolean.valueOf(options.get(CLOSE_SESSION_ARG));
        boolean pauseAfterConnectionOpen = Boolean.valueOf(options.get(PAUSE_AFTER_CONNECTION_OPEN_ARG));
        boolean pauseBeforeConnectionClose = Boolean.valueOf(options.get(PAUSE_BEFORE_CONNECTION_CLOSE_ARG));
        boolean closeProducers = Boolean.valueOf(options.get(CLOSE_PRODUCERS_ARG));
        boolean pauseAfterSessionCreate = Boolean.valueOf(options.get(PAUSE_AFTER_SESSION_CREATE_ARG));
        boolean pauseBeforeSessionClose = Boolean.valueOf(options.get(PAUSE_BEFORE_SESSION_CLOSE_ARG));
        int sessionIterations = Integer.parseInt(options.get(SESSION_ITERATIONS_ARG));
        boolean pauseBeforeMessaging = Boolean.valueOf(options.get(PAUSE_BEFORE_MESSAGING_ARG));
        boolean pauseAfterMessaging = Boolean.valueOf(options.get(PAUSE_AFTER_MESSAGING_ARG));
        int messagingIterations = Integer.parseInt(options.get(MESSAGING_ITERATIONS_ARG));
        String consumerSelector =  options.get(CONSUMER_SELECTOR);
        int consumerMessageCount = !"".equals(options.get(CONSUMER_MESSAGE_COUNT)) ?
                Integer.parseInt(options.get(CONSUMER_MESSAGE_COUNT)) : numMessage;

        System.out.println(CLASS + ": Using options: " + options);

        System.out.println(CLASS + ": Creating message payload of " + messageSize + " (bytes)");
        byte[] sentBytes = generateMessage(random, messageSize);

        try
        {
            // Load JNDI properties
            Context ctx = getInitialContext(jndiProperties);
            final ConnectionFactory conFac = (ConnectionFactory) ctx.lookup(connectionFactoryString);

            //ensure the queue to be used exists and is bound
            Destination destination = (Destination) ctx.lookup(destinationString);

            System.out.println(CLASS + ": Destination: " + destination);

            Connection startupConn = null;
            try
            {
                startupConn = conFac.createConnection();
                Session startupSess = startupConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer startupConsumer = startupSess.createConsumer(destination);
                startupConsumer.close();
                startupSess.close();
            }
            finally
            {
                if (startupConn != null)
                {
                    startupConn.close();
                }
            }

            for(int rep = 1 ; rep <= repetitions; rep++)
            {
                List<Connection> connectionList = new ArrayList<>();

                for (int co= 1; co<= numConnections ; co++)
                {
                    if( co % reportingMod == 0)
                    {
                        System.out.println(CLASS + ": Creating connection " + co);
                    }
                    Connection conn = conFac.createConnection();
                    conn.setExceptionListener(jmse ->
                                              {
                                                  System.err.println(CLASS + ": The sample received an exception through the ExceptionListener");
                                                  jmse.printStackTrace();
                                                  System.exit(0);
                                              });

                    connectionList.add(conn);
                    conn.start();

                    if (pauseAfterConnectionOpen)
                    {
                        System.out.println(String.format("Connection %d is open. Press any key to continue...", co));
                        System.in.read();
                    }

                    for (int se= 1; se<= numSessions ; se++)
                    {
                        if( se % reportingMod == 0)
                        {
                            System.out.println(CLASS + ": Creating Session " + se);
                        }
                        try
                        {
                            Session sess;
                            if(transacted)
                            {
                                sess = conn.createSession(true, Session.SESSION_TRANSACTED);
                            }
                            else
                            {
                                sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                            }

                            if (pauseAfterSessionCreate)
                            {
                                System.out.println(String.format(
                                        "Session %d is created on connection %d. Press any key to continue...",
                                        se,
                                        co));
                                System.in.read();
                            }

                            final Message message;
                            if (messageSize > 0)
                            {
                                message = sess.createBytesMessage();
                                ((BytesMessage) message).writeBytes(sentBytes);
                            }
                            else
                            {
                                message = sess.createMessage();
                            }


                            if(!random && numMessage == 1 && numSessions == 1 && numConnections == 1 && repetitions == 1)
                            {
                                //null the array to save memory
                                sentBytes = null;
                            }

                            for (int sessionIteration = 1; sessionIteration <= sessionIterations; sessionIteration++)
                            {
                                if (sessionIterations > 1 && sessionIteration % reportingMod == 0)
                                {
                                    System.out.println(CLASS + ": Session iteration " + sessionIteration);
                                }

                                MessageConsumer consumer = null;
                                MessageConsumer[] consumers = new MessageConsumer[numConsumers];
                                for (int cns = 1; cns <= numConsumers; cns++)
                                {
                                    if (cns % reportingMod == 0)
                                    {
                                        System.out.println(CLASS + ": Creating Consumer " + cns);
                                    }
                                    consumer = sess.createConsumer(destination, consumerSelector);
                                    consumers[cns - 1] = consumer;
                                }

                                MessageProducer[] producers = new MessageProducer[numProducers];
                                for (int pr = 1; pr <= numProducers; pr++)
                                {
                                    if (pr % reportingMod == 0)
                                    {
                                        System.out.println(CLASS + ": Creating Producer " + pr);
                                    }
                                    producers[pr - 1] = sess.createProducer(destination);

                                    if (disableMessageID)
                                    {
                                        producers[pr - 1].setDisableMessageID(true);
                                    }

                                    if (disableMessageTimestamp)
                                    {
                                        producers[pr - 1].setDisableMessageTimestamp(true);
                                    }
                                }

                                if (pauseBeforeMessaging)
                                {
                                    System.out.println("Consumer(s)/Producer(s) created. Press any key to continue...");
                                    System.in.read();
                                }

                                for (int iteration = 1; iteration <= messagingIterations; iteration++)
                                {
                                    if (messagingIterations > 1 && iteration % reportingMod == 0)
                                    {
                                        System.out.println(CLASS + ": Iteration " + iteration);
                                    }

                                    for (int pr = 1; pr <= numProducers; pr++)
                                    {
                                        MessageProducer prod = producers[pr - 1];
                                        for (int me = 1; me <= numMessage; me++)
                                        {
                                            int messageNumber = (iteration - 1) * numProducers * numMessage
                                                                + (pr - 1) * numMessage + (me - 1);
                                            if (messageNumber % reportingMod == 0)
                                            {
                                                System.out.println(CLASS + ": Sending Message " + messageNumber);
                                            }
                                            message.setIntProperty("index", me - 1);
                                            prod.send(message, deliveryMode,
                                                      Message.DEFAULT_PRIORITY,
                                                      Message.DEFAULT_TIME_TO_LIVE);
                                            if (sess.getTransacted() && me % txBatch == 0)
                                            {
                                                sess.commit();
                                            }
                                        }
                                    }

                                    if (numConsumers == 1 && consumeImmediately)
                                    {
                                        for (int cs = 1; cs <= consumerMessageCount; cs++)
                                        {
                                            if (cs % reportingMod == 0)
                                            {
                                                System.out.println(CLASS + ": Consuming Message " + cs);
                                            }
                                            Message msg = null;
                                            if (consumer != null)
                                            {
                                                msg = consumer.receive(recieveTimeout);
                                            }

                                            if (sess.getTransacted() && cs % txBatch == 0)
                                            {
                                                sess.commit();
                                            }

                                            if (msg == null)
                                            {
                                                throw new RuntimeException(
                                                        "Expected message not received in allowed time: "
                                                        + recieveTimeout);
                                            }

                                            if (messageSize > 0)
                                            {
                                                validateReceivedMessageContent(sentBytes,
                                                                               (BytesMessage) msg, random, messageSize);
                                            }
                                        }
                                    }
                                }

                                if (pauseAfterMessaging)
                                {
                                    System.out.println("Messaging operations are completed. Press any key to continue...");
                                    System.in.read();
                                }

                                if (closeProducers)
                                {
                                    for (MessageProducer messageProducer : producers)
                                    {
                                        messageProducer.close();
                                    }
                                }

                                if (closeConsumers)
                                {
                                    for (MessageConsumer messageConsumer : consumers)
                                    {
                                        messageConsumer.close();
                                    }
                                }

                            }

                            if (pauseBeforeSessionClose)
                            {
                                System.out.println(String.format(
                                        "Session %d on connection %d is about to be closed. Press any key to continue...",
                                        se,
                                        co));
                                System.in.read();
                            }

                            if (closeSession)
                            {
                                sess.close();
                            }
                        }
                        catch (Exception exp)
                        {
                            System.err.println(CLASS + ": Caught an Exception: " + exp);
                            exp.printStackTrace();
                        }

                    }
                }

                if(numConsumers == -1 && !consumeImmediately)
                {
                    System.out.println(CLASS + ": Consuming left over messages, using receive timeout:" + recieveTimeout);

                    Connection conn = conFac.createConnection();
                    Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageConsumer consumer = sess.createConsumer(destination);
                    conn.start();

                    int count = 0;
                    while(true)
                    {
                        Message msg = consumer.receive(recieveTimeout);

                        if(msg == null)
                        {
                            System.out.println(CLASS + ": Received " + count + " messages");
                            break;
                        }
                        else
                        {
                            count++;
                        }

                        if (messageSize > 0)
                        {
                            validateReceivedMessageContent(sentBytes, (BytesMessage) msg, random, messageSize);
                        }
                    }

                    consumer.close();
                    sess.close();
                    conn.close();
                }

                if(delayClose > 0)
                {
                    System.out.println(CLASS + ": Delaying closing connections: " + delayClose);
                    Thread.sleep(delayClose);
                }

                // Close the connections to the server
                System.out.println(CLASS + ": Closing connections");

                for(int connection = 0 ; connection < connectionList.size() ; connection++)
                {
                    if( (connection+1) % reportingMod == 0)
                    {
                        System.out.println(CLASS + ": Closing connection " + (connection+1));
                    }
                    Connection c = connectionList.get(connection);

                    if (pauseBeforeConnectionClose)
                    {
                        System.out.println(String.format(
                                "Connection %d is about to be closed. Press any key to continue...",
                                connection));
                        System.in.read();
                    }
                    c.close();
                }

                // Close the JNDI reference
                System.out.println(CLASS + ": Closing JNDI context");
                ctx.close();
            }
        }
        catch (Exception exp)
        {
            System.err.println(CLASS + ": Caught an Exception: " + exp);
            exp.printStackTrace();
        }
    }