// example/session_data_and_thread_local/server.cpp (165 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// A server to receive EchoRequest and send back EchoResponse asynchronously.
#include <gflags/gflags.h>
#include <butil/logging.h>
#include <brpc/server.h>
#include "echo.pb.h"
// Command-line flags of this example.
// When true, AsyncJob::run() copies the request attachment into the response.
DEFINE_bool(echo_attachment, true, "Echo attachment as well");
// Port passed to brpc::Server::Start() in main().
DEFINE_int32(port, 8002, "TCP Port of this server");
// Copied into ServerOptions.idle_timeout_sec in main().
DEFINE_int32(idle_timeout_s, -1, "Connection will be closed if there is no "
"read/write operations during the last `idle_timeout_s'");
// Copied into ServerOptions.max_concurrency in main().
DEFINE_int32(max_concurrency, 0, "Limit of request processing in parallel");
// Number of live MySessionLocalData objects: incremented by its constructor,
// decremented by its destructor and checked to be zero at the end of main().
butil::atomic<int> nsd(0);
// Session-local data used by this example. Every live instance is counted in
// the global `nsd' so that main() can verify that none is left at exit.
struct MySessionLocalData {
    // Value read and rewritten by Echo(); a fresh object starts at 123.
    int x;

    // Registers the new instance in `nsd'.
    MySessionLocalData() : x(123) {
        nsd.fetch_add(1, butil::memory_order_relaxed);
    }

    // Removes the instance from `nsd'.
    ~MySessionLocalData() {
        nsd.fetch_sub(1, butil::memory_order_relaxed);
    }
};
class MySessionLocalDataFactory : public brpc::DataFactory {
public:
void* CreateData() const {
return new MySessionLocalData;
}
void DestroyData(void* d) const {
delete static_cast<MySessionLocalData*>(d);
}
};
// Number of live MyThreadLocalData objects: incremented by its constructor,
// decremented by its destructor and checked to be zero at the end of main().
butil::atomic<int> ntls(0);
// Thread-local data used by this example, both for the server-managed
// thread-local storage and for the user-managed bthread key. Every live
// instance is counted in the global `ntls'.
struct MyThreadLocalData {
    // Value written and verified by Echo(); a fresh object starts at 0.
    int y;

    // Registers the new instance in `ntls'.
    MyThreadLocalData() : y(0) {
        ntls.fetch_add(1, butil::memory_order_relaxed);
    }

    // Removes the instance from `ntls'.
    ~MyThreadLocalData() {
        ntls.fetch_sub(1, butil::memory_order_relaxed);
    }

    // Deletes `d', which is treated as a MyThreadLocalData*. Takes a void*
    // so that it can be passed to bthread_key_create() as the destructor.
    static void deleter(void* d) {
        MyThreadLocalData* data = static_cast<MyThreadLocalData*>(d);
        delete data;
    }
};
// brpc::DataFactory producing MyThreadLocalData objects. main() installs an
// instance of it as ServerOptions.thread_local_data_factory.
class MyThreadLocalDataFactory : public brpc::DataFactory {
public:
    // Returns a newly allocated MyThreadLocalData as an opaque pointer.
    void* CreateData() const {
        MyThreadLocalData* data = new MyThreadLocalData;
        return data;
    }

    // Frees `d' through MyThreadLocalData::deleter, the same routine that is
    // registered for the user-managed bthread key.
    void DestroyData(void* d) const {
        MyThreadLocalData::deleter(d);
    }
};
// Everything needed to finish one Echo RPC outside of Echo(). Echo()
// allocates a job with `new', fills in every field and hands it to a new
// bthread, which ends up calling run_and_delete().
struct AsyncJob {
    // Verifies the session-local data and fills the response.
    void run();

    // Runs the job and then frees it. The object must have been allocated
    // with `new' and must not be used after this call.
    void run_and_delete() {
        this->run();
        delete this;
    }

