interface/common/resdb_txn_accessor.cpp (142 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "interface/common/resdb_txn_accessor.h"
#include <glog/logging.h>
#include <future>
#include <thread>
namespace resdb {
ResDBTxnAccessor::ResDBTxnAccessor(const ResDBConfig& config)
: config_(config),
replicas_(config.GetReplicaInfos()),
recv_timeout_(1) /*1s*/ {}
std::unique_ptr<NetChannel> ResDBTxnAccessor::GetNetChannel(
const std::string& ip, int port) {
return std::make_unique<NetChannel>(ip, port);
}
// Obtain ReplicaState of each replica.
absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>>
ResDBTxnAccessor::GetTxn(uint64_t min_seq, uint64_t max_seq) {
QueryRequest request;
request.set_min_seq(min_seq);
request.set_max_seq(max_seq);
std::vector<std::unique_ptr<NetChannel>> clients;
std::vector<std::thread> ths;
std::string final_str;
std::mutex mtx;
std::condition_variable resp_cv;
bool success = false;
std::map<std::string, int> recv_count;
for (const auto& replica : replicas_) {
std::unique_ptr<NetChannel> client =
GetNetChannel(replica.ip(), replica.port());
NetChannel* client_ptr = client.get();
clients.push_back(std::move(client));
ths.push_back(std::thread(
[&](NetChannel* client) {
std::string response_str;
int ret = client->SendRequest(request, Request::TYPE_QUERY);
if (ret) {
return;
}
client->SetRecvTimeout(1000);
ret = client->RecvRawMessageStr(&response_str);
if (ret == 0) {
std::unique_lock<std::mutex> lck(mtx);
recv_count[response_str]++;
// receive f+1 count.
if (recv_count[response_str] == config_.GetMinClientReceiveNum()) {
final_str = response_str;
success = true;
// notify the main thread.
resp_cv.notify_all();
}
}
return;
},
client_ptr));
}
{
std::unique_lock<std::mutex> lck(mtx);
resp_cv.wait_for(lck, std::chrono::seconds(recv_timeout_));
// Time out or done, close all the client.
for (auto& client : clients) {
client->Close();
}
}
// wait for all theads done.
for (auto& th : ths) {
if (th.joinable()) {
th.join();
}
}
std::vector<std::pair<uint64_t, std::string>> txn_resp;
QueryResponse resp;
if (success && final_str.empty()) {
return txn_resp;
}
if (final_str.empty() || !resp.ParseFromString(final_str)) {
LOG(ERROR) << "parse fail len:" << final_str.size();
return absl::InternalError("recv data fail.");
}
for (auto& transaction : resp.transactions()) {
txn_resp.push_back(std::make_pair(transaction.seq(), transaction.data()));
}
return txn_resp;
}
absl::StatusOr<std::vector<Request>> ResDBTxnAccessor::GetRequestFromReplica(
uint64_t min_seq, uint64_t max_seq, const ReplicaInfo& replica) {
QueryRequest request;
request.set_min_seq(min_seq);
request.set_max_seq(max_seq);
std::unique_ptr<NetChannel> client =
GetNetChannel(replica.ip(), replica.port());
std::string response_str;
int ret = client->SendRequest(request, Request::TYPE_QUERY);
if (ret) {
return absl::InternalError("send data fail.");
}
client->SetRecvTimeout(1000);
ret = client->RecvRawMessageStr(&response_str);
if (ret) {
return absl::InternalError("recv data fail.");
}
QueryResponse resp;
if (!resp.ParseFromString(response_str)) {
LOG(ERROR) << "parse fail len:" << response_str.size();
return absl::InternalError("recv data fail.");
}
std::vector<Request> txn_resp;
for (auto& transaction : resp.transactions()) {
txn_resp.push_back(transaction);
}
return txn_resp;
}
absl::StatusOr<uint64_t> ResDBTxnAccessor::GetBlockNumbers() {
QueryRequest request;
request.set_min_seq(0);
request.set_max_seq(0);
std::vector<std::unique_ptr<NetChannel>> clients;
std::vector<std::thread> ths;
std::string final_str;
std::mutex mtx;
std::condition_variable resp_cv;
bool success = false;
std::unique_ptr<NetChannel> client =
GetNetChannel(replicas_[0].ip(), replicas_[0].port());
LOG(INFO) << "ip:" << replicas_[0].ip() << " port:" << replicas_[0].port();
std::string response_str;
int ret = 0;
for (int i = 0; i < 5; ++i) {
ret = client->SendRequest(request, Request::TYPE_QUERY);
if (ret) {
continue;
}
client->SetRecvTimeout(100000);
ret = client->RecvRawMessageStr(&response_str);
LOG(INFO) << "receive str:" << ret << " len:" << response_str.size();
if (ret != 0) {
continue;
}
break;
}
QueryResponse resp;
if (response_str.empty() || !resp.ParseFromString(response_str)) {
LOG(ERROR) << "parse fail len:" << final_str.size();
return absl::InternalError("recv data fail.");
}
return resp.max_seq();
}
} // namespace resdb