in service/http_server/crow_service.cpp [60:390]
void CrowService::run() {
crow::SimpleApp app;
// For adding and removing websocket connections
std::mutex mtx;
// Get all values
CROW_ROUTE(app, "/v1/transactions")
([this](const crow::request &req, response &res) {
uint64_t cur_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
if (cur_time < last_db_scan_time + DB_SCAN_TIMEOUT_MS) {
res.code = 503;
res.set_header("Content-Type", "text/plain");
res.end("Get all transactions functionality on cooldown (" +
std::to_string(last_db_scan_time + DB_SCAN_TIMEOUT_MS - cur_time) +
" ms left)");
} else {
last_db_scan_time = cur_time;
auto values = kv_client_.GetAllValues();
if (values != nullptr) {
//LOG(INFO) << "client getallvalues value = " << values->c_str();
// Send updated blocks list to websocket
if (users.size() > 0) {
for (auto u : users)
u->send_text("Update blocks");
}
num_transactions_++;
res.set_header("Content-Type", "application/json");
res.end(std::string(values->c_str()));
} else {
res.code = 500;
res.set_header("Content-Type", "text/plain");
res.end("getallvalues fail");
}
}
});
// Get value of specific id
CROW_ROUTE(app, "/v1/transactions/<string>")
([this](const crow::request &req, response &res, std::string id) {
auto value = kv_client_.Get(id);
if (value != nullptr) {
LOG(INFO) << "client get value = " << value->c_str();
// Send updated blocks list to websocket
if (users.size() > 0) {
for (auto u : users)
u->send_text("Update blocks");
}
num_transactions_++;
res.set_header("Content-Type", "application/json");
res.end(std::string(value->c_str()));
} else {
res.code = 500;
res.set_header("Content-Type", "text/plain");
res.end("get value fail");
}
});
// Get values based on key range
CROW_ROUTE(app, "/v1/transactions/<string>/<string>")
([this](const crow::request &req, response &res, std::string min_id,
std::string max_id) {
auto value = kv_client_.GetRange(min_id, max_id);
if (value != nullptr) {
LOG(INFO) << "client getrange value = " << value->c_str();
// Send updated blocks list to websocket
if (users.size() > 0) {
for (auto u : users)
u->send_text("Update blocks");
}
num_transactions_++;
res.set_header("Content-Type", "application/json");
res.end(std::string(value->c_str()));
} else {
res.code = 500;
res.set_header("Content-Type", "text/plain");
res.end("getrange fail");
}
});
// Commit a key-value pair, extracting the id parameter from the JSON
// object and setting the value as the entire JSON object
CROW_ROUTE(app, "/v1/transactions/commit")
.methods("POST"_method)([this](const request& req) {
std::string body = req.body;
LOG(INFO) << "body: " << body;
// Parse transaction JSON
rapidjson::Document doc;
doc.Parse(body.c_str());
if (!doc.IsObject() || !doc.HasMember("id")) {
response res(400, "Invalid transaction format"); // Bad Request
res.set_header("Content-Type", "text/plain");
return res;
}
const std::string id = doc["id"].GetString();
const std::string value = body;
// Set key-value pair in kv server
int retval = kv_client_.Set(id, value);
if (retval != 0) {
LOG(ERROR) << "Error when trying to commit id " << id;
response res(500, "id: " + id);
res.set_header("Content-Type", "text/plain");
return res;
}
LOG(INFO) << "Set " << id << " to " << value;
// Send updated blocks list to websocket
if (users.size() > 0) {
for (auto u : users)
u->send_text("Update blocks");
}
num_transactions_++;
response res(201, "id: " + id); // Created status code
res.set_header("Content-Type", "text/plain");
return res;
});
CROW_ROUTE(app, "/v1/blocks")
([this](const crow::request &req, response &res) {
auto values = GetAllBlocks(100);
res.set_header("Content-Type", "application/json");
res.end(values);
});
// Retrieve blocks in batches of size of the int parameter
CROW_ROUTE(app, "/v1/blocks/<int>")
([this](const crow::request &req, response &res, int batch_size) {
auto values = GetAllBlocks(batch_size, false, true);
if (values == "") {
res.code = 500;
res.set_header("Content-Type", "text/plain");
res.end("get replica state fail");
exit(1);
};
res.set_header("Content-Type", "application/json");
res.end(values);
});
// Retrieve blocks within a range
CROW_ROUTE(app, "/v1/blocks/<int>/<int>")
([this](const crow::request &req, response &res, int min_seq, int max_seq) {
auto resp = txn_client_.GetTxn(min_seq, max_seq);
absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
uint64_t min_seq, uint64_t max_seq);
if (!resp.ok()) {
LOG(ERROR) << "get replica state fail";
res.code = 500;
res.set_header("Content-Type", "text/plain");
res.end("get replica state fail");
exit(1);
}
std::string values = "[\n";
bool first_iteration = true;
for (auto &txn : *resp) {
BatchUserRequest request;
KVRequest kv_request;
std::string cur_batch_str = "";
if (request.ParseFromString(txn.second)) {
LOG(INFO) << request.DebugString();
if (!first_iteration)
cur_batch_str.append(",");
first_iteration = false;
// id
uint64_t seq = txn.first;
cur_batch_str.append("{\"id\": " + std::to_string(seq));
// number
cur_batch_str.append(", \"number\": \"" + std::to_string(seq) + "\"");
// transactions
cur_batch_str.append(", \"transactions\": [");
bool first_transaction = true;
for (auto &sub_req : request.user_requests()) {
kv_request.ParseFromString(sub_req.request().data());
std::string kv_request_json = ParseKVRequest(kv_request);
if (!first_transaction)
cur_batch_str.append(",");
first_transaction = false;
cur_batch_str.append(kv_request_json);
cur_batch_str.append("\n");
}
cur_batch_str.append("]"); // close transactions list
// size
cur_batch_str.append(", \"size\": " +
std::to_string(request.ByteSizeLong()));
// createdAt
uint64_t createtime = request.createtime();
cur_batch_str.append(", \"createdAt\": \"" +
ParseCreateTime(createtime) + "\"");
}
cur_batch_str.append("}\n");
values.append(cur_batch_str);
}
values.append("]\n");
res.set_header("Content-Type", "application/json");
res.end(values);
});
CROW_ROUTE(app, "/blockupdatelistener")
.websocket()
.onopen([&](crow::websocket::connection &conn) {
LOG(INFO) << "Opened websocket";
std::lock_guard<std::mutex> _(mtx);
users.insert(&conn);
})
.onclose(
[&](crow::websocket::connection &conn, const std::string &reason) {
LOG(INFO) << "Closed websocket";
std::lock_guard<std::mutex> _(mtx);
users.erase(&conn);
})
.onmessage([&](crow::websocket::connection & /*conn*/,
const std::string &data, bool is_binary) {
// do nothing
});
// For metadata table on the Explorer
CROW_ROUTE(app, "/populatetable")
([this](const crow::request& req, response& res) {
absl::StatusOr<ReplicaState> state_or = state_client_.GetReplicaState();
if(!state_or.ok()){
LOG(ERROR)<<"get state fail";
res.set_header("Content-Type", "application/json");
res.end("");
return;
}
ResConfigData config_data = (*state_or).replica_config();
ResDBConfig server_config(config_data, ReplicaInfo(), KeyInfo(), CertificateInfo());
uint32_t replica_num = server_config.GetReplicaNum();
uint32_t worker_num = server_config.GetWorkerNum();
uint32_t client_batch_num = server_config.ClientBatchNum();
uint32_t max_process_txn = server_config.GetMaxProcessTxn();
uint32_t client_batch_wait_time = server_config.ClientBatchWaitTimeMS();
uint32_t input_worker_num = server_config.GetInputWorkerNum();
uint32_t output_worker_num = server_config.GetOutputWorkerNum();
int client_timeout_ms = server_config.GetClientTimeoutMs();
int min_data_receive_num = server_config.GetMinDataReceiveNum();
size_t max_malicious_replica_num = server_config.GetMaxMaliciousReplicaNum();
int checkpoint_water_mark = server_config.GetCheckPointWaterMark();
// Don't read the ledger if first commit time is known
if (first_commit_time_ == 0) {
// Get first block in the chain
auto resp = txn_client_.GetTxn(1, 1);
// absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn(
// uint64_t min_seq, uint64_t max_seq);
if (!resp.ok()) {
LOG(ERROR) << "get replica state fail";
res.end("get replica state fail");
};
for (auto& txn : *resp) {
BatchUserRequest request;
KVRequest kv_request;
if (request.ParseFromString(txn.second)) {
// transactions
for (auto& sub_req : request.user_requests()) {
kv_request.ParseFromString(sub_req.request().data());
std::string kv_request_json = ParseKVRequest(kv_request);
}
// see resilientdb/common/utils/utils.cpp
first_commit_time_ = request.createtime() / 1000000;
}
}
}
uint64_t epoch_time = std::chrono::duration_cast<std::chrono::seconds>(
std::chrono::system_clock::now().time_since_epoch()
).count();
uint64_t chain_age = first_commit_time_ == 0 ?
0 : epoch_time - first_commit_time_;
auto block_num_resp = txn_client_.GetBlockNumbers();
if (!block_num_resp.ok()) {
LOG(ERROR) << "get number fail";
exit(1);
}
std::string values = "";
values.append("[{ \"replicaNum\": " + std::to_string(replica_num)
+ ", \"workerNum\" : " + std::to_string(worker_num)
+ ", \"clientBatchNum\" : " + std::to_string(client_batch_num)
+ ", \"maxProcessTxn\" : " + std::to_string(max_process_txn)
+ ", \"clientBatchWaitTime\" : " + std::to_string(client_batch_wait_time)
+ ", \"inputWorkerNum\" : " + std::to_string(input_worker_num)
+ ", \"outputWorkerNum\" : " + std::to_string(output_worker_num)
+ ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms)
+ ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num)
+ ", \"maxMaliciousReplicaNum\" : " + std::to_string(max_malicious_replica_num)
+ ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark)
+ ", \"transactionNum\" : " + std::to_string(num_transactions_)
+ ", \"blockNum\" : " + std::to_string(*block_num_resp)
+ ", \"chainAge\" : " + std::to_string(chain_age)
+ "}]");
LOG(INFO) << std::string(values.c_str());
res.set_header("Content-Type", "application/json");
res.end(std::string(values.c_str()));
});
// Run the Crow app
app.port(port_num_).multithreaded().run();
}