platform/networkstrate/service_network.cpp (98 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "platform/networkstrate/service_network.h" #include <glog/logging.h> #include <signal.h> #include <thread> #include "platform/common/network/tcp_socket.h" #include "platform/proto/broadcast.pb.h" namespace resdb { ServiceNetwork::ServiceNetwork(const ResDBConfig& config, std::unique_ptr<ServiceInterface> service) : service_(std::move(service)), input_queue_("input"), resp_queue_("resp"), config_(config) { struct sigaction sa; sa.sa_handler = SIG_IGN; sa.sa_flags = 0; if (sigemptyset(&sa.sa_mask) == -1 || sigaction(SIGPIPE, &sa, 0) == -1) { perror("failed to ignore SIGPIPE; sigaction"); exit(EXIT_FAILURE); } acceptor_ = std::make_unique<Acceptor>(config, &input_queue_); async_acceptor_ = std::make_unique<AsyncAcceptor>( config.GetSelfInfo().ip(), config_.GetSelfInfo().port() + 10000, config.GetInputWorkerNum(), std::bind(&ServiceNetwork::AcceptorHandler, this, std::placeholders::_1, std::placeholders::_2)); async_acceptor_->StartAccept(); global_stats_ = Stats::GetGlobalStats(); } ServiceNetwork::~ServiceNetwork() {} void ServiceNetwork::AcceptorHandler(const char* buffer, size_t data_len) { BroadcastData data; if (!data.ParseFromArray(buffer, data_len)) { LOG(ERROR) << "parse broad cast fail:" << data_len; return; } for (auto& sub_data : data.data()) { std::unique_ptr<DataInfo> sub_request_info = std::make_unique<DataInfo>(); sub_request_info->data_len = sub_data.size(); sub_request_info->buff = new char[sub_request_info->data_len]; memcpy(sub_request_info->buff, sub_data.data(), sub_request_info->data_len); std::unique_ptr<QueueItem> item = std::make_unique<QueueItem>(); item->socket = nullptr; item->data = std::move(sub_request_info); // LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data // len:"<<item->data->data_len; global_stats_->ServerCall(); input_queue_.Push(std::move(item)); } } void ServiceNetwork::InputProcess() { std::vector<std::thread> threads; int woker_num = config_.GetWorkerNum(); LOG(ERROR) << "server:" << config_.GetSelfInfo().id() << " start running"; for (int i = 0; i < woker_num; ++i) { threads.push_back(std::thread([&]() { while (IsRunning()) { std::unique_ptr<QueueItem> item = input_queue_.Pop(1000); if (item == nullptr) { continue; } global_stats_->ServerProcess(); Process(std::move(item)); } })); } for (auto& th : threads) { if (th.joinable()) { th.join(); } } } void ServiceNetwork::Run() { service_->Start(); auto input_th = std::thread(&ServiceNetwork::InputProcess, this); acceptor_->Run(); input_th.join(); } // Receive a message from network and pass it to service to process. void ServiceNetwork::Process(std::unique_ptr<QueueItem> item) { auto client_socket = item->socket == nullptr ? nullptr : std::move(item->socket); auto request_info = std::move(item->data); if (client_socket != nullptr) { client_socket->SetSendTimeout(1000000); } std::unique_ptr<Context> context = std::make_unique<Context>(); context->client = std::make_unique<NetChannel>(std::move(client_socket), /*connected=*/true); if (request_info) { service_->Process(std::move(context), std::move(request_info)); } return; } bool ServiceNetwork::IsRunning() { return service_->IsRunning(); } void ServiceNetwork::Stop() { acceptor_->Stop(); service_->Stop(); } bool ServiceNetwork::ServiceIsReady() const { return service_->IsReady(); } } // namespace resdb