tools/rpc_replay/rpc_replay.cpp (251 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/time.h>
#include <butil/macros.h>
#include <butil/file_util.h>
#include <bvar/bvar.h>
#include <bthread/bthread.h>
#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/rpc_dump.h>
#include <brpc/serialized_request.h>
#include <brpc/nshead_message.h>
#include <brpc/details/http_message.h>
#include "brpc/options.pb.h"
#include "info_thread.h"
// Command-line flags of rpc_replay.
// --dir is mandatory and must name an existing directory (checked in main()).
DEFINE_string(dir, "", "The directory of dumped requests");
DEFINE_int32(times, 1, "Repeat replaying for so many times");
// When <= 0, requests are sent synchronously without any pacing.
DEFINE_int32(qps, 0, "Limit QPS if this flag is positive");
// When <= 0, main() picks the value: 50 if --qps <= 0, otherwise
// --qps / 10000 clamped to [1, 50].
DEFINE_int32(thread_num, 0, "Number of threads for replaying");
// false: replay in pthreads; true: replay in bthreads.
DEFINE_bool(use_bthread, true, "Use bthread to replay");
// --connection_type, --server, --load_balancer, --timeout_ms and --max_retry
// are applied to every channel created in ChannelGroup::Init().
DEFINE_string(connection_type, "", "Connection type, choose automatically "
"according to protocol by default");
DEFINE_string(server, "0.0.0.0:8002", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Maximum retry times");
// A negative value disables the dummy server.
DEFINE_int32(dummy_port, 8899, "Port of dummy server(to monitor replaying)");
// When non-empty, replaces the Host header of every replayed http request.
DEFINE_string(http_host, "", "Host field for http protocol");
// Latency in microseconds (butil::gettimeofday_us deltas) of successful RPCs.
bvar::LatencyRecorder g_latency_recorder("rpc_replay");
// Number of failed RPCs.
bvar::Adder<int64_t> g_error_count("rpc_replay_error_count");
// Number of requests issued. It is created without a name; main() hands it
// to InfoThread for reporting.
bvar::Adder<int64_t> g_sent_count;
// Include channels for all protocols that support both client and server.
//
// ChannelGroup owns one brpc::Channel per such protocol, stored at the index
// equal to its brpc::ProtocolType; every other slot is NULL.
class ChannelGroup {
public:
    ChannelGroup() {}
    // Deletes every channel created by Init().
    ~ChannelGroup();

    // The group deletes the channels it holds in its destructor, so a copy
    // would delete them a second time. Copying is therefore disabled.
    ChannelGroup(const ChannelGroup&) = delete;
    ChannelGroup& operator=(const ChannelGroup&) = delete;

    // Creates the channels. Returns 0 on success, -1 if any channel fails to
    // initialize.
    int Init();

    // Get channel by protocol type. Returns NULL when the group has no
    // channel for `type' (unknown protocol, a protocol that does not support
    // both client and server, or one incompatible with --connection_type).
    brpc::Channel* channel(brpc::ProtocolType type) {
        if ((size_t)type < _chans.size()) {
            return _chans[(size_t)type];
        }
        return NULL;
    }

private:
    // Indexed by brpc::ProtocolType; owned.
    std::vector<brpc::Channel*> _chans;
};
int ChannelGroup::Init() {
{
// force global initialization of rpc.
brpc::Channel dummy_channel;
}
std::vector<std::pair<brpc::ProtocolType, brpc::Protocol> > protocols;
brpc::ListProtocols(&protocols);
size_t max_protocol_size = 0;
for (size_t i = 0; i < protocols.size(); ++i) {
max_protocol_size = std::max(max_protocol_size,
(size_t)protocols[i].first);
}
_chans.resize(max_protocol_size + 1);
for (size_t i = 0; i < protocols.size(); ++i) {
const brpc::ProtocolType protocol_type = protocols[i].first;
const brpc::Protocol protocol = protocols[i].second;
brpc::ChannelOptions options;
options.protocol = protocol_type;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if ((options.connection_type == brpc::CONNECTION_TYPE_UNKNOWN ||
options.connection_type & protocol.supported_connection_type) &&
protocol.support_client() &&
protocol.support_server()) {
brpc::Channel* chan = new brpc::Channel;
if (chan->Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(),
&options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
delete chan;
return -1;
}
_chans[protocol_type] = chan;
}
}
return 0;
}
// Releases every channel created by Init(). Unused slots hold NULL, for which
// delete is a no-op.
ChannelGroup::~ChannelGroup() {
    for (brpc::Channel* owned : _chans) {
        delete owned;
    }
    _chans.clear();
}
// Finishes one replayed RPC and deletes `cntl'.
//
// On success, the time elapsed since `start_time' (both from
// butil::gettimeofday_us, i.e. microseconds) goes into g_latency_recorder.
// On failure, g_error_count is incremented and, if `sleep_on_error' is true,
// the caller sleeps for 10ms via bthread_usleep.
static void handle_response(brpc::Controller* cntl, int64_t start_time,
                            bool sleep_on_error/*note*/) {
    // TODO(gejun): some bthreads are starved when new bthreads are created
    // continuously, which happens when server is down and RPC keeps failing.
    // Sleep a while on error to avoid that now.
    const int64_t latency_us = butil::gettimeofday_us() - start_time;
    if (cntl->Failed()) {
        g_error_count << 1;
        if (sleep_on_error) {
            bthread_usleep(10000);
        }
    } else {
        g_latency_recorder << latency_us;
    }
    delete cntl;
}
butil::atomic<int> g_thread_offset(0);
static void* replay_thread(void* arg) {
ChannelGroup* chan_group = static_cast<ChannelGroup*>(arg);
const int thread_offset = g_thread_offset.fetch_add(1, butil::memory_order_relaxed);
double req_rate = FLAGS_qps / (double)FLAGS_thread_num;
brpc::SerializedRequest req;
brpc::NsheadMessage nshead_req;
int64_t last_expected_time = butil::monotonic_time_ns();
const int64_t interval = (int64_t) (1000000000L / req_rate);
// the max tolerant delay between end_time and expected_time. 10ms or 10 intervals
int64_t max_tolerant_delay = std::max((int64_t) 10000000L, 10 * interval);
for (int i = 0; !brpc::IsAskedToQuit() && i < FLAGS_times; ++i) {
brpc::SampleIterator it(FLAGS_dir);
int j = 0;
for (brpc::SampledRequest* sample = it.Next();
!brpc::IsAskedToQuit() && sample != NULL; sample = it.Next(), ++j) {
std::unique_ptr<brpc::SampledRequest> sample_guard(sample);
if ((j % FLAGS_thread_num) != thread_offset) {
continue;
}
brpc::Channel* chan =
chan_group->channel(sample->meta.protocol_type());
if (chan == NULL) {
LOG(ERROR) << "No channel on protocol="
<< sample->meta.protocol_type();
continue;
}
brpc::Controller* cntl = new brpc::Controller;
req.Clear();
google::protobuf::Message* req_ptr = &req;
cntl->reset_sampled_request(sample_guard.release());
if (sample->meta.protocol_type() == brpc::PROTOCOL_HTTP) {
brpc::HttpMessage http_message;
http_message.ParseFromIOBuf(sample->request);
cntl->http_request().Swap(http_message.header());
if (!FLAGS_http_host.empty()) {
// reset Host in header
cntl->http_request().SetHeader("Host", FLAGS_http_host);
}
cntl->request_attachment() = http_message.body().movable();
req_ptr = NULL;
} else if (sample->meta.protocol_type() == brpc::PROTOCOL_NSHEAD) {
nshead_req.Clear();
memcpy(&nshead_req.head, sample->meta.nshead().c_str(), sample->meta.nshead().length());
nshead_req.body = sample->request;
req_ptr = &nshead_req;
} else if (sample->meta.attachment_size() > 0) {
sample->request.cutn(
&req.serialized_data(),
sample->request.size() - sample->meta.attachment_size());
cntl->request_attachment() = sample->request.movable();
} else {
req.serialized_data() = sample->request.movable();
}
g_sent_count << 1;
const int64_t start_time = butil::gettimeofday_us();
if (FLAGS_qps <= 0) {
chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/,
cntl, req_ptr, NULL/*ignore response*/, NULL);
handle_response(cntl, start_time, true);
} else {
google::protobuf::Closure* done =
brpc::NewCallback(handle_response, cntl, start_time, false);
chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/,
cntl, req_ptr, NULL/*ignore response*/, done);
int64_t end_time = butil::monotonic_time_ns();
int64_t expected_time = last_expected_time + interval;
if (end_time < expected_time) {
usleep((expected_time - end_time)/1000);
}
if (end_time - expected_time > max_tolerant_delay) {
expected_time = end_time;
}
last_expected_time = expected_time;
}
}
}
return NULL;
}
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_dir.empty() ||
!butil::DirectoryExists(butil::FilePath(FLAGS_dir))) {
LOG(ERROR) << "--dir=<dir-of-dumped-files> is required";
return -1;
}
if (FLAGS_dummy_port >= 0) {
brpc::StartDummyServerAt(FLAGS_dummy_port);
}
ChannelGroup chan_group;
if (chan_group.Init() != 0) {
LOG(ERROR) << "Fail to init ChannelGroup";
return -1;
}
if (FLAGS_thread_num <= 0) {
if (FLAGS_qps <= 0) { // unlimited qps
FLAGS_thread_num = 50;
} else {
FLAGS_thread_num = FLAGS_qps / 10000;
if (FLAGS_thread_num < 1) {
FLAGS_thread_num = 1;
}
if (FLAGS_thread_num > 50) {
FLAGS_thread_num = 50;
}
}
}
const int rate_limit_per_thread = 1000000;
int req_rate_per_thread = FLAGS_qps / FLAGS_thread_num;
if (req_rate_per_thread > rate_limit_per_thread) {
LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is "
<< rate_limit_per_thread << " in one thread";
return -1;
}
std::vector<bthread_t> bids;
std::vector<pthread_t> pids;
if (!FLAGS_use_bthread) {
pids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (pthread_create(&pids[i], NULL, replay_thread, &chan_group) != 0) {
LOG(ERROR) << "Fail to create pthread";
return -1;
}
}
} else {
bids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (bthread_start_background(
&bids[i], NULL, replay_thread, &chan_group) != 0) {
LOG(ERROR) << "Fail to create bthread";
return -1;
}
}
}
brpc::InfoThread info_thr;
brpc::InfoThreadOptions info_thr_opt;
info_thr_opt.latency_recorder = &g_latency_recorder;
info_thr_opt.error_count = &g_error_count;
info_thr_opt.sent_count = &g_sent_count;
if (!info_thr.start(info_thr_opt)) {
LOG(ERROR) << "Fail to create info_thread";
return -1;
}
for (int i = 0; i < FLAGS_thread_num; ++i) {
if (!FLAGS_use_bthread) {
pthread_join(pids[i], NULL);
} else {
bthread_join(bids[i], NULL);
}
}
info_thr.stop();
return 0;
}