tools/parallel_http/parallel_http.cpp (175 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Access many http servers in parallel, much faster than curl (even called in batch)
#include <gflags/gflags.h>
#include <deque>
#include <bthread/bthread.h>
#include <butil/logging.h>
#include <butil/files/scoped_file.h>
#include <brpc/channel.h>
DEFINE_string(url_file, "", "The file containing urls to fetch. If this flag is"
" empty, read urls from stdin");
DEFINE_int32(timeout_ms, 1000, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(thread_num, 8, "Number of threads to access urls");
DEFINE_int32(concurrency, 1000, "Max number of http calls in parallel");
DEFINE_bool(one_line_mode, false, "Output as `URL HTTP-RESPONSE' on true");
DEFINE_bool(only_show_host, false, "Print host name only");
struct AccessThreadArgs {
const std::deque<std::string>* url_list;
size_t offset;
std::deque<std::pair<std::string, butil::IOBuf> > output_queue;
butil::Mutex output_queue_mutex;
butil::atomic<int> current_concurrency;
};
class OnHttpCallEnd : public google::protobuf::Closure {
public:
void Run();
public:
brpc::Controller cntl;
AccessThreadArgs* args;
std::string url;
};
void OnHttpCallEnd::Run() {
std::unique_ptr<OnHttpCallEnd> delete_self(this);
{
BAIDU_SCOPED_LOCK(args->output_queue_mutex);
if (cntl.Failed()) {
args->output_queue.push_back(std::make_pair(url, butil::IOBuf()));
} else {
args->output_queue.push_back(
std::make_pair(url, cntl.response_attachment()));
}
}
args->current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
}
void* access_thread(void* void_args) {
AccessThreadArgs* args = (AccessThreadArgs*)void_args;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
options.connect_timeout_ms = FLAGS_timeout_ms / 2;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
const int concurrency_for_this_thread = FLAGS_concurrency / FLAGS_thread_num;
for (size_t i = args->offset; i < args->url_list->size(); i += FLAGS_thread_num) {
std::string const& url = (*args->url_list)[i];
brpc::Channel channel;
if (channel.Init(url.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to create channel to url=" << url;
BAIDU_SCOPED_LOCK(args->output_queue_mutex);
args->output_queue.push_back(std::make_pair(url, butil::IOBuf()));
continue;
}
while (args->current_concurrency.fetch_add(1, butil::memory_order_relaxed)
> concurrency_for_this_thread) {
args->current_concurrency.fetch_sub(1, butil::memory_order_relaxed);
bthread_usleep(5000);
}
OnHttpCallEnd* done = new OnHttpCallEnd;
done->cntl.http_request().uri() = url;
done->args = args;
done->url = url;
channel.CallMethod(NULL, &done->cntl, NULL, NULL, done);
}
return NULL;
}
int main(int argc, char** argv) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
// if (FLAGS_path.empty() || FLAGS_path[0] != '/') {
// FLAGS_path = "/" + FLAGS_path;
// }
butil::ScopedFILE fp_guard;
FILE* fp = NULL;
if (!FLAGS_url_file.empty()) {
fp_guard.reset(fopen(FLAGS_url_file.c_str(), "r"));
if (!fp_guard) {
PLOG(ERROR) << "Fail to open `" << FLAGS_url_file << '\'';
return -1;
}
fp = fp_guard.get();
} else {
fp = stdin;
}
char* line_buf = NULL;
size_t line_len = 0;
ssize_t nr = 0;
std::deque<std::string> url_list;
while ((nr = getline(&line_buf, &line_len, fp)) != -1) {
if (line_buf[nr - 1] == '\n') { // remove ending newline
line_buf[nr - 1] = '\0';
--nr;
}
butil::StringPiece line(line_buf, nr);
line.trim_spaces();
if (!line.empty()) {
url_list.push_back(line.as_string());
}
}
if (url_list.empty()) {
return 0;
}
AccessThreadArgs* args = new AccessThreadArgs[FLAGS_thread_num];
for (int i = 0; i < FLAGS_thread_num; ++i) {
args[i].url_list = &url_list;
args[i].offset = i;
args[i].current_concurrency.store(0, butil::memory_order_relaxed);
}
std::vector<bthread_t> tids;
tids.resize(FLAGS_thread_num);
for (int i = 0; i < FLAGS_thread_num; ++i) {
CHECK_EQ(0, bthread_start_background(&tids[i], NULL, access_thread, &args[i]));
}
std::deque<std::pair<std::string, butil::IOBuf> > output_queue;
size_t nprinted = 0;
while (nprinted != url_list.size()) {
for (int i = 0; i < FLAGS_thread_num; ++i) {
{
BAIDU_SCOPED_LOCK(args[i].output_queue_mutex);
output_queue.swap(args[i].output_queue);
}
for (size_t i = 0; i < output_queue.size(); ++i) {
butil::StringPiece url = output_queue[i].first;
butil::StringPiece hostname;
if (url.starts_with("http://")) {
url.remove_prefix(7);
}
size_t slash_pos = url.find('/');
if (slash_pos != butil::StringPiece::npos) {
hostname = url.substr(0, slash_pos);
} else {
hostname = url;
}
if (FLAGS_one_line_mode) {
if (FLAGS_only_show_host) {
std::cout << hostname;
} else {
std::cout << "http://" << url;
}
if (output_queue[i].second.empty()) {
std::cout << " ERROR" << std::endl;
} else {
std::cout << ' ' << output_queue[i].second << std::endl;
}
} else {
// The prefix is unlikely be part of a ordinary http body,
// thus the line can be easily removed by shell utilities.
std::cout << "#### ";
if (FLAGS_only_show_host) {
std::cout << hostname;
} else {
std::cout << "http://" << url;
}
if (output_queue[i].second.empty()) {
std::cout << " ERROR" << std::endl;
} else {
std::cout << '\n' << output_queue[i].second << std::endl;
}
}
}
nprinted += output_queue.size();
output_queue.clear();
}
usleep(10000);
}
for (int i = 0; i < FLAGS_thread_num; ++i) {
bthread_join(tids[i], NULL);
}
for (int i = 0; i < FLAGS_thread_num; ++i) {
while (args[i].current_concurrency.load(butil::memory_order_relaxed) != 0) {
usleep(10000);
}
}
return 0;
}