in platform/consensus/ordering/pbft/query.cpp [51:122]
int Query::ProcessQuery(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::CLIENT) {
auto find_primary = [&]() {
auto config_data = config_.GetConfigData();
for (const auto& r : config_data.region()) {
for (const auto& replica : r.replica_info()) {
if (replica.id() == 1) {
return replica;
}
}
}
};
ReplicaInfo primary = find_primary();
std::string ip = primary.ip();
int port = primary.port();
LOG(ERROR) << "redirect to primary:" << ip << " port:" << port;
auto client = std::make_unique<NetChannel>(ip, port);
if (client->SendRawMessage(*request) == 0) {
QueryResponse resp;
if (client->RecvRawMessage(&resp) == 0) {
if (context != nullptr && context->client != nullptr) {
LOG(ERROR) << "send response from primary:"
<< resp.transactions_size();
int ret = context->client->SendRawMessage(resp);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
}
}
}
}
return 0;
}
QueryRequest query;
if (!query.ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
return -2;
}
QueryResponse response;
if (query.max_seq() == 0 && query.min_seq() == 0) {
uint64_t mseq = message_manager_->GetNextSeq();
response.set_max_seq(mseq - 1);
LOG(ERROR) << "get max seq:" << mseq;
} else {
for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
Request* ret_request = message_manager_->GetRequest(i);
if (ret_request == nullptr) {
break;
}
Request* txn = response.add_transactions();
txn->set_data(ret_request->data());
txn->set_hash(ret_request->hash());
txn->set_seq(ret_request->seq());
txn->set_proxy_id(ret_request->proxy_id());
}
}
if (context != nullptr && context->client != nullptr) {
// LOG(ERROR) << "send response:" << response.DebugString();
int ret = context->client->SendRawMessage(response);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
}
}
return 0;
}