int Server::StartInternal()

in src/brpc/server.cpp [831:1256]


int Server::StartInternal(const butil::EndPoint& endpoint,
                          const PortRange& port_range,
                          const ServerOptions *opt) {
    std::unique_ptr<Server, RevertServerStatus> revert_server(this);
    if (_failed_to_set_max_concurrency_of_method) {
        _failed_to_set_max_concurrency_of_method = false;
        LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
            "fix it before starting server";
        return -1;
    }
    if (_failed_to_set_ignore_eovercrowded) {
        _failed_to_set_ignore_eovercrowded = false;
        LOG(ERROR) << "previous call to IgnoreEovercrowdedOf() was failed, "
            "fix it before starting server";
        return -1;
    }
    if (InitializeOnce() != 0) {
        LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
        return -1;
    }
    const Status st = status();
    if (st != READY) {
        if (st == RUNNING) {
            LOG(ERROR) << "Server[" << version() << "] is already running on "
                       << _listen_addr;
        } else {
            LOG(ERROR) << "Can't start Server[" << version()
                       << "] which is " << status_str(status());
        }
        return -1;
    }

    copy_and_fill_server_options(_options, opt ? *opt : ServerOptions());

    if (!_options.h2_settings.IsValid(true/*log_error*/)) {
        LOG(ERROR) << "Invalid h2_settings";
        return -1;
    }

    if (_options.use_rdma) {
#if BRPC_WITH_RDMA
        if (!OptionsAvailableOverRdma(&_options)) {
            return -1;
        }
        rdma::GlobalRdmaInitializeOrDie();
#else
        LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
        return -1;
#endif
    }

    if (_options.bthread_tag < BTHREAD_TAG_DEFAULT ||
        _options.bthread_tag >= FLAGS_task_group_ntags) {
        LOG(ERROR) << "Fail to set tag " << _options.bthread_tag << ", tag range is ["
                   << BTHREAD_TAG_DEFAULT << ":" << FLAGS_task_group_ntags << ")";
        return -1;
    }

    if (_options.http_master_service) {
        // Check requirements for http_master_service:
        //  has "default_method" & request/response have no fields
        const google::protobuf::ServiceDescriptor* sd =
            _options.http_master_service->GetDescriptor();
        const google::protobuf::MethodDescriptor* md =
            sd->FindMethodByName("default_method");
        if (md == NULL) {
            LOG(ERROR) << "http_master_service must have a method named `default_method'";
            return -1;
        }
        if (md->input_type()->field_count() != 0) {
            LOG(ERROR) << "The request type of http_master_service must have "
                "no fields, actually " << md->input_type()->field_count();
            return -1;
        }
        if (md->output_type()->field_count() != 0) {
            LOG(ERROR) << "The response type of http_master_service must have "
                "no fields, actually " << md->output_type()->field_count();
            return -1;
        }
    }

    // CAUTION:
    //   Following code may run multiple times if this server is started and
    //   stopped more than once. Reuse or delete previous resources!

    if (_options.session_local_data_factory) {
        if (_session_local_data_pool == NULL) {
            _session_local_data_pool =
                new (std::nothrow) SimpleDataPool(_options.session_local_data_factory);
            if (NULL == _session_local_data_pool) {
                LOG(ERROR) << "Fail to new SimpleDataPool";
                return -1;
            }
        } else {
            _session_local_data_pool->Reset(_options.session_local_data_factory);
        }
        _session_local_data_pool->Reserve(_options.reserved_session_local_data);
    }

    // Leak of `_keytable_pool' and others is by design.
    // See comments in Server::Join() for details.
    // Instruct LeakSanitizer to ignore the designated memory leak.
    ANNOTATE_SCOPED_MEMORY_LEAK;
    // Init _keytable_pool always. If the server was stopped before, the pool
    // should be destroyed in Join().
    _keytable_pool = new bthread_keytable_pool_t;
    if (bthread_keytable_pool_init(_keytable_pool) != 0) {
        LOG(ERROR) << "Fail to init _keytable_pool";
        delete _keytable_pool;
        _keytable_pool = NULL;
        return -1;
    }

    if (_options.thread_local_data_factory) {
        _tl_options.thread_local_data_factory = _options.thread_local_data_factory;
        if (bthread_key_create2(&_tl_options.tls_key, DestroyServerTLS,
                                _options.thread_local_data_factory) != 0) {
            LOG(ERROR) << "Fail to create thread-local key";
            return -1;
        }
        if (_options.reserved_thread_local_data) {
            bthread_keytable_pool_reserve(_keytable_pool,
                                          _options.reserved_thread_local_data,
                                          _tl_options.tls_key,
                                          CreateServerTLS,
                                          _options.thread_local_data_factory);
        }
    } else {
        _tl_options = ThreadLocalOptions();
    }

    if (_options.bthread_init_count != 0 &&
        _options.bthread_init_fn != NULL) {
        // Create some special bthreads to call the init functions. The
        // bthreads will not quit until all bthreads finish the init function.
        BthreadInitArgs* init_args
            = new BthreadInitArgs[_options.bthread_init_count];
        size_t ncreated = 0;
        for (size_t i = 0; i < _options.bthread_init_count; ++i, ++ncreated) {
            init_args[i].bthread_init_fn = _options.bthread_init_fn;
            init_args[i].bthread_init_args = _options.bthread_init_args;
            init_args[i].result = false;
            init_args[i].done = false;
            init_args[i].stop = false;
            bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
            tmp.tag = _options.bthread_tag;
            tmp.keytable_pool = _keytable_pool;
            if (bthread_start_background(
                    &init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) {
                break;
            }
        }
        // Wait until all created bthreads finish the init function.
        for (size_t i = 0; i < ncreated; ++i) {
            while (!init_args[i].done) {
                bthread_usleep(1000);
            }
        }
        // Stop and join created bthreads.
        for (size_t i = 0; i < ncreated; ++i) {
            init_args[i].stop = true;
        }
        for (size_t i = 0; i < ncreated; ++i) {
            bthread_join(init_args[i].th, NULL);
        }
        size_t num_failed_result = 0;
        for (size_t i = 0; i < ncreated; ++i) {
            if (!init_args[i].result) {
                ++num_failed_result;
            }
        }
        delete [] init_args;
        if (ncreated != _options.bthread_init_count) {
            LOG(ERROR) << "Fail to create "
                       << _options.bthread_init_count - ncreated << " bthreads";
            return -1;
        }
        if (num_failed_result != 0) {
            LOG(ERROR) << num_failed_result << " bthread_init_fn failed";
            return -1;
        }
    }

    // Free last SSL contexts
    FreeSSLContexts();
    if (_options.has_ssl_options()) {

        // Change ServerSSLOptions.alpns to _raw_alpns.
        // AddCertificate function maybe access raw_alpns variable.
        if (InitALPNOptions(_options.mutable_ssl_options()) != 0) {
            return -1;
        }
        CertInfo& default_cert = _options.mutable_ssl_options()->default_cert;
        if (default_cert.certificate.empty()) {
            LOG(ERROR) << "default_cert is empty";
            return -1;
        }
        if (AddCertificate(default_cert) != 0) {
            return -1;
        }
        _default_ssl_ctx = _ssl_ctx_map.begin()->second.ctx;

        const std::vector<CertInfo>& certs = _options.mutable_ssl_options()->certs;
        for (size_t i = 0; i < certs.size(); ++i) {
            if (AddCertificate(certs[i]) != 0) {
                return -1;
            }
        }
    } else if (_options.force_ssl) {
        LOG(ERROR) << "Fail to force SSL for all connections "
                      "without ServerOptions.ssl_options";
        return -1;
    }

    _concurrency = 0;

    if (_options.has_builtin_services &&
        _builtin_service_count <= 0 &&
        AddBuiltinServices() != 0) {
        LOG(ERROR) << "Fail to add builtin services";
        return -1;
    }
    // If a server is started/stopped for mutiple times and one of the options
    // sets has_builtin_service to true, builtin services will be enabled for
    // any later re-start. Check this case and report to user.
    if (!_options.has_builtin_services && _builtin_service_count > 0) {
        LOG(ERROR) << "A server started/stopped for multiple times must be "
            "consistent on ServerOptions.has_builtin_services";
        return -1;
    }

    // Prepare all restful maps
    for (ServiceMap::const_iterator it = _fullname_service_map.begin();
         it != _fullname_service_map.end(); ++it) {
        if (it->second.restful_map) {
            it->second.restful_map->PrepareForFinding();
        }
    }
    if (_global_restful_map) {
        _global_restful_map->PrepareForFinding();
    }

    if (_options.num_threads > 0) {
        if (FLAGS_usercode_in_pthread) {
            _options.num_threads += FLAGS_usercode_backup_threads;
        }
        if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
            _options.num_threads = BTHREAD_MIN_CONCURRENCY;
        }
        bthread_setconcurrency_by_tag(_options.num_threads, _options.bthread_tag);
    }

    for (MethodMap::iterator it = _method_map.begin();
        it != _method_map.end(); ++it) {
        if (it->second.is_builtin_service) {
            it->second.status->SetConcurrencyLimiter(NULL);
        } else {
            const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
            if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED) {
                amc = &_options.method_max_concurrency;
            }
            ConcurrencyLimiter* cl = NULL;
            if (!CreateConcurrencyLimiter(*amc, &cl)) {
                LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
                return -1;
            }
            it->second.status->SetConcurrencyLimiter(cl);
            it->second.max_concurrency.SetConcurrencyLimiter(cl);
        }
    }
    if (0 != SetServiceMaxConcurrency(_options.nshead_service)) {
        return -1;
    }
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
    if (0 != SetServiceMaxConcurrency(_options.thrift_service)) {
        return -1;
    }
