in src/brpc/server.cpp [831:1256]
// Start this server: validate state and options, (re)build per-start
// resources, listen on a port and begin accepting connections.
//
// Params:
//   endpoint    Address to listen on. Its port field is overwritten by each
//               candidate port taken from `port_range'.
//   port_range  Inclusive range [min_port, max_port] of ports to try, in
//               ascending order; the first port that can be listened on wins.
//   opt         Options for this start. May be NULL, in which case a
//               default-constructed ServerOptions is used.
//
// Returns 0 on success, -1 on failure.
//
// `revert_server' holds `this' with the RevertServerStatus deleter for the
// whole function and is released only right before `return 0', so every
// `return -1' below runs that deleter.
// NOTE(review): RevertServerStatus is not defined in this function;
// presumably it rolls the server back to a non-running state -- confirm.
int Server::StartInternal(const butil::EndPoint& endpoint,
const PortRange& port_range,
const ServerOptions *opt) {
std::unique_ptr<Server, RevertServerStatus> revert_server(this);
// Errors recorded by earlier configuration calls are reported here. The flag
// is cleared before returning, so the same error does not fail a later call.
if (_failed_to_set_max_concurrency_of_method) {
_failed_to_set_max_concurrency_of_method = false;
LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
"fix it before starting server";
return -1;
}
if (_failed_to_set_ignore_eovercrowded) {
_failed_to_set_ignore_eovercrowded = false;
LOG(ERROR) << "previous call to IgnoreEovercrowdedOf() was failed, "
"fix it before starting server";
return -1;
}
if (InitializeOnce() != 0) {
LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
return -1;
}
// Only a server whose status is READY can be started.
const Status st = status();
if (st != READY) {
if (st == RUNNING) {
LOG(ERROR) << "Server[" << version() << "] is already running on "
<< _listen_addr;
} else {
LOG(ERROR) << "Can't start Server[" << version()
<< "] which is " << status_str(status());
}
return -1;
}
// Fill `_options' from the caller's options, or from defaults if `opt' is NULL.
copy_and_fill_server_options(_options, opt ? *opt : ServerOptions());
if (!_options.h2_settings.IsValid(true/*log_error*/)) {
LOG(ERROR) << "Invalid h2_settings";
return -1;
}
// Requesting RDMA in a build without BRPC_WITH_RDMA fails the start.
if (_options.use_rdma) {
#if BRPC_WITH_RDMA
if (!OptionsAvailableOverRdma(&_options)) {
return -1;
}
rdma::GlobalRdmaInitializeOrDie();
#else
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
return -1;
#endif
}
// Valid tags are in [BTHREAD_TAG_DEFAULT, FLAGS_task_group_ntags).
if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
_options.bthread_tag >= FLAGS_task_group_ntags) {
LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
<< BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
return -1;
}
if (_options.http_master_service) {
// Check requirements for http_master_service:
// has "default_method" & request/response have no fields
const google::protobuf::ServiceDescriptor* sd =
_options.http_master_service->GetDescriptor();
const google::protobuf::MethodDescriptor* md =
sd->FindMethodByName("default_method");
if (md == NULL) {
LOG(ERROR) << "http_master_service must have a method named `default_method'";
return -1;
}
if (md->input_type()->field_count() != 0) {
LOG(ERROR) << "The request type of http_master_service must have "
"no fields, actually " << md->input_type()->field_count();
return -1;
}
if (md->output_type()->field_count() != 0) {
LOG(ERROR) << "The response type of http_master_service must have "
"no fields, actually " << md->output_type()->field_count();
return -1;
}
}
// CAUTION:
// Following code may run multiple times if this server is started and
// stopped more than once. Reuse or delete previous resources!
if (_options.session_local_data_factory) {
// Create the pool on the first start; on later starts keep the pool object
// and only reset it with the (possibly different) factory.
if (_session_local_data_pool == NULL) {
_session_local_data_pool =
new (std::nothrow) SimpleDataPool(_options.session_local_data_factory);
if (NULL == _session_local_data_pool) {
LOG(ERROR) << "Fail to new SimpleDataPool";
return -1;
}
} else {
_session_local_data_pool->Reset(_options.session_local_data_factory);
}
_session_local_data_pool->Reserve(_options.reserved_session_local_data);
}
// Leak of `_keytable_pool' and others is by design.
// See comments in Server::Join() for details.
// Instruct LeakSanitizer to ignore the designated memory leak.
ANNOTATE_SCOPED_MEMORY_LEAK;
// Init _keytable_pool always. If the server was stopped before, the pool
// should be destroyed in Join().
_keytable_pool = new bthread_keytable_pool_t;
if (bthread_keytable_pool_init(_keytable_pool) != 0) {
LOG(ERROR) << "Fail to init _keytable_pool";
delete _keytable_pool;
_keytable_pool = NULL;
return -1;
}
// With a thread-local data factory: create the TLS key and optionally
// pre-reserve data in the keytable pool. Without one: reset `_tl_options'
// to its default-constructed value.
if (_options.thread_local_data_factory) {
_tl_options.thread_local_data_factory = _options.thread_local_data_factory;
if (bthread_key_create2(&_tl_options.tls_key, DestroyServerTLS,
_options.thread_local_data_factory) != 0) {
LOG(ERROR) << "Fail to create thread-local key";
return -1;
}
if (_options.reserved_thread_local_data) {
bthread_keytable_pool_reserve(_keytable_pool,
_options.reserved_thread_local_data,
_tl_options.tls_key,
CreateServerTLS,
_options.thread_local_data_factory);
}
} else {
_tl_options = ThreadLocalOptions();
}
if (_options.bthread_init_count != 0 &&
_options.bthread_init_fn != NULL) {
// Create some special bthreads to call the init functions. The
// bthreads will not quit until all bthreads finish the init function.
BthreadInitArgs* init_args
= new BthreadInitArgs[_options.bthread_init_count];
// `ncreated' counts the bthreads started successfully. It is incremented
// together with `i', so the `break' below leaves it equal to the index of
// the first bthread that failed to start.
size_t ncreated = 0;
for (size_t i = 0; i < _options.bthread_init_count; ++i, ++ncreated) {
init_args[i].bthread_init_fn = _options.bthread_init_fn;
init_args[i].bthread_init_args = _options.bthread_init_args;
init_args[i].result = false;
init_args[i].done = false;
init_args[i].stop = false;
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _options.bthread_tag;
tmp.keytable_pool = _keytable_pool;
if (bthread_start_background(
&init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) {
break;
}
}
// Wait until all created bthreads finish the init function.
for (size_t i = 0; i < ncreated; ++i) {
while (!init_args[i].done) {
bthread_usleep(1000);
}
}
// Stop and join created bthreads.
for (size_t i = 0; i < ncreated; ++i) {
init_args[i].stop = true;
}
for (size_t i = 0; i < ncreated; ++i) {
bthread_join(init_args[i].th, NULL);
}
// Count the init functions that reported failure through `result'.
size_t num_failed_result = 0;
for (size_t i = 0; i < ncreated; ++i) {
if (!init_args[i].result) {
++num_failed_result;
}
}
// Errors are reported only after every created bthread has been joined and
// the argument array has been freed.
delete [] init_args;
if (ncreated != _options.bthread_init_count) {
LOG(ERROR) << "Fail to create "
<< _options.bthread_init_count - ncreated << " bthreads";
return -1;
}
if (num_failed_result != 0) {
LOG(ERROR) << num_failed_result << " bthread_init_fn failed";
return -1;
}
}
// Free last SSL contexts
FreeSSLContexts();
if (_options.has_ssl_options()) {
// Change ServerSSLOptions.alpns to _raw_alpns.
// The AddCertificate function may access the raw_alpns variable.
if (InitALPNOptions(_options.mutable_ssl_options()) != 0) {
return -1;
}
CertInfo& default_cert = _options.mutable_ssl_options()->default_cert;
if (default_cert.certificate.empty()) {
LOG(ERROR) << "default_cert is empty";
return -1;
}
if (AddCertificate(default_cert) != 0) {
return -1;
}
// The default context is taken from the first entry of `_ssl_ctx_map'.
// NOTE(review): this assumes the map holds only default_cert's entry at this
// point, i.e. that FreeSSLContexts() emptied it -- confirm.
_default_ssl_ctx = _ssl_ctx_map.begin()->second.ctx;
const std::vector<CertInfo>& certs = _options.mutable_ssl_options()->certs;
for (size_t i = 0; i < certs.size(); ++i) {
if (AddCertificate(certs[i]) != 0) {
return -1;
}
}
} else if (_options.force_ssl) {
// force_ssl without any ssl_options is a configuration error.
LOG(ERROR) << "Fail to force SSL for all connections "
"without ServerOptions.ssl_options";
return -1;
}
_concurrency = 0;
// Builtin services are added only when enabled and none are registered yet.
if (_options.has_builtin_services &&
_builtin_service_count <= 0 &&
AddBuiltinServices() != 0) {
LOG(ERROR) << "Fail to add builtin services";
return -1;
}
// If a server is started/stopped multiple times and one of the option
// sets has_builtin_services to true, builtin services will be enabled for
// any later re-start. Check this case and report to user.
if (!_options.has_builtin_services && _builtin_service_count > 0) {
LOG(ERROR) << "A server started/stopped for multiple times must be "
"consistent on ServerOptions.has_builtin_services";
return -1;
}
// Prepare all restful maps
for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) {
if (it->second.restful_map) {
it->second.restful_map->PrepareForFinding();
}
}
if (_global_restful_map) {
_global_restful_map->PrepareForFinding();
}
// num_threads is applied only when positive. It is increased by
// FLAGS_usercode_backup_threads when FLAGS_usercode_in_pthread is set, then
// raised to at least BTHREAD_MIN_CONCURRENCY, then applied to this server's tag.
if (_options.num_threads > 0) {
if (FLAGS_usercode_in_pthread) {
_options.num_threads += FLAGS_usercode_backup_threads;
}
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
}
// Install a concurrency limiter per method. Methods of builtin services get
// none (NULL). Other methods use their own max_concurrency unless it is
// UNLIMITED, in which case _options.method_max_concurrency is used instead.
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
it->second.status->SetConcurrencyLimiter(NULL);
} else {
const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED) {
amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) {
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
it->second.status->SetConcurrencyLimiter(cl);
it->second.max_concurrency.SetConcurrencyLimiter(cl);
}
}
if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
return -1;
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
return -1;
}
#endif
// Create listening ports
if (port_range.min_port > port_range.max_port) {
LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
<< port_range.max_port << ']';
return -1;
}
// For an extended endpoint the range must be exactly [endpoint.port, endpoint.port].
if (butil::is_endpoint_extended(endpoint) &&
(port_range.min_port != endpoint.port || port_range.max_port != endpoint.port)) {
LOG(ERROR) << "Only IPv4 address supports port range feature";
return -1;
}
_listen_addr = endpoint;
// Try the ports in ascending order. The loop ends at the first port that is
// listened on successfully (`break') or with `return -1'.
for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
_listen_addr.port = port;
// NOTE(review): fd_guard presumably closes the fd on scope exit unless
// release() was called -- confirm against butil::fd_guard.
butil::fd_guard sockfd(tcp_listen(_listen_addr));
if (sockfd < 0) {
if (port != port_range.max_port) { // not the last port, try next
continue;
}
// Last candidate failed too: report the whole range, or the single address
// when the range has only one port.
if (port_range.min_port != port_range.max_port) {
LOG(ERROR) << "Fail to listen " << _listen_addr.ip
<< ":[" << port_range.min_port << '-'
<< port_range.max_port << ']';
} else {
LOG(ERROR) << "Fail to listen " << _listen_addr;
}
return -1;
}
if (_listen_addr.port == 0) {
// port=0 makes kernel dynamically select a port from
// https://en.wikipedia.org/wiki/Ephemeral_port
_listen_addr.port = get_port_from_fd(sockfd);
if (_listen_addr.port <= 0) {
LOG(ERROR) << "Fail to get port from fd=" << sockfd;
return -1;
}
}
// The acceptor is built only when `_am' is NULL and is reused otherwise.
// NOTE(review): `_use_rdma' and `_bthread_tag' are assigned only at creation,
// so an existing acceptor keeps its old values if the server is restarted
// with different options -- confirm this is intended.
if (_am == NULL) {
_am = BuildAcceptor();
if (NULL == _am) {
LOG(ERROR) << "Fail to build acceptor";
return -1;
}
_am->_use_rdma = _options.use_rdma;
_am->_bthread_tag = _options.bthread_tag;
}
// Set `_status' to RUNNING before accepting connections
// to prevent requests being rejected as ELOGOFF
_status = RUNNING;
time(&_last_start_time);
GenerateVersionIfNeeded();
g_running_server_count.fetch_add(1, butil::memory_order_relaxed);
// Pass ownership of `sockfd' to `_am'
if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
_default_ssl_ctx,
_options.force_ssl) != 0) {
LOG(ERROR) << "Fail to start acceptor";
return -1;
}
// StartAccept succeeded: the guard gives up the fd.
sockfd.release();
break; // stop trying
}
// Optional second listening port, set up only when internal_port >= 0 and
// builtin services are enabled. It must differ from the main port, must not
// be 0, and is rejected for extended endpoints.
if (_options.internal_port >= 0 && _options.has_builtin_services) {
if (_options.internal_port == _listen_addr.port) {
LOG(ERROR) << "ServerOptions.internal_port=" << _options.internal_port
<< " is same with port=" << _listen_addr.port << " to Start()";
return -1;
}
if (_options.internal_port == 0) {
LOG(ERROR) << "ServerOptions.internal_port cannot be 0, which"
" allocates a dynamic and probabaly unfiltered port,"
" against the purpose of \"being internal\".";
return -1;
}
if (butil::is_endpoint_extended(endpoint)) {
LOG(ERROR) << "internal_port is available in IPv4 address only";
return -1;
}
// Same address as the main listener, with the port replaced.
butil::EndPoint internal_point = _listen_addr;
internal_point.port = _options.internal_port;
butil::fd_guard sockfd(tcp_listen(internal_point));
if (sockfd < 0) {
LOG(ERROR) << "Fail to listen " << internal_point << " (internal)";
return -1;
}
if (NULL == _internal_am) {
_internal_am = BuildAcceptor();
if (NULL == _internal_am) {
LOG(ERROR) << "Fail to build internal acceptor";
return -1;
}
}
// Pass ownership of `sockfd' to `_internal_am'
// The last argument is `false' here, where the main acceptor above passes
// _options.force_ssl.
if (_internal_am->StartAccept(sockfd, _options.idle_timeout_sec,
_default_ssl_ctx,
false) != 0) {
LOG(ERROR) << "Fail to start internal_acceptor";
return -1;
}
sockfd.release();
}
PutPidFileIfNeeded();
// Launch _derivative_thread.
// It is expected to be unset (INVALID_BTHREAD) at this point.
CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.tag = _options.bthread_tag;
if (bthread_start_background(&_derivative_thread, &tmp,
UpdateDerivedVars, this) != 0) {
LOG(ERROR) << "Fail to create _derivative_thread";
return -1;
}
// Print tips to server launcher.
if (butil::is_endpoint_extended(_listen_addr)) {
const char* builtin_msg = _options.has_builtin_services ? " with builtin service" : "";
LOG(INFO) << "Server[" << version() << "] is serving on " << _listen_addr
<< builtin_msg << '.';
//TODO add TrackMe support
} else {
// `http_port' is the port shown in the browser hint and passed to TrackMe:
// the internal port when one is in use, otherwise the listening port.
int http_port = _listen_addr.port;
std::ostringstream server_info;
server_info << "Server[" << version() << "] is serving on port="
<< _listen_addr.port;
if (_options.internal_port >= 0 && _options.has_builtin_services) {
http_port = _options.internal_port;
server_info << " and internal_port=" << _options.internal_port;
}
LOG(INFO) << server_info.str() << '.';
if (_options.has_builtin_services) {
LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
<< http_port << " in web browser.";
} else {
LOG(WARNING) << "Builtin services are disabled according to "
"ServerOptions.has_builtin_services";
}
// For trackme reporting
SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
}
// Success: release the guard so the RevertServerStatus deleter does not run.
revert_server.release();
return 0;
}