int main()

in perf/PerfProducer.cc [189:425]


/**
 * Entry point of the producer performance tool.
 *
 * Steps performed:
 *   1. Reads an optional default service URL from "conf/client.conf".
 *   2. Parses the command line into an Arguments instance (the topic is a
 *      positional argument) and logs every resulting option.
 *   3. Builds the producer and client configurations from those arguments.
 *   4. Starts the producers via startPerfProducer() and periodically logs the
 *      publish rate, throughput and latency figures collected since the
 *      previous sample.
 *   5. Signals the worker threads to stop, joins them and closes the producers.
 *
 * @param argc number of command line arguments
 * @param argv command line arguments
 * @return -1 when the arguments cannot be parsed, when --help is requested,
 *         when no topic is given or when the compression type is unknown;
 *         0 after a completed run.
 */
int main(int argc, char** argv) {
    // Used as the --service-url default. Starts out as the same value the config
    // file parser falls back to, so a missing conf/client.conf does not leave the
    // default service URL empty.
    std::string defaultServiceUrl = "pulsar://localhost:6650";

    // First try to read default values from config file if present
    const std::string confFile = "conf/client.conf";

    std::ifstream file(confFile.c_str());
    if (file) {
        po::variables_map vm;
        po::options_description confFileDesc;
        confFileDesc.add_options()  //
            ("serviceURL", po::value<std::string>()->default_value(defaultServiceUrl));

        // The third argument (true) lets keys that are not declared above pass through.
        po::store(po::parse_config_file<char>(file, confFileDesc, true), vm);
        po::notify(vm);

        defaultServiceUrl = vm["serviceURL"].as<std::string>();
    }

    Arguments args;

    // Declare the supported options.
    po::positional_options_description positional;
    positional.add("topic", 1);

    po::options_description desc("Allowed options");
    desc.add_options()                         //
        ("help,h", "Print this help message")  //

        ("memory-limit,ml", po::value<int>(&args.memoryLimitMb)->default_value(64), "Memory limit (MB)")  //

        ("auth-params,v", po::value<std::string>(&args.authParams)->default_value(""),
         "Authentication parameters, e.g., \"key1:val1,key2:val2\"")  //

        ("auth-plugin,a", po::value<std::string>(&args.authPlugin)->default_value(""),
         "Authentication plugin class library path")  //

        ("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
         "Whether tls connection is used")  //

        ("allow-insecure,d", po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
         "Whether insecure tls connection is allowed")  //

        ("trust-cert-file,c", po::value<std::string>(&args.tlsTrustCertsFilePath)->default_value(""),
         "TLS trust certification file path")  //

        ("rate,r", po::value<double>(&args.rate)->default_value(100.0),
         "Publish rate msg/s across topics")                                            //
        ("size,s", po::value<int>(&args.msgSize)->default_value(1024), "Message size")  //

        ("num-topics,t", po::value<int>(&args.numTopics)->default_value(1), "Number of topics")  //

        ("num-producers,n", po::value<int>(&args.numProducers)->default_value(1),
         "Number of producers (per topic)")  //

        ("num-threads-per-producers", po::value<int>(&args.numOfThreadsPerProducer)->default_value(1),
         "Number of threads (per producer)")  //

        ("service-url,u", po::value<std::string>(&args.serviceURL)->default_value(defaultServiceUrl),
         "Pulsar Service URL")  //

        ("producer-queue-size,p", po::value<int>(&args.producerQueueSize)->default_value(1000),
         "Max size of producer pending messages queue")  //

        ("io-threads,i", po::value<int>(&args.ioThreads)->default_value(1),
         "Number of IO threads to use")  //

        ("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1),
         "Number of listener threads")  //

        ("sampling-period", po::value<long>(&args.samplingPeriod)->default_value(20),
         "Time elapsed in seconds before reading are aggregated. Default: 20 sec")  //

        ("num-of-samples", po::value<long>(&args.numberOfSamples)->default_value(0),
         "Number of samples to take. Default: 0 (run forever)")  //

        ("batch-size", po::value<unsigned int>(&args.batchingMaxMessages)->default_value(1),
         "If batch size == 1 then batching is disabled. Default batch size == 1")  //

        ("compression", po::value<std::string>(&args.compression)->default_value(""),
         "Compression can be either 'zlib' or 'lz4'. Default is no compression")  //

        ("max-batch-size-in-bytes",
         po::value<long>(&args.batchingMaxAllowedSizeInBytes)->default_value(128 * 1024),
         "Use only is batch-size > 1, Default is 128 KB")  //

        ("max-batch-publish-delay-in-ms",
         po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000),
         "Use only is batch-size > 1, Default is 3 seconds")  //

        ("connections-per-broker", po::value<int>(&args.connectionsPerBroker)->default_value(1),
         "Number of connections per each broker")  //

        ("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""),
         "The public key name to encrypt payload")  //

        ("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""),
         "The file which contains the public key to encrypt payload");  //

    // "topic" is kept in a separate group so it is accepted on the command line
    // without being listed when `desc` is printed as the help text.
    po::options_description hidden;
    hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name");

    po::options_description allOptions;
    allOptions.add(desc).add(hidden);

    po::variables_map map;
    try {
        po::store(po::command_line_parser(argc, argv).options(allOptions).positional(positional).run(), map);
        po::notify(map);
    } catch (const std::exception& e) {
        std::cerr << "Error parsing parameters -- " << e.what() << std::endl << std::endl;
        std::cerr << desc << std::endl;
        return -1;
    }

    if (map.count("help")) {
        std::cerr << desc << std::endl;
        return -1;
    }

    if (map.count("topic") != 1) {
        std::cerr << "Need to specify a topic name. eg: persistent://prop/cluster/ns/my-topic" << std::endl
                  << std::endl;
        std::cerr << desc << std::endl;
        return -1;
    }

    // Log every option. The stored value is type-erased, so the concrete type has
    // to be checked before it can be extracted and printed.
    LOG_INFO("--- Producer configuration ---");
    for (const auto& option : map) {
        const std::string& name = option.first;
        const po::variable_value& value = option.second;
        const auto& valueType = value.value().type();

        if (valueType == typeid(std::string)) {
            LOG_INFO(name << ": " << value.as<std::string>());
        } else if (valueType == typeid(bool)) {
            LOG_INFO(name << ": " << value.as<bool>());
        } else if (valueType == typeid(int)) {
            LOG_INFO(name << ": " << value.as<int>());
        } else if (valueType == typeid(double)) {
            LOG_INFO(name << ": " << value.as<double>());
        } else if (valueType == typeid(long)) {
            LOG_INFO(name << ": " << value.as<long>());
        } else if (valueType == typeid(unsigned int)) {
            LOG_INFO(name << ": " << value.as<unsigned int>());
        } else {
            LOG_INFO(name << ": "
                          << "new data type used, please create an else condition in the code");
        }
    }

    LOG_INFO("------------------------------");
    pulsar::ProducerConfiguration producerConf;
    producerConf.setMaxPendingMessages(args.producerQueueSize);
    // A batch size of 1 means batching is disabled, so the batching settings are
    // only applied for larger values.
    if (args.batchingMaxMessages > 1) {
        producerConf.setBatchingEnabled(true);
        producerConf.setBatchingMaxMessages(args.batchingMaxMessages);
        producerConf.setBatchingMaxAllowedSizeInBytes(args.batchingMaxAllowedSizeInBytes);
        producerConf.setBatchingMaxPublishDelayMs(args.batchingMaxPublishDelayMs);
    }

    if (args.compression == "zlib") {
        producerConf.setCompressionType(CompressionZLib);
    } else if (args.compression == "lz4") {
        producerConf.setCompressionType(CompressionLZ4);
    } else if (!args.compression.empty()) {
        LOG_WARN("Invalid compression type: " << args.compression);
        return -1;
    }

    // Block if queue is full else we will start seeing errors in sendAsync
    producerConf.setBlockIfQueueFull(true);
    std::shared_ptr<EncKeyReader> keyReader = std::make_shared<EncKeyReader>(args.encKeyValueFile);
    if (!args.encKeyName.empty()) {
        producerConf.addEncryptionKey(args.encKeyName);
        producerConf.setCryptoKeyReader(keyReader);
    }

    // Enable round robin message routing if it is a partitioned topic
    producerConf.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);

    pulsar::ClientConfiguration conf;
    conf.setConnectionsPerBroker(args.connectionsPerBroker);
    // Widen before multiplying: in plain int arithmetic the MB -> bytes conversion
    // overflows for limits of 2048 MB and above.
    conf.setMemoryLimit(static_cast<unsigned long long>(args.memoryLimitMb) * 1024 * 1024);
    conf.setUseTls(args.isUseTls);
    conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
    if (!args.tlsTrustCertsFilePath.empty()) {
        conf.setTlsTrustCertsFilePath(args.tlsTrustCertsFilePath);
    }
    conf.setIOThreads(args.ioThreads);
    conf.setMessageListenerThreads(args.listenerThreads);
    if (!args.authPlugin.empty()) {
        pulsar::AuthenticationPtr auth = pulsar::AuthFactory::create(args.authPlugin, args.authParams);
        conf.setAuth(auth);
    }

    pulsar::Client client(args.serviceURL, conf);

    std::atomic<bool> exitCondition(false);
    startPerfProducer(args, producerConf, client, exitCondition);

    Clock::time_point oldTime = Clock::now();
    unsigned long totalMessagesProduced = 0;
    // Take exactly `numberOfSamples` samples; 0 means keep sampling forever. The
    // post-decrement makes a request for N samples run the loop N times.
    long samplesRemaining = args.numberOfSamples;
    while (args.numberOfSamples == 0 || samplesRemaining-- > 0) {
        std::this_thread::sleep_for(std::chrono::seconds(args.samplingPeriod));

        Clock::time_point now = Clock::now();
        double elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(now - oldTime).count() / 1e3;

        // Read and reset the shared counters under the lock, then release it
        // before logging so the lock is not held while the sample is printed.
        Lock lock(mutex);
        double rate = messagesProduced / elapsed;
        double throughput = bytesProduced / elapsed / 1024 / 1024 * 8;
        totalMessagesProduced += messagesProduced;
        messagesProduced = 0;
        bytesProduced = 0;

        double latencyAvgMs = mean(e2eLatencyAccumulator) / 1000.0;
        double latency99pctMs = p_square_quantile(e2eLatencyAccumulator) / 1000.0;
        e2eLatencyAccumulator = LatencyAccumulator(quantile_probability = 0.99);
        lock.unlock();

        LOG_INFO("Throughput produced: " << rate << "  msg/s --- " << throughput << " Mbit/s --- "  //
                                         << "Lat avg: " << latencyAvgMs
                                         << " ms -- Lat 99pct: " << latency99pctMs << " ms");
        oldTime = now;
    }

    // The worker threads have not been stopped yet, so the shared counter must be
    // read under the same lock that guards it in the sampling loop.
    {
        Lock lock(mutex);
        totalMessagesProduced += messagesProduced;
    }
    LOG_INFO("Total messagesProduced = " << totalMessagesProduced);

    exitCondition = true;
    for (auto& thread : threadList) {
        thread.join();
    }
    // Waiting for the sendCallbacks To Complete
    std::this_thread::sleep_for(std::chrono::seconds(2));
    for (auto& producer : producerList) {
        producer.close();
    }
    // Waiting for 2 seconds
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return 0;
}