#endif


    // Create listening ports
    if (port_range.min_port > port_range.max_port) {
        LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
                   << port_range.max_port << ']';
        return -1;
    }
    if (butil::is_endpoint_extended(endpoint) &&
            (port_range.min_port != endpoint.port || port_range.max_port != endpoint.port)) {
        LOG(ERROR) << "Only IPv4 address supports port range feature";
        return -1;
    }
    _listen_addr = endpoint;
    for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
        _listen_addr.port = port;
        butil::fd_guard sockfd(tcp_listen(_listen_addr));
        if (sockfd < 0) {
            if (port != port_range.max_port) { // not the last port, try next
                continue;
            }
            if (port_range.min_port != port_range.max_port) {
                LOG(ERROR) << "Fail to listen " << _listen_addr.ip
                           << ":[" << port_range.min_port << '-'
                           << port_range.max_port << ']';
            } else {
                LOG(ERROR) << "Fail to listen " << _listen_addr;
            }
            return -1;
        }
        if (_listen_addr.port == 0) {
            // port=0 makes kernel dynamically select a port from
            // https://en.wikipedia.org/wiki/Ephemeral_port
            _listen_addr.port = get_port_from_fd(sockfd);
            if (_listen_addr.port <= 0) {
                LOG(ERROR) << "Fail to get port from fd=" << sockfd;
                return -1;
            }
        }
        if (_am == NULL) {
            _am = BuildAcceptor();
            if (NULL == _am) {
                LOG(ERROR) << "Fail to build acceptor";
                return -1;
            }
            _am->_use_rdma = _options.use_rdma;
            _am->_bthread_tag = _options.bthread_tag;
        }
        // Set `_status' to RUNNING before accepting connections
        // to prevent requests being rejected as ELOGOFF
        _status = RUNNING;
        time(&_last_start_time);
        GenerateVersionIfNeeded();
        g_running_server_count.fetch_add(1, butil::memory_order_relaxed);

        // Pass ownership of `sockfd' to `_am'
        if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
                             _default_ssl_ctx,
                             _options.force_ssl) != 0) {
            LOG(ERROR) << "Fail to start acceptor";
            return -1;
        }
        sockfd.release();
        break; // stop trying
    }
    if (_options.internal_port >= 0 && _options.has_builtin_services) {
        if (_options.internal_port  == _listen_addr.port) {
            LOG(ERROR) << "ServerOptions.internal_port=" << _options.internal_port
                       << " is same with port=" << _listen_addr.port << " to Start()";
            return -1;
        }
        if (_options.internal_port == 0) {
            LOG(ERROR) << "ServerOptions.internal_port cannot be 0, which"
                " allocates a dynamic and probabaly unfiltered port,"
                " against the purpose of \"being internal\".";
            return -1;
        }
        if (butil::is_endpoint_extended(endpoint)) {
            LOG(ERROR) << "internal_port is available in IPv4 address only";
            return -1;
        }

        butil::EndPoint internal_point = _listen_addr;
        internal_point.port = _options.internal_port;
        butil::fd_guard sockfd(tcp_listen(internal_point));
        if (sockfd < 0) {
            LOG(ERROR) << "Fail to listen " << internal_point << " (internal)";
            return -1;
        }
        if (NULL == _internal_am) {
            _internal_am = BuildAcceptor();
            if (NULL == _internal_am) {
                LOG(ERROR) << "Fail to build internal acceptor";
                return -1;
            }
        }
        // Pass ownership of `sockfd' to `_internal_am'
        if (_internal_am->StartAccept(sockfd, _options.idle_timeout_sec,
                                      _default_ssl_ctx,
                                      false) != 0) {
            LOG(ERROR) << "Fail to start internal_acceptor";
            return -1;
        }
        sockfd.release();
    }

    PutPidFileIfNeeded();

    // Launch _derivative_thread.
    CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
    bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
    tmp.tag = _options.bthread_tag;
    if (bthread_start_background(&_derivative_thread, &tmp,
                                 UpdateDerivedVars, this) != 0) {
        LOG(ERROR) << "Fail to create _derivative_thread";
        return -1;
    }

    // Print tips to server launcher.
    if (butil::is_endpoint_extended(_listen_addr)) {
        const char* builtin_msg = _options.has_builtin_services ? " with builtin service" : "";
        LOG(INFO) << "Server[" << version() << "] is serving on " << _listen_addr
                  << builtin_msg << '.';
        //TODO add TrackMe support
    } else {
        int http_port = _listen_addr.port;
        std::ostringstream server_info;
        server_info << "Server[" << version() << "] is serving on port="
                    << _listen_addr.port;
        if (_options.internal_port >= 0 && _options.has_builtin_services) {
            http_port = _options.internal_port;
            server_info << " and internal_port=" << _options.internal_port;
        }
        LOG(INFO) << server_info.str() << '.';

        if (_options.has_builtin_services) {
            LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
                    << http_port << " in web browser.";
        } else {
            LOG(WARNING) << "Builtin services are disabled according to "
                "ServerOptions.has_builtin_services";
        }
        // For trackme reporting
        SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
    }
    revert_server.release();
    return 0;
}