interface/common/resdb_txn_accessor.cpp (142 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include "interface/common/resdb_txn_accessor.h"

#include <glog/logging.h>

#include <future>
#include <thread>

namespace resdb {

// Builds an accessor from `config`: keeps a copy of the configuration, takes
// the replica list from config.GetReplicaInfos(), and initialises the receive
// timeout to 1 (one second).
ResDBTxnAccessor::ResDBTxnAccessor(const ResDBConfig& config)
    : config_(config),
      replicas_(config.GetReplicaInfos()),
      recv_timeout_(1) /*1s*/ {}

// Creates a NetChannel object targeting the endpoint `ip`:`port` and hands
// ownership of it to the caller.
std::unique_ptr<NetChannel> ResDBTxnAccessor::GetNetChannel(
    const std::string& ip, int port) {
  auto channel = std::make_unique<NetChannel>(ip, port);
  return channel;
}

// Fetch the transactions of a sequence range from the replicas.
absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t max_seq) { QueryRequest request; request.set_min_seq(min_seq); request.set_max_seq(max_seq); std::vector<std::unique_ptr<NetChannel>> clients; std::vector<std::thread> ths; std::string final_str; std::mutex mtx; std::condition_variable resp_cv; bool success = false; std::map<std::string, int> recv_count; for (const auto& replica : replicas_) { std::unique_ptr<NetChannel> client = GetNetChannel(replica.ip(), replica.port()); NetChannel* client_ptr = client.get(); clients.push_back(std::move(client)); ths.push_back(std::thread( [&](NetChannel* client) { std::string response_str; int ret = client->SendRequest(request, Request::TYPE_QUERY); if (ret) { return; } client->SetRecvTimeout(1000); ret = client->RecvRawMessageStr(&response_str); if (ret == 0) { std::unique_lock<std::mutex> lck(mtx); recv_count[response_str]++; // receive f+1 count. if (recv_count[response_str] == config_.GetMinClientReceiveNum()) { final_str = response_str; success = true; // notify the main thread. resp_cv.notify_all(); } } return; }, client_ptr)); } { std::unique_lock<std::mutex> lck(mtx); resp_cv.wait_for(lck, std::chrono::seconds(recv_timeout_)); // Time out or done, close all the client. for (auto& client : clients) { client->Close(); } } // wait for all theads done. 
for (auto& th : ths) { if (th.joinable()) { th.join(); } } std::vector<std::pair<uint64_t, std::string>> txn_resp; QueryResponse resp; if (success && final_str.empty()) { return txn_resp; } if (final_str.empty() || !resp.ParseFromString(final_str)) { LOG(ERROR) << "parse fail len:" << final_str.size(); return absl::InternalError("recv data fail."); } for (auto& transaction : resp.transactions()) { txn_resp.push_back(std::make_pair(transaction.seq(), transaction.data())); } return txn_resp; } absl::StatusOr<std::vector<Request>> ResDBTxnAccessor::GetRequestFromReplica( uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica) { QueryRequest request; request.set_min_seq(min_seq); request.set_max_seq(max_seq); std::unique_ptr<NetChannel> client = GetNetChannel(replica.ip(), replica.port()); std::string response_str; int ret = client->SendRequest(request, Request::TYPE_QUERY); if (ret) { return absl::InternalError("send data fail."); } client->SetRecvTimeout(1000); ret = client->RecvRawMessageStr(&response_str); if (ret) { return absl::InternalError("recv data fail."); } QueryResponse resp; if (!resp.ParseFromString(response_str)) { LOG(ERROR) << "parse fail len:" << response_str.size(); return absl::InternalError("recv data fail."); } std::vector<Request> txn_resp; for (auto& transaction : resp.transactions()) { txn_resp.push_back(transaction); } return txn_resp; } absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() { QueryRequest request; request.set_min_seq(0); request.set_max_seq(0); std::vector<std::unique_ptr<NetChannel>> clients; std::vector<std::thread> ths; std::string final_str; std::mutex mtx; std::condition_variable resp_cv; bool success = false; std::unique_ptr<NetChannel> client = GetNetChannel(replicas_[0].ip(), replicas_[0].port()); LOG(INFO) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port(); std::string response_str; int ret = 0; for (int i = 0; i < 5; ++i) { ret = client->SendRequest(request, Request::TYPE_QUERY); if 
(ret) { continue; } client->SetRecvTimeout(100000); ret = client->RecvRawMessageStr(&response_str); LOG(INFO) << "receive str:" << ret << " len:" << response_str.size(); if (ret != 0) { continue; } break; } QueryResponse resp; if (response_str.empty() || !resp.ParseFromString(response_str)) { LOG(ERROR) << "parse fail len:" << final_str.size(); return absl::InternalError("recv data fail."); } return resp.max_seq(); } } // namespace resdb