platform/networkstrate/replica_communicator.cpp (272 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "platform/networkstrate/replica_communicator.h"

#include <glog/logging.h>

#include <cassert>
#include <mutex>
#include <thread>
#include <utility>

#include "platform/proto/broadcast.pb.h"

namespace resdb {

// Builds a communicator for `replicas`.
//
// When `is_use_long_conn` is true, `epoll_num` threads are started to run the
// shared io_service that drives the pooled AsyncReplicaClient connections; a
// `work` guard keeps io_service_.run() from returning while idle.
// `tcp_batch` is forwarded to the batch queues that coalesce outgoing
// messages. The broadcast sender thread is always started.
ReplicaCommunicator::ReplicaCommunicator(
    const std::vector<ReplicaInfo>& replicas, SignatureVerifier* verifier,
    bool is_use_long_conn, int epoll_num, int tcp_batch)
    : replicas_(replicas),
      verifier_(verifier),
      is_running_(false),
      batch_queue_("bc_batch", tcp_batch),
      is_use_long_conn_(is_use_long_conn),
      tcp_batch_(tcp_batch) {
  global_stats_ = Stats::GetGlobalStats();
  if (is_use_long_conn_) {
    worker_ = std::make_unique<boost::asio::io_service::work>(io_service_);
    for (int i = 0; i < epoll_num; ++i) {
      worker_threads_.emplace_back([this]() { io_service_.run(); });
    }
  }
  LOG(ERROR) << " tcp batch:" << tcp_batch;
  StartBroadcastInBackGround();
}

// Stops all background senders, then tears down the connection pool and the
// io_service worker threads.
ReplicaCommunicator::~ReplicaCommunicator() {
  // Sender loops poll IsRunning() between bounded Pop() waits, so clearing
  // the flag lets them exit on their own.
  is_running_ = false;
  if (broadcast_thread_.joinable()) {
    broadcast_thread_.join();
  }
  // Per-destination senders (see StartSingleInBackGround) must be joined as
  // well: destroying a joinable std::thread calls std::terminate, and these
  // threads use the pooled clients that are released below.
  for (auto& single_th : single_thread_) {
    if (single_th.joinable()) {
      single_th.join();
    }
  }
  if (is_use_long_conn_) {
    for (auto& cli : client_pools_) {
      cli.second.reset();
    }
    // Dropping the work guard lets io_service_.run() return once idle;
    // stop() makes any still-running handlers return promptly.
    worker_.reset();
    io_service_.stop();
    for (auto& worker_th : worker_threads_) {
      if (worker_th.joinable()) {
        worker_th.join();
      }
    }
  }
}

// Returns true if a replica with the same ip and port as `replica_info` is in
// the configured replica list.
bool ReplicaCommunicator::IsInPool(const ReplicaInfo& replica_info) {
  for (const auto& replica : replicas_) {
    if (replica_info.ip() == replica.ip() &&
        replica_info.port() == replica.port()) {
      return true;
    }
  }
  return false;
}

// Whether the background sender loops should keep running.
bool ReplicaCommunicator::IsRunning() const { return is_running_; }

// Replaces the list of client (non-replica) endpoints used as a fallback
// lookup when addressing a message.
void ReplicaCommunicator::UpdateClientReplicas(
    const std::vector<ReplicaInfo>& replicas) {
  clients_ = replicas;
}

// Returns a copy of the client endpoint list.
std::vector<ReplicaInfo> ReplicaCommunicator::GetClientReplicas() {
  return clients_;
}

// Sends `hb_info` to every replica over a fresh NetChannel each.
// Returns the number of replicas for which SendRawMessage returned 0.
int ReplicaCommunicator::SendHeartBeat(const Request& hb_info) {
  int ret = 0;
  for (const auto& replica : replicas_) {
    NetChannel client(replica.ip(), replica.port());
    if (client.SendRawMessage(hb_info) == 0) {
      ret++;
    }
  }
  return ret;
}

// Starts the thread that drains batch_queue_ and broadcasts each batch,
// packed into one BroadcastData, to all replicas via the connection pool.
void ReplicaCommunicator::StartBroadcastInBackGround() {
  is_running_ = true;
  broadcast_thread_ = std::thread([this]() {
    while (IsRunning()) {
      std::vector<std::unique_ptr<QueueItem>> batch_req =
          batch_queue_.Pop(10000);
      if (batch_req.empty()) {
        continue;
      }
      BroadcastData broadcast_data;
      for (auto& queue_item : batch_req) {
        broadcast_data.add_data()->swap(queue_item->data);
      }
      global_stats_->SendBroadCastMsg(broadcast_data.data_size());
      int ret = SendMessageFromPool(broadcast_data, replicas_);
      if (ret < 0) {
        LOG(ERROR) << "broadcast request fail:";
      }
    }
  });
}

// Creates the batch queue for destination (ip, port) and starts a thread that
// drains it, sending each batch to that single destination.
//
// The destination is looked up first in replicas_, then in the client list;
// if neither matches, a default ReplicaInfo is used.
// Callers are expected to hold smutex_ (SendSingleMessage does).
void ReplicaCommunicator::StartSingleInBackGround(const std::string& ip,
                                                  int port) {
  auto& queue = single_bq_[std::make_pair(ip, port)];
  queue = std::make_unique<BatchQueue<std::unique_ptr<QueueItem>>>("s_batch",
                                                                   tcp_batch_);
  BatchQueue<std::unique_ptr<QueueItem>>* bq = queue.get();

  ReplicaInfo replica_info;
  for (const auto& replica : replicas_) {
    if (replica.ip() == ip && replica.port() == port) {
      replica_info = replica;
      break;
    }
  }
  if (replica_info.ip().empty()) {
    for (const auto& replica : GetClientReplicas()) {
      if (replica.ip() == ip && replica.port() == port) {
        replica_info = replica;
        break;
      }
    }
  }

  // The queue pointer stays valid for the thread's lifetime: entries in
  // single_bq_ are never erased, and the destructor joins this thread.
  single_thread_.push_back(std::thread([this, bq, replica_info]() {
    while (IsRunning()) {
      std::vector<std::unique_ptr<QueueItem>> batch_req = bq->Pop(50000);
      if (batch_req.empty()) {
        continue;
      }
      BroadcastData broadcast_data;
      for (auto& queue_item : batch_req) {
        broadcast_data.add_data()->swap(queue_item->data);
      }
      global_stats_->SendBroadCastMsg(broadcast_data.data_size());
      int ret = SendMessageFromPool(broadcast_data, {replica_info});
      if (ret < 0) {
        LOG(ERROR) << "broadcast request fail:";
      }
    }
  }));
}

// Sends `message` to one destination.
//
// Long-connection mode: the message is serialized (signed via verifier_) and
// queued on that destination's batch queue, which is created on first use.
// Always returns 0 in that mode.
// Otherwise: the message is sent directly to `replica_info`, and the return
// value is the number of successful sends (0 or 1).
int ReplicaCommunicator::SendSingleMessage(
    const google::protobuf::Message& message, const ReplicaInfo& replica_info) {
  global_stats_->BroadCastMsg();
  if (!is_use_long_conn_) {
    // Deliver only to the requested replica, not to every replica.
    return SendMessageInternal(message, {replica_info});
  }

  std::string ip = replica_info.ip();
  int port = replica_info.port();
  auto item = std::make_unique<QueueItem>();
  item->data = NetChannel::GetRawMessageString(message, verifier_);

  const auto key = std::make_pair(ip, port);
  std::lock_guard<std::mutex> lk(smutex_);
  auto it = single_bq_.find(key);
  if (it == single_bq_.end()) {
    StartSingleInBackGround(ip, port);
    it = single_bq_.find(key);
  }
  assert(it != single_bq_.end() && it->second != nullptr);
  it->second->Push(std::move(item));
  return 0;
}

// Broadcasts `message` to all replicas.
//
// Long-connection mode: enqueues on batch_queue_ for the broadcast thread and
// returns 0. Otherwise: sends synchronously and returns the number of
// successful sends.
int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message) {
  global_stats_->BroadCastMsg();
  if (is_use_long_conn_) {
    auto item = std::make_unique<QueueItem>();
    item->data = NetChannel::GetRawMessageString(message, verifier_);
    batch_queue_.Push(std::move(item));
    return 0;
  }
  return SendMessageInternal(message, replicas_);
}

// Sends `message` to a single replica; see SendSingleMessage.
int ReplicaCommunicator::SendMessage(const google::protobuf::Message& message,
                                     const ReplicaInfo& replica_info) {
  return SendSingleMessage(message, replica_info);
}

// Sends all `messages` to `replica_info`.
//
// Long-connection mode: packs them into one BroadcastData and sends it through
// the pool (returns the pool's success count). Otherwise: sends each message
// directly and returns the summed success count.
int ReplicaCommunicator::SendBatchMessage(
    const std::vector<std::unique_ptr<Request>>& messages,
    const ReplicaInfo& replica_info) {
  if (is_use_long_conn_) {
    BroadcastData broadcast_data;
    for (const auto& message : messages) {
      std::string data = NetChannel::GetRawMessageString(*message, verifier_);
      broadcast_data.add_data()->swap(data);
    }
    return SendMessageFromPool(broadcast_data, {replica_info});
  }
  int ret = 0;
  for (const auto& message : messages) {
    ret += SendMessageInternal(*message, {replica_info});
  }
  return ret;
}

// Serializes `message` once and sends the bytes to each of `replicas` using
// pooled AsyncReplicaClient connections. Returns the number of replicas for
// which SendMessage returned 0. Sends are serialized by mutex_.
int ReplicaCommunicator::SendMessageFromPool(
    const google::protobuf::Message& message,
    const std::vector<ReplicaInfo>& replicas) {
  int ret = 0;
  std::string data;
  message.SerializeToString(&data);
  global_stats_->SendBroadCastMsgPerRep();
  std::lock_guard<std::mutex> lk(mutex_);
  for (const auto& replica : replicas) {
    auto client = GetClientFromPool(replica.ip(), replica.port());
    if (client == nullptr) {
      continue;
    }
    if (client->SendMessage(data) == 0) {
      ret++;
    } else {
      LOG(ERROR) << "send to:" << replica.ip() << " fail";
    }
  }
  return ret;
}

// Sends `message` to each of `replicas` over a fresh NetChannel, attaching
// verifier_ when one is configured. Returns the number of successful sends.
int ReplicaCommunicator::SendMessageInternal(
    const google::protobuf::Message& message,
    const std::vector<ReplicaInfo>& replicas) {
  int ret = 0;
  for (const auto& replica : replicas) {
    auto client = GetClient(replica.ip(), replica.port());
    if (client == nullptr) {
      continue;
    }
    if (verifier_ != nullptr) {
      client->SetSignatureVerifier(verifier_);
    }
    if (client->SendRawMessage(message) == 0) {
      ret++;
    }
  }
  return ret;
}

// Returns the pooled client for (ip, port), creating it on first use.
// Callers are expected to hold mutex_ (SendMessageFromPool does).
AsyncReplicaClient* ReplicaCommunicator::GetClientFromPool(
    const std::string& ip, int port) {
  const auto key = std::make_pair(ip, port);
  auto it = client_pools_.find(key);
  if (it == client_pools_.end()) {
    // In long-connection mode the client connects to port + 10000.
    // NOTE(review): presumably the peer's long-connection listener; confirm.
    auto client = std::make_unique<AsyncReplicaClient>(
        &io_service_, ip, port + (is_use_long_conn_ ? 10000 : 0), true);
    it = client_pools_.emplace(key, std::move(client)).first;
  }
  return it->second.get();
}

// Creates a new, unpooled NetChannel to (ip, port).
std::unique_ptr<NetChannel> ReplicaCommunicator::GetClient(
    const std::string& ip, int port) {
  return std::make_unique<NetChannel>(ip, port);
}

// Broadcasts `message` to all replicas; logs if the send reports failure.
void ReplicaCommunicator::BroadCast(const google::protobuf::Message& message) {
  int ret = SendMessage(message);
  if (ret < 0) {
    LOG(ERROR) << "broadcast request fail:";
  }
}

// Sends `message` to the node with id `node_id`, looking it up first among the
// replicas and then among the client endpoints. Logs and drops the message if
// no endpoint matches.
void ReplicaCommunicator::SendMessage(const google::protobuf::Message& message,
                                      int64_t node_id) {
  ReplicaInfo target_replica;
  for (const auto& replica : replicas_) {
    if (replica.id() == node_id) {
      target_replica = replica;
      break;
    }
  }
  if (target_replica.ip().empty()) {
    for (const auto& replica : GetClientReplicas()) {
      if (replica.id() == node_id) {
        target_replica = replica;
        break;
      }
    }
  }
  if (target_replica.ip().empty()) {
    LOG(ERROR) << "no replica info";
    return;
  }

  int ret = SendMessage(message, target_replica);
  if (ret < 0) {
    LOG(ERROR) << "broadcast request fail:";
  }
}

}  // namespace resdb