platform/consensus/ordering/pbft/query.cpp (113 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "platform/consensus/ordering/pbft/query.h"
#include <glog/logging.h>
#include <unistd.h>
namespace resdb {
// Builds a Query handler.
//   config          - used to initialize config_.
//   message_manager - raw pointer stored in message_manager_; presumably not
//                     owned by this object and expected to outlive it
//                     (TODO confirm against the header / callers).
//   executor        - moved into custom_query_executor_. May be null, in
//                     which case ProcessCustomQuery reports an error.
Query::Query(const ResDBConfig& config, MessageManager* message_manager,
std::unique_ptr<CustomQuery> executor)
: config_(config),
message_manager_(message_manager),
custom_query_executor_(std::move(executor)) {}
// Nothing to release by hand; members clean up through their own destructors.
Query::~Query() = default;
int Query::ProcessGetReplicaState(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
ReplicaState replica_state;
int ret = message_manager_->GetReplicaState(&replica_state);
if (ret == 0) {
if (context != nullptr && context->client != nullptr) {
ret = context->client->SendRawMessage(replica_state);
if (ret) {
LOG(ERROR) << "send resp" << replica_state.DebugString()
<< " fail ret:" << ret;
}
}
}
return ret;
}
int Query::ProcessQuery(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
if (config_.GetPublicKeyCertificateInfo()
.public_key()
.public_key_info()
.type() == CertificateKeyInfo::CLIENT) {
auto find_primary = [&]() {
auto config_data = config_.GetConfigData();
for (const auto& r : config_data.region()) {
for (const auto& replica : r.replica_info()) {
if (replica.id() == 1) {
return replica;
}
}
}
};
ReplicaInfo primary = find_primary();
std::string ip = primary.ip();
int port = primary.port();
LOG(ERROR) << "redirect to primary:" << ip << " port:" << port;
auto client = std::make_unique<NetChannel>(ip, port);
if (client->SendRawMessage(*request) == 0) {
QueryResponse resp;
if (client->RecvRawMessage(&resp) == 0) {
if (context != nullptr && context->client != nullptr) {
LOG(ERROR) << "send response from primary:"
<< resp.transactions_size();
int ret = context->client->SendRawMessage(resp);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
}
}
}
}
return 0;
}
QueryRequest query;
if (!query.ParseFromString(request->data())) {
LOG(ERROR) << "parse data fail";
return -2;
}
QueryResponse response;
if (query.max_seq() == 0 && query.min_seq() == 0) {
uint64_t mseq = message_manager_->GetNextSeq();
response.set_max_seq(mseq - 1);
LOG(ERROR) << "get max seq:" << mseq;
} else {
for (uint64_t i = query.min_seq(); i <= query.max_seq(); ++i) {
Request* ret_request = message_manager_->GetRequest(i);
if (ret_request == nullptr) {
break;
}
Request* txn = response.add_transactions();
txn->set_data(ret_request->data());
txn->set_hash(ret_request->hash());
txn->set_seq(ret_request->seq());
txn->set_proxy_id(ret_request->proxy_id());
}
}
if (context != nullptr && context->client != nullptr) {
// LOG(ERROR) << "send response:" << response.DebugString();
int ret = context->client->SendRawMessage(response);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
}
}
return 0;
}
int Query::ProcessCustomQuery(std::unique_ptr<Context> context,
std::unique_ptr<Request> request) {
if (custom_query_executor_ == nullptr) {
LOG(ERROR) << "no custom executor";
return -1;
}
std::unique_ptr<std::string> resp_str =
custom_query_executor_->Query(request->data());
CustomQueryResponse response;
if (resp_str != nullptr) {
response.set_resp_str(*resp_str);
}
if (context != nullptr && context->client != nullptr) {
int ret = context->client->SendRawMessage(response);
if (ret) {
LOG(ERROR) << "send resp fail ret:" << ret;
}
}
return 0;
}
} // namespace resdb