platform/consensus/execution/geo_global_executor.cpp (94 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include "platform/consensus/execution/geo_global_executor.h" #include <glog/logging.h> namespace resdb { GeoGlobalExecutor::GeoGlobalExecutor( std::unique_ptr<TransactionManager> global_transaction_manager, const ResDBConfig& config) : global_transaction_manager_(std::move(global_transaction_manager)), config_(config), is_stop_(false) { global_stats_ = Stats::GetGlobalStats(); region_size_ = config_.GetConfigData().region().size(); order_thread_ = std::thread(&GeoGlobalExecutor::OrderRound, this); my_region_ = config.GetConfigData().self_region_id(); } GeoGlobalExecutor::~GeoGlobalExecutor() { Stop(); } void GeoGlobalExecutor::Stop() { is_stop_ = true; if (order_thread_.joinable()) { order_thread_.join(); } } void GeoGlobalExecutor::Execute(std::unique_ptr<Request> request) { BatchUserRequest batch_request; if (!batch_request.ParseFromString(request->data())) { LOG(ERROR) << "[GeoGlobalExecutor] parse data fail!"; return; } if (request->data().empty()) { LOG(ERROR) << "[GeoGlobalExecutor] request->data() is empty "; return; } if (global_stats_) { global_stats_->IncTotalGeoRequest(batch_request.user_requests_size()); } if (global_transaction_manager_) { auto batch_response = global_transaction_manager_->ExecuteBatch(batch_request); if (batch_response != nullptr && request->region_info().region_id() == my_region_) { batch_response->set_createtime(batch_request.createtime()); batch_response->set_local_id(batch_request.local_id()); batch_response->set_proxy_id(batch_request.proxy_id()); batch_response->set_seq(batch_request.seq()); resp_queue_.Push(std::move(batch_response)); } } } bool GeoGlobalExecutor::IsStop() { return is_stop_; } int GeoGlobalExecutor::OrderGeoRequest(std::unique_ptr<Request> request) { order_queue_.Push(std::move(request)); return 0; } void GeoGlobalExecutor::AddData() { auto request = order_queue_.Pop(); if (request == nullptr) { return; } global_stats_->IncGeoRequest(); uint64_t seq_num = request->seq(); int region_id = request->region_info().region_id(); execute_map_[std::make_pair(seq_num, region_id)] = std::move(request); } std::unique_ptr<Request> GeoGlobalExecutor::GetNextMap() { if (execute_map_.empty()) { return nullptr; } // LOG(ERROR) << "Next round seq: " << next_seq_ << ", current First in map: " // << execute_map_.begin()->first << ", size: " << // LOG(ERROR)<<"get next seq:"<<next_seq_<<" region:"<<next_region_; if (execute_map_.begin()->first == std::make_pair(next_seq_, next_region_)) { std::unique_ptr<Request> res = std::move(execute_map_.begin()->second); execute_map_.erase(execute_map_.begin()); next_region_++; if (next_region_ > static_cast<int64_t>(region_size_)) { next_region_ = 1; next_seq_++; } return res; } return nullptr; } void GeoGlobalExecutor::OrderRound() { while (!IsStop()) { AddData(); while (!IsStop()) { std::unique_ptr<Request> seq_map = GetNextMap(); if (seq_map == nullptr) { break; } Execute(std::move(seq_map)); } } } std::unique_ptr<BatchUserResponse> GeoGlobalExecutor::GetResponseMsg() { return resp_queue_.Pop(); } } // namespace resdb