    // Session-local data seen by Echo() and the value Echo() stored in its
    // `x'; run() checks that both are unchanged.
    MySessionLocalData* expected_session_local_data;
    int expected_session_value;
    // Controller, request and response of the RPC being processed.
    brpc::Controller* cntl;
    const example::EchoRequest* request;
    example::EchoResponse* response;
    // Completion callback of the RPC.
    google::protobuf::Closure* done;
};
static void* process_thread(void* args) {
AsyncJob* job = static_cast<AsyncJob*>(args);
job->run_and_delete();
return NULL;
}
// Your implementation of example::EchoService.
// Besides echoing the request, it demonstrates three kinds of local storage:
//   - session-local data, fetched with brpc::Controller::session_local_data()
//   - thread-local data, fetched with brpc::thread_local_data()
//   - user-managed bthread-local data, stored under a bthread key
// The response itself is produced asynchronously by an AsyncJob.
class EchoServiceWithThreadAndSessionLocal : public example::EchoService {
public:
    // Creates the bthread key for the user-managed data. Values stored under
    // the key are released with MyThreadLocalData::deleter.
    EchoServiceWithThreadAndSessionLocal() {
        CHECK_EQ(0, bthread_key_create(&_tls2_key, MyThreadLocalData::deleter));
    }

    // Deletes the bthread key created by the constructor.
    ~EchoServiceWithThreadAndSessionLocal() {
        CHECK_EQ(0, bthread_key_delete(_tls2_key));
    }

