in pulsar-testclient/src/main/java/org/apache/pulsar/proxy/socket/client/PerformanceClient.java [205:394]
public void runPerformanceTest() throws InterruptedException, IOException {
// Read payload data from file if needed
final byte[] payloadBytes = new byte[this.msgSize];
Random random = new Random(0);
List<byte[]> payloadByteList = new ArrayList<>();
if (this.payloadFilename != null) {
Path payloadFilePath = Paths.get(this.payloadFilename);
if (Files.notExists(payloadFilePath) || Files.size(payloadFilePath) == 0) {
throw new IllegalArgumentException("Payload file doesn't exist or it is empty.");
}
// here escaping the default payload delimiter to correct value
String delimiter = this.payloadDelimiter.equals("\\n") ? "\n" : this.payloadDelimiter;
String[] payloadList = new String(Files.readAllBytes(payloadFilePath), StandardCharsets.UTF_8)
.split(delimiter);
log.info("Reading payloads from {} and {} records read", payloadFilePath.toAbsolutePath(),
payloadList.length);
for (String payload : payloadList) {
payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
}
if (this.formatPayload) {
messageFormatter = getMessageFormatter(this.formatterClass);
}
} else {
for (int i = 0; i < payloadBytes.length; ++i) {
// The value of the payloadBytes is from A to Z and the ASCII of A-Z is 65-90, so it needs to add 65
payloadBytes[i] = (byte) (random.nextInt(26) + 65);
}
}
ExecutorService executor = Executors.newCachedThreadPool(
new DefaultThreadFactory("pulsar-perf-producer-exec"));
HashMap<String, Tuple> producersMap = new HashMap<>();
String topicName = this.topics.get(0);
String restPath = TopicName.get(topicName).getRestPath();
String produceBaseEndPoint = TopicName.get(topicName).isV2()
? this.proxyURL + "ws/v2/producer/" + restPath : this.proxyURL + "ws/producer/" + restPath;
for (int i = 0; i < this.numTopics; i++) {
String topic = this.numTopics > 1 ? produceBaseEndPoint + i : produceBaseEndPoint;
URI produceUri = URI.create(topic);
WebSocketClient produceClient = new WebSocketClient(new SslContextFactory(true));
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
if (StringUtils.isNotBlank(this.authPluginClassName) && StringUtils.isNotBlank(this.authParams)) {
try {
Authentication auth = AuthenticationFactory.create(this.authPluginClassName,
this.authParams);
auth.start();
AuthenticationDataProvider authData = auth.getAuthData(produceUri.getHost());
if (authData.hasDataForHttp()) {
for (Map.Entry<String, String> kv : authData.getHttpHeaders()) {
produceRequest.setHeader(kv.getKey(), kv.getValue());
}
}
} catch (Exception e) {
log.error("Authentication plugin error: " + e.getMessage());
}
}
SimpleTestProducerSocket produceSocket = new SimpleTestProducerSocket();
try {
produceClient.start();
produceClient.connect(produceSocket, produceUri, produceRequest);
} catch (IOException e1) {
log.error("Fail in connecting: [{}]", e1.getMessage());
return;
} catch (Exception e1) {
log.error("Fail in starting client[{}]", e1.getMessage());
return;
}
producersMap.put(produceUri.toString(), new Tuple(produceClient, produceRequest, produceSocket));
}
// connection to be established
TimeUnit.SECONDS.sleep(5);
executor.submit(() -> {
try {
RateLimiter rateLimiter = RateLimiter.create(this.msgRate);
long startTime = System.nanoTime();
long testEndTime = startTime + (long) (this.testTime * 1e9);
// Send messages on all topics/producers
long totalSent = 0;
while (true) {
for (String topic : producersMap.keySet()) {
if (this.testTime > 0 && System.nanoTime() > testEndTime) {
log.info("------------- DONE (reached the maximum duration: [{} seconds] of production) "
+ "--------------", this.testTime);
PerfClientUtils.exit(0);
}
if (this.numMessages > 0) {
if (totalSent >= this.numMessages) {
log.trace("------------- DONE (reached the maximum number: [{}] of production) "
+ "--------------", this.numMessages);
Thread.sleep(10000);
PerfClientUtils.exit(0);
}
}
rateLimiter.acquire();
if (producersMap.get(topic).getSocket().getSession() == null) {
Thread.sleep(10000);
PerfClientUtils.exit(0);
}
byte[] payloadData;
if (this.payloadFilename != null) {
if (messageFormatter != null) {
payloadData = messageFormatter.formatMessage("", totalSent,
payloadByteList.get(random.nextInt(payloadByteList.size())));
} else {
payloadData = payloadByteList.get(random.nextInt(payloadByteList.size()));
}
} else {
payloadData = payloadBytes;
}
producersMap.get(topic).getSocket().sendMsg(String.valueOf(totalSent++), payloadData);
messagesSent.increment();
bytesSent.add(payloadData.length);
totalMessagesSent.increment();
totalBytesSent.add(payloadData.length);
}
}
} catch (Throwable t) {
log.error(t.getMessage());
PerfClientUtils.exit(0);
}
});
// Print report stats
long oldTime = System.nanoTime();
Histogram reportHistogram = null;
String statsFileName = "perf-websocket-producer-" + System.currentTimeMillis() + ".hgrm";
log.info("Dumping latency stats to {} \n", statsFileName);
PrintStream histogramLog = new PrintStream(new FileOutputStream(statsFileName), false);
HistogramLogWriter histogramLogWriter = new HistogramLogWriter(histogramLog);
// Some log header bits
histogramLogWriter.outputLogFormatVersion();
histogramLogWriter.outputLegend();
while (true) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
break;
}
long now = System.nanoTime();
double elapsed = (now - oldTime) / 1e9;
long total = totalMessagesSent.sum();
double rate = messagesSent.sumThenReset() / elapsed;
double throughput = bytesSent.sumThenReset() / elapsed / 1024 / 1024 * 8;
reportHistogram = SimpleTestProducerSocket.recorder.getIntervalHistogram(reportHistogram);
log.info(
"Throughput produced: {} msg --- {} msg/s --- {} Mbit/s --- Latency: mean: {} ms - med: {} ms "
+ "- 95pct: {} ms - 99pct: {} ms - 99.9pct: {} ms - 99.99pct: {} ms",
INTFORMAT.format(total),
THROUGHPUTFORMAT.format(rate),
THROUGHPUTFORMAT.format(throughput),
DEC.format(reportHistogram.getMean() / 1000.0),
DEC.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
DEC.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
DEC.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
DEC.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
DEC.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0));
histogramLogWriter.outputIntervalHistogram(reportHistogram);
reportHistogram.reset();
oldTime = now;
}
TimeUnit.SECONDS.sleep(100);
executor.shutdown();
}