example/common.h (192 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ROCKETMQ_CLIENT4CPP_EXAMPLE_COMMON_H_ #define ROCKETMQ_CLIENT4CPP_EXAMPLE_COMMON_H_ #include <atomic> #include <chrono> #include <functional> #include <iostream> #include <memory> #include <string> #include <thread> #include <vector> #ifndef WIN32 #include "unistd.h" #endif #include "Arg_helper.h" #include "DefaultMQProducer.h" #include "DefaultMQPullConsumer.h" #include "DefaultMQPushConsumer.h" using namespace std; std::atomic<int> g_msgCount(1); class RocketmqSendAndConsumerArgs { public: RocketmqSendAndConsumerArgs() : body("msgbody for test"), thread_count(std::thread::hardware_concurrency()), broadcasting(false), syncpush(false), SelectUnactiveBroker(false), IsAutoDeleteSendCallback(false), retrytimes(5), PrintMoreInfo(false) {} public: std::string namesrv; std::string namesrv_domain; std::string groupname; std::string topic; std::string body; int thread_count; bool broadcasting; bool syncpush; bool SelectUnactiveBroker; // default select active broker bool IsAutoDeleteSendCallback; int retrytimes; // default retry 5 times; bool PrintMoreInfo; }; class TpsReportService { public: TpsReportService() : tps_interval_(1), quit_flag_(false), tps_count_(0) {} ~TpsReportService() { quit_flag_.store(true); if (tps_thread_ == nullptr) { std::cout << "tps_thread_ is null" << std::endl; return; } if (tps_thread_->joinable()) { tps_thread_->join(); } } void start() { if (tps_thread_ != nullptr) { std::cout << "tps_thread_ is not null" << std::endl; return; } tps_thread_.reset(new std::thread(std::bind(&TpsReportService::TpsReport, this))); } void Increment() { ++tps_count_; } void TpsReport() { while (!quit_flag_.load()) { std::this_thread::sleep_for(tps_interval_); std::cout << "tps: " << tps_count_.load() << std::endl; tps_count_.store(0); } } private: std::chrono::seconds tps_interval_; std::shared_ptr<std::thread> tps_thread_; std::atomic<bool> quit_flag_; std::atomic<long> tps_count_; }; void PrintPullResult(rocketmq::PullResult* result) { std::cout << result->toString() << std::endl; if (result->pullStatus == rocketmq::FOUND) { std::cout << result->toString() << endl; std::vector<rocketmq::MQMessageExt>::iterator it = result->msgFoundList.begin(); for (; it != result->msgFoundList.end(); ++it) { cout << "=======================================================" << endl << (*it).toString() << endl; } } } static void PrintRocketmqSendAndConsumerArgs(const RocketmqSendAndConsumerArgs& info) { std::cout << "nameserver: " << info.namesrv << endl << "topic: " << info.topic << endl << "groupname: " << info.groupname << endl << "produce content: " << info.body << endl << "msg count: " << g_msgCount.load() << endl << "thread count: " << info.thread_count << endl; } static void help() { std::cout << "need option,like follow: \n" << "-n nameserver addr, if not set -n and -i ,no nameSrv will be got \n" "-i nameserver domain name, if not set -n and -i ,no nameSrv will be " "got \n" "-g groupname \n" "-t msg topic \n" "-m messagecout(default value: 1) \n" "-c content(default value: only test ) \n" "-b (BROADCASTING model, default value: CLUSTER) \n" "-s sync push(default is async push)\n" "-r setup retry times(default value: 5 times)\n" "-u select active broker to send msg(default value: false)\n" "-d use AutoDeleteSendcallback by cpp client(defalut value: false) \n" "-T thread count of send msg or consume msg(defalut value: system cpu " "core number) \n" "-v print more details information \n"; } static bool ParseArgs(int argc, char* argv[], RocketmqSendAndConsumerArgs* info) { #ifndef WIN32 int ch; while ((ch = getopt(argc, argv, "n:i:g:t:m:c:b:s:h:r:T:bu")) != -1) { switch (ch) { case 'n': info->namesrv.insert(0, optarg); break; case 'i': info->namesrv_domain.insert(0, optarg); break; case 'g': info->groupname.insert(0, optarg); break; case 't': info->topic.insert(0, optarg); break; case 'm': g_msgCount.store(atoi(optarg)); break; case 'c': info->body.insert(0, optarg); break; case 'b': info->broadcasting = true; break; case 's': info->syncpush = true; break; case 'r': info->retrytimes = atoi(optarg); break; case 'u': info->SelectUnactiveBroker = true; break; case 'T': info->thread_count = atoi(optarg); break; case 'v': info->PrintMoreInfo = true; break; case 'h': help(); return false; default: help(); return false; } } #else rocketmq::Arg_helper arg_help(argc, argv); info->namesrv = arg_help.get_option_value("-n"); info->namesrv_domain = arg_help.get_option_value("-i"); info->groupname = arg_help.get_option_value("-g"); info->topic = arg_help.get_option_value("-t"); info->broadcasting = atoi(arg_help.get_option_value("-b").c_str()); string msgContent(arg_help.get_option_value("-c")); if (!msgContent.empty()) info->body = msgContent; info->syncpush = atoi(arg_help.get_option_value("-s").c_str()); int retrytimes = atoi(arg_help.get_option_value("-r").c_str()); if (retrytimes > 0) info->retrytimes = retrytimes; info->SelectUnactiveBroker = atoi(arg_help.get_option_value("-u").c_str()); int thread_count = atoi(arg_help.get_option_value("-T").c_str()); if (thread_count > 0) info->thread_count = thread_count; info->PrintMoreInfo = atoi(arg_help.get_option_value("-v").c_str()); g_msgCount = atoi(arg_help.get_option_value("-m").c_str()); #endif if (info->groupname.empty() || info->topic.empty() || (info->namesrv_domain.empty() && info->namesrv.empty())) { std::cout << "please use -g to setup groupname and -t setup topic \n"; help(); return false; } return true; } #endif // ROCKETMQ_CLIENT4CPP_EXAMPLE_COMMON_H_