void startPerfConsumer()

in perf/PerfConsumer.cc [161:233]


void startPerfConsumer(const Arguments& args) {
    ClientConfiguration conf;

    conf.setUseTls(args.isUseTls);
    conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
    if (!args.tlsTrustCertsFilePath.empty()) {
        std::string tlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
        conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
    }
    conf.setIOThreads(args.ioThreads);
    conf.setMessageListenerThreads(args.listenerThreads);
    if (!args.authPlugin.empty()) {
        AuthenticationPtr auth = AuthFactory::create(args.authPlugin, args.authParams);
        conf.setAuth(auth);
    }

    Client client(pulsar::PulsarFriend::getClient(args.serviceURL, conf, args.poolConnections));

    ConsumerConfiguration consumerConf;
    consumerConf.setMessageListener(messageListener);
    consumerConf.setReceiverQueueSize(args.receiverQueueSize);
    std::shared_ptr<EncKeyReader> keyReader = std::make_shared<EncKeyReader>(args.encKeyValueFile);
    if (!args.encKeyName.empty()) {
        consumerConf.setCryptoKeyReader(keyReader);
    }

    Latch latch(args.numTopics * args.numConsumers);

    for (int i = 0; i < args.numTopics; i++) {
        std::string topic = (args.numTopics == 1) ? args.topic : args.topic + "-" + std::to_string(i);
        LOG_INFO("Adding " << args.numConsumers << " consumers on topic " << topic);

        for (int j = 0; j < args.numConsumers; j++) {
            std::string subscriberName;
            if (args.numConsumers > 1) {
                subscriberName = args.subscriberName + "-" + std::to_string(j);
            } else {
                subscriberName = args.subscriberName;
            }

            client.subscribeAsync(
                topic, subscriberName, consumerConf,
                std::bind(handleSubscribe, std::placeholders::_1, std::placeholders::_2, latch));
        }
    }

    Clock::time_point oldTime = Clock::now();

    latch.wait();
    LOG_INFO("Start receiving from " << args.numConsumers << " consumers on " << args.numTopics << " topics");

    while (true) {
        std::this_thread::sleep_for(seconds(10));

        Clock::time_point now = Clock::now();
        double elapsed = duration_cast<milliseconds>(now - oldTime).count() / 1e3;

        double rate = messagesReceived.exchange(0) / elapsed;
        double throughput = bytesReceived.exchange(0) / elapsed / 1024 / 1024 * 8;

        Lock lock(mutex);
        int64_t e2eLatencyAvgMs = rate > 0.0 ? mean(e2eLatencyAccumulator) : 0;
        int64_t e2eLatency99pctMs = p_square_quantile(e2eLatencyAccumulator);
        e2eLatencyAccumulator = LatencyAccumulator(quantile_probability = 0.99);
        lock.unlock();

        LOG_INFO("Throughput received: " << rate << "  msg/s --- " << throughput << " Mbit/s ---"  //
                                         << " End-To-End latency: avg: " << e2eLatencyAvgMs
                                         << " ms -- 99pct: " << e2eLatency99pctMs << " ms");

        oldTime = now;
    }
}