in perf/PerfConsumer.cc [155:228]
void startPerfConsumer(const Arguments& args) {
ClientConfiguration conf;
conf.setUseTls(args.isUseTls);
conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
if (!args.tlsTrustCertsFilePath.empty()) {
std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
conf.setConnectionsPerBroker(args.connectionsPerBroker);
conf.setIOThreads(args.ioThreads);
conf.setMessageListenerThreads(args.listenerThreads);
if (!args.authPlugin.empty()) {
AuthenticationPtr auth = AuthFactory::create(args.authPlugin, args.authParams);
conf.setAuth(auth);
}
Client client(args.serviceURL, conf);
ConsumerConfiguration consumerConf;
consumerConf.setMessageListener(messageListener);
consumerConf.setReceiverQueueSize(args.receiverQueueSize);
std::shared_ptr<EncKeyReader> keyReader = std::make_shared<EncKeyReader>(args.encKeyValueFile);
if (!args.encKeyName.empty()) {
consumerConf.setCryptoKeyReader(keyReader);
}
Latch latch(args.numTopics * args.numConsumers);
for (int i = 0; i < args.numTopics; i++) {
std::string topic = (args.numTopics == 1) ? args.topic : args.topic + "-" + std::to_string(i);
LOG_INFO("Adding " << args.numConsumers << " consumers on topic " << topic);
for (int j = 0; j < args.numConsumers; j++) {
std::string subscriberName;
if (args.numConsumers > 1) {
subscriberName = args.subscriberName + "-" + std::to_string(j);
} else {
subscriberName = args.subscriberName;
}
client.subscribeAsync(
topic, subscriberName, consumerConf,
std::bind(handleSubscribe, std::placeholders::_1, std::placeholders::_2, latch));
}
}
Clock::time_point oldTime = Clock::now();
latch.wait();
LOG_INFO("Start receiving from " << args.numConsumers << " consumers on " << args.numTopics << " topics");
while (true) {
std::this_thread::sleep_for(seconds(10));
Clock::time_point now = Clock::now();
double elapsed = duration_cast<milliseconds>(now - oldTime).count() / 1e3;
double rate = messagesReceived.exchange(0) / elapsed;
double throughput = bytesReceived.exchange(0) / elapsed / 1024 / 1024 * 8;
Lock lock(mutex);
int64_t e2eLatencyAvgMs = rate > 0.0 ? mean(e2eLatencyAccumulator) : 0;
int64_t e2eLatency99pctMs = p_square_quantile(e2eLatencyAccumulator);
e2eLatencyAccumulator = LatencyAccumulator(quantile_probability = 0.99);
lock.unlock();
LOG_INFO("Throughput received: " << rate << " msg/s --- " << throughput << " Mbit/s ---" //
<< " End-To-End latency: avg: " << e2eLatencyAvgMs
<< " ms -- 99pct: " << e2eLatency99pctMs << " ms");
oldTime = now;
}
}