example/baidu_proxy_and_generic_call/proxy.cpp (98 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// todo
// A proxy to receive EchoRequest and send back EchoResponse.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <butil/strings/string_number_conversions.h>
#include <brpc/server.h>
#include <brpc/controller.h>
#include <brpc/channel.h>
#include <json2pb/pb_to_json.h>
DEFINE_int32(port, 8000, "TCP Port of this server");
DEFINE_string(listen_addr, "", "Server listen address, may be IPV4/IPV6/UDS."
" If this is set, the flag port will be ignored");
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short");
DEFINE_string(server_address, "0.0.0.0:8001", "IP Address of server");
DEFINE_string(load_balancer, "", "The algorithm for load balancing");
DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds");
DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)");
DEFINE_int32(interval_ms, 1000, "Milliseconds between consecutive requests");
// Your implementation of example::EchoService
// Notice that implementing brpc::Describable grants the ability to put
// additional information in /status.
namespace example {
class BaiduMasterServiceImpl : public brpc::BaiduMasterService {
public:
void ProcessRpcRequest(brpc::Controller* cntl,
const brpc::SerializedRequest* request,
brpc::SerializedResponse* response,
::google::protobuf::Closure* done) override {
// This object helps you to call done->Run() in RAII style. If you need
// to process the request asynchronously, pass done_guard.release().
brpc::ClosureGuard done_guard(done);
// A Channel represents a communication line to a Server. Notice that
// Channel is thread-safe and can be shared by all threads in your program.
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
options.connection_type = FLAGS_connection_type;
options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/;
options.max_retry = FLAGS_max_retry;
if (channel.Init(FLAGS_server_address.c_str(),
FLAGS_load_balancer.c_str(), &options) != 0) {
LOG(ERROR) << "Fail to initialize channel";
(*cntl->response_user_fields())["x-bd-proxy-error-code"] =
butil::IntToString(brpc::EINTERNAL);
(*cntl->response_user_fields())["x-bd-proxy-error-text"] =
"Fail to initialize channel";
return;
}
LOG(INFO) << "Received request[log_id=" << cntl->log_id()
<< "] from " << cntl->remote_side()
<< " to " << cntl->local_side()
<< ", serialized request size=" << request->serialized_data().size()
<< ", request compress type=" << cntl->request_compress_type()
<< " (attached=" << cntl->request_attachment() << ")";
brpc::Controller call_cntl;
call_cntl.set_log_id(cntl->log_id());
call_cntl.request_attachment().swap(cntl->request_attachment());
call_cntl.set_request_compress_type(cntl->request_compress_type());
call_cntl.reset_sampled_request(cntl->release_sampled_request());
// It is ok to use request and response for sync rpc.
channel.CallMethod(NULL, &call_cntl, request, response, NULL);
(*cntl->response_user_fields())["x-bd-proxy-error-code"] =
butil::IntToString(call_cntl.ErrorCode());
if (call_cntl.Failed()) {
(*cntl->response_user_fields())["x-bd-proxy-error-text"] =
call_cntl.ErrorText();
LOG(ERROR) << "Fail to call service=" << call_cntl.sampled_request()->meta.service_name()
<< ", method=" << call_cntl.sampled_request()->meta.method_name()
<< ", error_code=" << call_cntl.ErrorCode()
<< ", error_text=" << call_cntl.ErrorCode();
return;
} else {
LOG(INFO) << "Received response from " << call_cntl.remote_side()
<< " to " << call_cntl.local_side()
<< ", serialized response size=" << response->serialized_data().size()
<< ", response compress type=" << call_cntl.response_compress_type()
<< ", attached=" << call_cntl.response_attachment()
<< ", latency=" << call_cntl.latency_us() << "us";
}
cntl->response_attachment().swap(call_cntl.response_attachment());
cntl->set_response_compress_type(call_cntl.response_compress_type());
}
};
} // namespace example
int main(int argc, char* argv[]) {
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
// Generally you only need one Server.
brpc::Server server;
butil::EndPoint point;
if (!FLAGS_listen_addr.empty()) {
if (butil::str2endpoint(FLAGS_listen_addr.c_str(), &point) < 0) {
LOG(ERROR) << "Invalid listen address:" << FLAGS_listen_addr;
return -1;
}
} else {
point = butil::EndPoint(butil::IP_ANY, FLAGS_port);
}
// Start the server.
brpc::ServerOptions options;
// Add the baidu master service into server.
// Notice new operator, because server will delete it in dtor of Server.
options.baidu_master_service = new example::BaiduMasterServiceImpl();
options.idle_timeout_sec = FLAGS_idle_timeout_s;
if (server.Start(point, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
}