in perf/PerfConsumer.cc [230:348]
/**
 * Entry point of the consumer performance tool.
 *
 * Steps performed:
 *  1. If "conf/client.conf" can be opened, read its "serviceURL" key and use it as the
 *     default for the --service-url option. If the file is missing, unreadable or
 *     malformed, "pulsar://localhost:6650" is used as the default instead.
 *  2. Parse the command line into an Arguments instance. The topic name is taken from
 *     the single positional argument (hidden option "topic").
 *  3. Log the effective configuration and hand the arguments to startPerfConsumer().
 *
 * @param argc number of command-line arguments
 * @param argv command-line arguments
 * @return -1 when the command line cannot be parsed, when help is requested or when no
 *         topic is given; 0 once startPerfConsumer() has returned
 */
int main(int argc, char** argv) {
    // First try to read default values from config file if present
    const std::string confFile = "conf/client.conf";
    const std::string fallbackServiceUrl = "pulsar://localhost:6650";

    // Start from the fallback so that --service-url has a usable default even when the
    // config file does not exist (previously the default silently became an empty string).
    std::string defaultServiceUrl = fallbackServiceUrl;

    std::ifstream file(confFile.c_str());
    if (file) {
        // A broken config file must not take the whole tool down with an uncaught
        // exception: report it and keep the fallback default.
        try {
            po::variables_map vm;
            po::options_description confFileDesc;
            confFileDesc.add_options()  //
                ("serviceURL", po::value<std::string>()->default_value(fallbackServiceUrl));

            // Third argument 'true': keys other than serviceURL in the file are tolerated.
            po::store(po::parse_config_file<char>(file, confFileDesc, true), vm);
            po::notify(vm);

            defaultServiceUrl = vm["serviceURL"].as<std::string>();
        } catch (const std::exception& e) {
            std::cerr << "Error parsing config file " << confFile << " -- " << e.what() << std::endl;
        }
    }

    Arguments args;

    // Declare the supported options.
    po::positional_options_description positional;
    positional.add("topic", 1);

    po::options_description desc("Allowed options");
    desc.add_options()  //
        ("help,h", "Print this help message")  //

        ("auth-params,v", po::value<std::string>(&args.authParams)->default_value(""),
         "Authentication parameters, e.g., \"key1:val1,key2:val2\"")  //

        ("auth-plugin,a", po::value<std::string>(&args.authPlugin)->default_value(""),
         "Authentication plugin class library path")  //

        ("use-tls,b", po::value<bool>(&args.isUseTls)->default_value(false),
         "Whether tls connection is used")  //

        ("allow-insecure,d", po::value<bool>(&args.isTlsAllowInsecureConnection)->default_value(true),
         "Whether insecure tls connection is allowed")  //

        ("trust-cert-file,c", po::value<std::string>(&args.tlsTrustCertsFilePath)->default_value(""),
         "TLS trust certification file path")  //

        ("num-topics,t", po::value<int>(&args.numTopics)->default_value(1), "Number of topics")  //

        ("num-consumers,n", po::value<int>(&args.numConsumers)->default_value(1),
         "Number of consumers (per topic)")  //

        ("subscriber-name,s", po::value<std::string>(&args.subscriberName)->default_value("sub"),
         "Subscriber name prefix")  //

        ("wait-time,w", po::value<int>(&args.waitTimeMs)->default_value(1),
         "Simulate a slow message consumer (Delay in ms)")  //

        ("service-url,u", po::value<std::string>(&args.serviceURL)->default_value(defaultServiceUrl),
         "Pulsar Service URL")  //

        ("receiver-queue-size,p", po::value<int>(&args.receiverQueueSize)->default_value(1000),
         "Size of the receiver queue")  //

        ("io-threads,i", po::value<int>(&args.ioThreads)->default_value(1),
         "Number of IO threads to use")  //

        ("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1),
         "Number of listener threads")  //

        ("connections-per-broker", po::value<int>(&args.connectionsPerBroker)->default_value(1),
         "Number of connections per each broker")  //

        ("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""),
         "The private key name to decrypt payload")  //

        ("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""),
         "The file which contains the private key to decrypt payload");  //

    // "topic" is kept in a separate group so that it is accepted on the command line
    // (via the positional mapping above) but does not show up when 'desc' is printed.
    po::options_description hidden;
    hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name");

    po::options_description allOptions;
    allOptions.add(desc).add(hidden);

    po::variables_map parsed;
    try {
        po::store(po::command_line_parser(argc, argv).options(allOptions).positional(positional).run(),
                  parsed);
        po::notify(parsed);
    } catch (const std::exception& e) {
        std::cerr << "Error parsing parameters -- " << e.what() << std::endl << std::endl;
        std::cerr << desc << std::endl;
        return -1;
    }

    if (parsed.count("help")) {
        std::cerr << desc << std::endl;
        return -1;
    }

    if (parsed.count("topic") != 1) {
        std::cerr << "Need to specify a topic name. eg: persistent://prop/cluster/ns/my-topic" << std::endl
                  << std::endl;
        std::cerr << desc << std::endl;
        return -1;
    }

    LOG_INFO("--- Consumer configuration ---");
    for (po::variables_map::iterator it = parsed.begin(); it != parsed.end(); ++it) {
        // Each stored value is type-erased; pick the matching accessor by its runtime type.
        if (it->second.value().type() == typeid(std::string)) {
            LOG_INFO(it->first << ": " << it->second.as<std::string>());
        } else if (it->second.value().type() == typeid(int)) {
            LOG_INFO(it->first << ": " << it->second.as<int>());
        } else if (it->second.value().type() == typeid(double)) {
            LOG_INFO(it->first << ": " << it->second.as<double>());
        } else if (it->second.value().type() == typeid(bool)) {
            // Boolean options (use-tls, allow-insecure) were previously left out of the dump.
            LOG_INFO(it->first << ": " << (it->second.as<bool>() ? "true" : "false"));
        }
    }
    LOG_INFO("------------------------------");

    startPerfConsumer(args);
    return 0;
}