public void runPerformanceTest()

in pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java [205:394]


    /**
     * Runs the WebSocket producer performance test.
     *
     * <p>The method works in four phases:
     * <ol>
     *   <li>Builds the payloads: either the records of {@code payloadFilename} split on
     *       {@code payloadDelimiter}, or a single buffer of {@code msgSize} pseudo-random
     *       upper-case ASCII letters.</li>
     *   <li>Opens one WebSocket producer connection per topic ({@code numTopics} of them),
     *       attaching HTTP auth headers when an authentication plugin and params are configured.</li>
     *   <li>Submits a background task that cycles through all producers, rate limited to
     *       {@code msgRate}, until {@code testTime} or {@code numMessages} is reached, at which
     *       point it calls {@code PerfClientUtils.exit(0)}.</li>
     *   <li>On the calling thread, logs throughput/latency every 5 seconds and appends the interval
     *       histogram to a {@code perf-websocket-producer-<timestamp>.hgrm} file.</li>
     * </ol>
     *
     * <p>If a connection cannot be started the error is logged and the method returns without
     * sending anything.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the connections
     *                              to be established or during the final wait before shutdown
     * @throws IOException          if the payload file cannot be read or the stats file cannot be
     *                              opened
     * @throws IllegalArgumentException if the payload file is missing, empty, or yields no records
     */
    public void runPerformanceTest() throws InterruptedException, IOException {
        // Read payload data from file if needed
        final byte[] payloadBytes = new byte[this.msgSize];
        Random random = new Random(0);
        List<byte[]> payloadByteList = new ArrayList<>();
        if (this.payloadFilename != null) {
            Path payloadFilePath = Paths.get(this.payloadFilename);
            if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0)  {
                throw new IllegalArgumentException("Payload file doesn't exist or it is empty.");
            }
            // here escaping the default payload delimiter to correct value
            String delimiter = this.payloadDelimiter.equals("\\n") ? "\n" : this.payloadDelimiter;
            String[] payloadList = new String(Files.readAllBytes(payloadFilePath), StandardCharsets.UTF_8)
                    .split(delimiter);
            log.info("Reading payloads from {} and {} records read", payloadFilePath.toAbsolutePath(),
                    payloadList.length);
            // String.split drops trailing empty strings, so a file made only of delimiters yields
            // zero records. Fail fast here instead of hitting Random.nextInt(0) in the sender task.
            if (payloadList.length == 0) {
                throw new IllegalArgumentException("Payload file doesn't contain any records.");
            }
            for (String payload : payloadList) {
                payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
            }

            if (this.formatPayload) {
                messageFormatter = getMessageFormatter(this.formatterClass);
            }
        } else {
            for (int i = 0; i < payloadBytes.length; ++i) {
                // The value of the payloadBytes is from A to Z and the ASCII of A-Z is 65-90, so it needs to add 65
                payloadBytes[i] = (byte) (random.nextInt(26) + 65);
            }
        }

        HashMap<String, Tuple> producersMap = new HashMap<>();
        // Only the first configured topic is used as the base name; with more than one topic the
        // index is appended to the endpoint below.
        String topicName = this.topics.get(0);
        TopicName parsedTopic = TopicName.get(topicName);
        String restPath = parsedTopic.getRestPath();
        String produceBaseEndPoint = parsedTopic.isV2()
                ? this.proxyURL + "ws/v2/producer/" + restPath : this.proxyURL + "ws/producer/" + restPath;
        for (int i = 0; i < this.numTopics; i++) {
            String topic = this.numTopics > 1 ? produceBaseEndPoint + i : produceBaseEndPoint;
            URI produceUri = URI.create(topic);

            WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();

            if (StringUtils.isNotBlank(this.authPluginClassName) && StringUtils.isNotBlank(this.authParams)) {
                try {
                    Authentication auth = AuthenticationFactory.create(this.authPluginClassName,
                            this.authParams);
                    auth.start();
                    AuthenticationDataProvider authData = auth.getAuthData(produceUri.getHost());
                    if (authData.hasDataForHttp()) {
                        for (Map.Entry<String, String> kv : authData.getHttpHeaders()) {
                            produceRequest.setHeader(kv.getKey(), kv.getValue());
                        }
                    }
                } catch (Exception e) {
                    // Best effort: the connection is still attempted without auth headers.
                    // The exception is passed as the last argument so the stack trace is kept.
                    log.error("Authentication plugin error: {}", e.getMessage(), e);
                }
            }

            SimpleTestProducerSocket produceSocket = new SimpleTestProducerSocket();

            try {
                produceClient.start();
                produceClient.connect(produceSocket, produceUri, produceRequest);
            } catch (IOException e1) {
                log.error("Fail in connecting: [{}]", e1.getMessage(), e1);
                return;
            } catch (Exception e1) {
                log.error("Fail in starting client[{}]", e1.getMessage(), e1);
                return;
            }

            producersMap.put(produceUri.toString(), new Tuple(produceClient, produceRequest, produceSocket));
        }

        // connection to be established
        TimeUnit.SECONDS.sleep(5);

        // Created only once every connection attempt has succeeded, so the early-return paths
        // above never leave an executor behind without a shutdown.
        ExecutorService executor = Executors.newCachedThreadPool(
                new DefaultThreadFactory("pulsar-perf-producer-exec"));

        executor.submit(() -> {
            try {
                RateLimiter rateLimiter = RateLimiter.create(this.msgRate);
                long startTime = System.nanoTime();
                long testEndTime = startTime + (long) (this.testTime * 1e9);
                // Send messages on all topics/producers
                long totalSent = 0;
                while (true) {
                    for (Tuple producer : producersMap.values()) {
                        if (this.testTime > 0 && System.nanoTime() > testEndTime) {
                            log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) "
                                    + "--------------", this.testTime);
                            PerfClientUtils.exit(0);
                        }

                        if (this.numMessages > 0) {
                            if (totalSent >= this.numMessages) {
                                // NOTE(review): logged at trace while the duration-based DONE message
                                // above uses info - confirm whether that is intentional.
                                log.trace("------------- DONE (reached the maximum number: [{}] of production) "
                                        + "--------------", this.numMessages);
                                Thread.sleep(10000);
                                PerfClientUtils.exit(0);
                            }
                        }

                        rateLimiter.acquire();

                        // No session on the socket: wait, then terminate the run.
                        if (producer.getSocket().getSession() == null) {
                            Thread.sleep(10000);
                            PerfClientUtils.exit(0);
                        }

                        byte[] payloadData;
                        if (this.payloadFilename != null) {
                            if (messageFormatter != null) {
                                payloadData = messageFormatter.formatMessage("", totalSent,
                                        payloadByteList.get(random.nextInt(payloadByteList.size())));
                            } else {
                                payloadData = payloadByteList.get(random.nextInt(payloadByteList.size()));
                            }
                        } else {
                            payloadData = payloadBytes;
                        }
                        producer.getSocket().sendMsg(String.valueOf(totalSent++), payloadData);
                        messagesSent.increment();
                        bytesSent.add(payloadData.length);
                        totalMessagesSent.increment();
                        totalBytesSent.add(payloadData.length);
                    }
                }

            } catch (Throwable t) {
                // Pass the throwable itself so the stack trace is logged even when getMessage() is null.
                // NOTE(review): exits with status 0 on failure as well - confirm whether callers rely on it.
                log.error(t.getMessage(), t);
                PerfClientUtils.exit(0);
            }
        });

        // Print report stats
        long oldTime = System.nanoTime();

        Histogram reportHistogram = null;

        String statsFileName = "perf-websocket-producer-" + System.currentTimeMillis() + ".hgrm";
        log.info("Dumping latency stats to {} \n", statsFileName);

        // try-with-resources closes the stats file once the reporting loop ends.
        try (PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false)) {
            HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog);

            // Some log header bits
            histogramLogWriter.outputLogFormatVersion();
            histogramLogWriter.outputLegend();

            while (true) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // An interrupt ends the reporting loop.
                    break;
                }

                long now = System.nanoTime();
                double elapsed = (now - oldTime) / 1e9;

                long total = totalMessagesSent.sum();
                double rate = messagesSent.sumThenReset() / elapsed;
                // bytes/s -> Mbit/s
                double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;

                reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram(reportHistogram);

                log.info(
                        "Throughput produced: {} msg --- {}  msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} ms "
                                + "- 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms",
                        INTFORMAT.format(total),
                        THROUGHPUTFORMAT.format(rate),
                        THROUGHPUTFORMAT.format(throughput),
                        DEC.format(reportHistogram.getMean() / 1000.0),
                        DEC.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
                        DEC.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
                        DEC.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
                        DEC.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
                        DEC.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0));

                histogramLogWriter.outputIntervalHistogram(reportHistogram);
                reportHistogram.reset();

                oldTime = now;
            }
        }

        TimeUnit.SECONDS.sleep(100);

        executor.shutdown();

    }