platform/networkstrate/service_network.cpp (98 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/networkstrate/service_network.h"
#include <glog/logging.h>
#include <signal.h>
#include <thread>
#include "platform/common/network/tcp_socket.h"
#include "platform/proto/broadcast.pb.h"
namespace resdb {
ServiceNetwork::ServiceNetwork(const ResDBConfig& config,
std::unique_ptr<ServiceInterface> service)
: service_(std::move(service)),
input_queue_("input"),
resp_queue_("resp"),
config_(config) {
struct sigaction sa;
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
if (sigemptyset(&sa.sa_mask) == -1 || sigaction(SIGPIPE, &sa, 0) == -1) {
perror("failed to ignore SIGPIPE; sigaction");
exit(EXIT_FAILURE);
}
acceptor_ = std::make_unique<Acceptor>(config, &input_queue_);
async_acceptor_ = std::make_unique<AsyncAcceptor>(
config.GetSelfInfo().ip(), config_.GetSelfInfo().port() + 10000,
config.GetInputWorkerNum(),
std::bind(&ServiceNetwork::AcceptorHandler, this, std::placeholders::_1,
std::placeholders::_2));
async_acceptor_->StartAccept();
global_stats_ = Stats::GetGlobalStats();
}
ServiceNetwork::~ServiceNetwork() {}
void ServiceNetwork::AcceptorHandler(const char* buffer, size_t data_len) {
BroadcastData data;
if (!data.ParseFromArray(buffer, data_len)) {
LOG(ERROR) << "parse broad cast fail:" << data_len;
return;
}
for (auto& sub_data : data.data()) {
std::unique_ptr<DataInfo> sub_request_info = std::make_unique<DataInfo>();
sub_request_info->data_len = sub_data.size();
sub_request_info->buff = new char[sub_request_info->data_len];
memcpy(sub_request_info->buff, sub_data.data(), sub_request_info->data_len);
std::unique_ptr<QueueItem> item = std::make_unique<QueueItem>();
item->socket = nullptr;
item->data = std::move(sub_request_info);
// LOG(ERROR) << "receve data from acceptor:" << data.is_resp()<<" data
// len:"<<item->data->data_len;
global_stats_->ServerCall();
input_queue_.Push(std::move(item));
}
}
void ServiceNetwork::InputProcess() {
std::vector<std::thread> threads;
int woker_num = config_.GetWorkerNum();
LOG(ERROR) << "server:" << config_.GetSelfInfo().id() << " start running";
for (int i = 0; i < woker_num; ++i) {
threads.push_back(std::thread([&]() {
while (IsRunning()) {
std::unique_ptr<QueueItem> item = input_queue_.Pop(1000);
if (item == nullptr) {
continue;
}
global_stats_->ServerProcess();
Process(std::move(item));
}
}));
}
for (auto& th : threads) {
if (th.joinable()) {
th.join();
}
}
}
void ServiceNetwork::Run() {
service_->Start();
auto input_th = std::thread(&ServiceNetwork::InputProcess, this);
acceptor_->Run();
input_th.join();
}
// Receive a message from network and pass it to service to process.
void ServiceNetwork::Process(std::unique_ptr<QueueItem> item) {
auto client_socket =
item->socket == nullptr ? nullptr : std::move(item->socket);
auto request_info = std::move(item->data);
if (client_socket != nullptr) {
client_socket->SetSendTimeout(1000000);
}
std::unique_ptr<Context> context = std::make_unique<Context>();
context->client = std::make_unique<NetChannel>(std::move(client_socket),
/*connected=*/true);
if (request_info) {
service_->Process(std::move(context), std::move(request_info));
}
return;
}
bool ServiceNetwork::IsRunning() { return service_->IsRunning(); }
void ServiceNetwork::Stop() {
acceptor_->Stop();
service_->Stop();
}
bool ServiceNetwork::ServiceIsReady() const { return service_->IsReady(); }
} // namespace resdb