platform/consensus/execution/geo_transaction_executor.cpp (90 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "platform/consensus/execution/geo_transaction_executor.h" #include <glog/logging.h> #include "platform/consensus/ordering/common/transaction_utils.h" namespace resdb { GeoTransactionExecutor::GeoTransactionExecutor( const ResDBConfig& config, std::unique_ptr<SystemInfo> system_info, std::unique_ptr<ReplicaCommunicator> replica_communicator, std::unique_ptr<TransactionManager> local_transaction_manager) : TransactionManager(false, false), config_(config), system_info_(std::move(system_info)), replica_communicator_(std::move(replica_communicator)), local_transaction_manager_(std::move(local_transaction_manager)), is_stop_(false) { geo_thread_ = std::thread(&GeoTransactionExecutor::SendGeoMessages, this); } GeoTransactionExecutor::~GeoTransactionExecutor() { is_stop_ = true; if (geo_thread_.joinable()) { geo_thread_.join(); } } bool GeoTransactionExecutor::IsStop() { return is_stop_; } void GeoTransactionExecutor::SendGeoMessages() { std::vector<std::unique_ptr<Request>> messages; while (!IsStop()) { auto message = queue_.Pop(100); if (message != nullptr) { messages.push_back(std::move(message)); while (!IsStop() && messages.size() < batch_size_) { auto message = queue_.Pop(0); if (message == nullptr) { break; } messages.push_back(std::move(message)); if (messages.size() >= batch_size_) { break; } } } if (messages.size() > 0) { SendBatchGeoMessage(messages); messages.clear(); } } return; } void GeoTransactionExecutor::SendBatchGeoMessage( const std::vector<std::unique_ptr<Request>>& batch_geo_request) { ResConfigData config_data = config_.GetConfigData(); int self_send = replica_communicator_->SendBatchMessage( batch_geo_request, config_.GetSelfInfo()); if (self_send < 0) { LOG(ERROR) << "send batch_geo_request to self FAIL!"; } // Only for primary node: send out GEO_REQUEST to other regions. if (config_.GetSelfInfo().id() == system_info_->GetPrimaryId()) { for (const auto& region : config_data.region()) { if (region.region_id() == config_data.self_region_id()) { continue; } // maximum number of faulty replicas in this region int max_faulty = (region.replica_info_size() - 1) / 3; int num_request_sent = 0; for (const auto& replica : region.replica_info()) { // send to f + 1 replicas in the region if (num_request_sent > max_faulty) { break; } int ret = replica_communicator_->SendBatchMessage(batch_geo_request, replica); if (ret >= 0) { num_request_sent++; } } } // LOG(ERROR) << "local executor send out geo message"; } } std::unique_ptr<BatchUserResponse> GeoTransactionExecutor::ExecuteBatch( const BatchUserRequest& request) { std::unique_ptr<Request> geo_request = resdb::NewRequest( Request::TYPE_GEO_REQUEST, Request(), config_.GetSelfInfo().id(), config_.GetConfigData().self_region_id()); geo_request->set_seq(request.seq()); geo_request->set_proxy_id(request.proxy_id()); geo_request->set_hash(SignatureVerifier::CalculateHash( geo_request->data() + std::to_string(request.seq()) + std::to_string(config_.GetConfigData().self_region_id()))); request.SerializeToString(geo_request->mutable_data()); queue_.Push(std::move(geo_request)); return nullptr; } } // namespace resdb