    // Handles one Echo RPC.
    //
    // cntl_base: controller of the RPC, actually a brpc::Controller.
    // request/response: messages of the RPC; the response is filled by the
    //                   AsyncJob, not by this function.
    // done: completion callback. It is run on return when this function
    //       fails; otherwise it is handed to the AsyncJob.
    //
    // The RPC is failed (Controller::SetFailed) when the session-local or
    // thread-local data is missing, when the bthread-local data cannot be
    // stored, or when the bthread for the AsyncJob cannot be started.
    void Echo(google::protobuf::RpcController* cntl_base,
              const example::EchoRequest* request,
              example::EchoResponse* response,
              google::protobuf::Closure* done) {
        // Runs `done' on every return until release() is called below.
        brpc::ClosureGuard done_guard(done);

        brpc::Controller* cntl =
            static_cast<brpc::Controller*>(cntl_base);

        // Get the session-local data which is created by
        // ServerOptions.session_local_data_factory and reused between
        // different RPC. All session-local data are destroyed upon server
        // destruction.
        MySessionLocalData* sd =
            static_cast<MySessionLocalData*>(cntl->session_local_data());
        if (sd == NULL) {
            cntl->SetFailed("Require ServerOptions.session_local_data_factory to be"
                            " set with a correctly implemented instance");
            LOG(ERROR) << cntl->ErrorText();
            return;
        }
        // Add the low 32 bits of the controller address to x (presumably to
        // get a value that varies between RPCs) and remember the result so
        // that it can be verified later, here and in the AsyncJob.
        const int expected_value = static_cast<int>(
            sd->x + (reinterpret_cast<uintptr_t>(cntl) & 0xFFFFFFFF));
        sd->x = expected_value;

        // Get the thread-local data which is created by
        // ServerOptions.thread_local_data_factory and reused between
        // different threads. All thread-local data are destroyed upon server
        // destruction.
        // "tls" is short for "thread local storage".
        MyThreadLocalData* tls =
            static_cast<MyThreadLocalData*>(brpc::thread_local_data());
        if (tls == NULL) {
            cntl->SetFailed("Require ServerOptions.thread_local_data_factory "
                            "to be set with a correctly implemented instance");
            LOG(ERROR) << cntl->ErrorText();
            return;
        }
        tls->y = expected_value;

        // You can create bthread-local data for your own.
        // The interfaces are similar with pthread equivalence:
        //   pthread_key_create  -> bthread_key_create
        //   pthread_key_delete  -> bthread_key_delete
        //   pthread_getspecific -> bthread_getspecific
        //   pthread_setspecific -> bthread_setspecific
        MyThreadLocalData* tls2 =
            static_cast<MyThreadLocalData*>(bthread_getspecific(_tls2_key));
        if (tls2 == NULL) {
            tls2 = new MyThreadLocalData;
            if (bthread_setspecific(_tls2_key, tls2) != 0) {
                // The key did not take the object, so nothing would ever
                // free it: release it here and fail the RPC.
                delete tls2;
                cntl->SetFailed("Fail to set bthread-local data");
                LOG(ERROR) << cntl->ErrorText();
                return;
            }
        }
        tls2->y = expected_value + 1;

        // sleep awhile to force context switching.
        bthread_usleep(10000);

        // tls is unchanged after context switching.
        CHECK_EQ(tls, brpc::thread_local_data());
        CHECK_EQ(expected_value, tls->y);

        CHECK_EQ(tls2, bthread_getspecific(_tls2_key));
        CHECK_EQ(expected_value + 1, tls2->y);

        // Process the request asynchronously.
        AsyncJob* job = new AsyncJob;
        job->expected_session_local_data = sd;
        job->expected_session_value = expected_value;
        job->cntl = cntl;
        job->request = request;
        job->response = response;
        job->done = done;
        bthread_t th;
        if (bthread_start_background(&th, NULL, process_thread, job) != 0) {
            // The job will never run. Free it and fail the RPC; `done' is
            // still held by done_guard and therefore runs on return.
            // NOTE(review): a CHECK here is not enough if failed CHECKs do
            // not abort in your logging configuration - confirm.
            delete job;
            cntl->SetFailed("Fail to start bthread to process the request");
            LOG(ERROR) << cntl->ErrorText();
            return;
        }
        // The job is now responsible for `done'. We don't want to call
        // done->Run() here, release the guard. The job may already be
        // running, so the RPC objects must not be touched from here on.
        done_guard.release();

        LOG_EVERY_SECOND(INFO) << "ntls=" << ntls.load(butil::memory_order_relaxed)
                               << " nsd=" << nsd.load(butil::memory_order_relaxed);
    }

private:
    // Key of the user-managed bthread-local MyThreadLocalData.
    bthread_key_t _tls2_key;
};
// Finishes the RPC started in Echo(): checks that the session-local data is
// still the one Echo() saw, fills the response and runs `done' on exit.
void AsyncJob::run() {
    // `done' is run when this guard goes out of scope.
    brpc::ClosureGuard finish_rpc(done);

    // Sleep some time to make sure that Echo() exits.
    bthread_usleep(10000);

    // The controller must still return the very object that Echo() used,
    // holding the value that Echo() stored: session-local data stays
    // reachable through the controller after Echo() has returned.
    MySessionLocalData* sd =
        static_cast<MySessionLocalData*>(cntl->session_local_data());
    CHECK_EQ(expected_session_local_data, sd);
    CHECK_EQ(expected_session_value, sd->x);

    // Echo the request message.
    response->set_message(request->message());

    // Echo the attachment as well, unless disabled by -echo_attachment.
    if (!FLAGS_echo_attachment) {
        return;
    }
    cntl->response_attachment().append(cntl->request_attachment());
}
// Starts an Echo server with session-local and thread-local data factories
// installed and runs it until asked to quit. Returns 0 on a clean shutdown
// and -1 when the service cannot be added or the server cannot be started.
int main(int argc, char* argv[]) {
    // Parse gflags. We recommend you to use gflags as well.
    GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true);

    // Factories creating MySessionLocalData / MyThreadLocalData. They must
    // be valid while the server is running, hence they are declared before
    // the server.
    MySessionLocalDataFactory session_local_data_factory;
    MyThreadLocalDataFactory thread_local_data_factory;

    // Generally you only need one Server.
    brpc::Server server;

    // For more options see `brpc/server.h'.
    brpc::ServerOptions options;
    options.session_local_data_factory = &session_local_data_factory;
    options.thread_local_data_factory = &thread_local_data_factory;
    options.max_concurrency = FLAGS_max_concurrency;
    options.idle_timeout_sec = FLAGS_idle_timeout_s;

    // Instance of your service.
    EchoServiceWithThreadAndSessionLocal echo_service_impl;

    // Add the service into server. The service lives on the stack, so the
    // server must not delete it: pass brpc::SERVER_DOESNT_OWN_SERVICE
    // (use brpc::SERVER_OWNS_SERVICE otherwise).
    const int add_rc = server.AddService(&echo_service_impl,
                                         brpc::SERVER_DOESNT_OWN_SERVICE);
    if (add_rc != 0) {
        LOG(ERROR) << "Fail to add service";
        return -1;
    }

    // Start the server.
    const int start_rc = server.Start(FLAGS_port, &options);
    if (start_rc != 0) {
        LOG(ERROR) << "Fail to start EchoServer";
        return -1;
    }

    // Wait until Ctrl-C is pressed, then Stop() and Join() the server.
    server.RunUntilAskedToQuit();

    // No thread-local or session-local object may be alive at this point.
    CHECK_EQ(ntls, 0);
    CHECK_EQ(nsd, 0);
    return 0;
}