example/coroutine/coroutine_server.cpp (96 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// A server that receives EchoRequest and sends back EchoResponse, handling
// each request either synchronously or in a coroutine (see --enable_coroutine).
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include <brpc/channel.h>
#include <brpc/coroutine.h>
#include "echo.pb.h"
// Port the server listens on; EchoServiceImpl's proxy channel targets the
// same port.
DEFINE_int32(port, 8000, "TCP Port of this server");
// Number of microseconds Echo sleeps before filling in the response. Also
// used to derive the proxy channel's timeout.
DEFINE_int32(sleep_us, 1000000, "Server sleep us");
// When true, Echo/Proxy run their coroutine implementations and main() turns
// on the "usercode_in_coroutine" flag; when false they block in the handler.
DEFINE_bool(enable_coroutine, true, "Enable coroutine");
// Coroutine primitives used below, pulled in from brpc::experimental.
using brpc::experimental::Awaitable;
using brpc::experimental::AwaitableDone;
using brpc::experimental::Coroutine;
namespace example {
class EchoServiceImpl : public EchoService {
public:
EchoServiceImpl() {
brpc::ChannelOptions options;
options.timeout_ms = FLAGS_sleep_us / 1000 * 2 + 100;
options.max_retry = 0;
CHECK(_channel.Init(butil::EndPoint(butil::IP_ANY, FLAGS_port), &options) == 0);
}
virtual ~EchoServiceImpl() {}
void Echo(google::protobuf::RpcController* cntl_base,
const EchoRequest* request,
EchoResponse* response,
google::protobuf::Closure* done) override {
// brpc::Controller* cntl =
// static_cast<brpc::Controller*>(cntl_base);
if (FLAGS_enable_coroutine) {
Coroutine(EchoAsync(request, response, done), true);
} else {
brpc::ClosureGuard done_guard(done);
bthread_usleep(FLAGS_sleep_us);
response->set_message(request->message());
}
}
Awaitable<void> EchoAsync(const EchoRequest* request,
EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
co_await Coroutine::usleep(FLAGS_sleep_us);
response->set_message(request->message());
}
void Proxy(google::protobuf::RpcController* cntl_base,
const EchoRequest* request,
EchoResponse* response,
google::protobuf::Closure* done) override {
// brpc::Controller* cntl =
// static_cast<brpc::Controller*>(cntl_base);
if (FLAGS_enable_coroutine) {
Coroutine(ProxyAsync(request, response, done), true);
} else {
brpc::ClosureGuard done_guard(done);
EchoService_Stub stub(&_channel);
brpc::Controller cntl;
stub.Echo(&cntl, request, response, NULL);
if (cntl.Failed()) {
response->set_message(cntl.ErrorText());
}
}
}
Awaitable<void> ProxyAsync(const EchoRequest* request,
EchoResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
EchoService_Stub stub(&_channel);
brpc::Controller cntl;
AwaitableDone done2;
stub.Echo(&cntl, request, response, &done2);
co_await done2.awaitable();
if (cntl.Failed()) {
response->set_message(cntl.ErrorText());
}
}
private:
brpc::Channel _channel;
};
} // namespace example
int main(int argc, char* argv[]) {
bthread_setconcurrency(BTHREAD_MIN_CONCURRENCY);
// Parse gflags. We recommend you to use gflags as well.
GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);
if (FLAGS_enable_coroutine) {
GFLAGS_NAMESPACE::SetCommandLineOption("usercode_in_coroutine", "true");
}
// Generally you only need one Server.
brpc::Server server;
// Instance of your service.
example::EchoServiceImpl echo_service_impl;
// Add the service into server. Notice the second parameter, because the
// service is put on stack, we don't want server to delete it, otherwise
// use brpc::SERVER_OWNS_SERVICE.
if (server.AddService(&echo_service_impl,
brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
LOG(ERROR) << "Fail to add service";
return -1;
}
// Start the server.
brpc::ServerOptions options;
options.num_threads = BTHREAD_MIN_CONCURRENCY;
if (server.Start(FLAGS_port, &options) != 0) {
LOG(ERROR) << "Fail to start EchoServer";
return -1;
}
// Wait until Ctrl-C is pressed, then Stop() and Join() the server.
server.RunUntilAskedToQuit();
return 0;
}