in platform/rdbc/acceptor.cpp [48:89]
void Acceptor::Run() {
LOG(ERROR) << "server:" << config_.GetSelfInfo().id() << " start running";
LockFreeQueue<Socket> socket_queue("server");
std::vector<std::thread> threads;
int woker_num = config_.GetInputWorkerNum();
for (int i = 0; i < woker_num; ++i) {
threads.push_back(std::thread([&]() {
while (IsRunning()) {
auto client_socket = socket_queue.Pop();
if (client_socket == nullptr) {
continue;
}
std::unique_ptr<DataInfo> request_info = std::make_unique<DataInfo>();
int ret =
client_socket->Recv(&request_info->buff, &request_info->data_len);
if (ret <= 0) {
continue;
}
std::unique_ptr<QueueItem> item = std::make_unique<QueueItem>();
item->socket = std::move(client_socket);
item->data = std::move(request_info);
global_stats_->ServerCall();
input_queue_->Push(std::move(item));
}
}));
}
while (IsRunning()) {
auto client_socket = socket_->Accept();
if (client_socket == nullptr) {
continue;
}
socket_queue.Push(std::move(client_socket));
}
for (auto& th : threads) {
if (th.joinable()) {
th.join();
}
}
}