chain/storage/leveldb.cpp (271 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "chain/storage/leveldb.h"
#include <glog/logging.h>
#include <unistd.h>
#include <cstdint>
#include "chain/storage/proto/kv.pb.h"
#include "leveldb/options.h"
namespace resdb {
namespace storage {
std::unique_ptr<Storage> NewResLevelDB(const std::string& path,
std::optional<LevelDBInfo> config) {
if (config == std::nullopt) {
config = LevelDBInfo();
}
(*config).set_path(path);
return std::make_unique<ResLevelDB>(config);
}
// Builds a ResLevelDB-backed Storage from an optional config, which is handed
// to the ResLevelDB constructor unchanged.
std::unique_ptr<Storage> NewResLevelDB(std::optional<LevelDBInfo> config) {
  auto storage = std::make_unique<ResLevelDB>(config);
  return storage;
}
// Constructs the storage and opens (or creates) the underlying database.
//
// The database lives at "/tmp/nexres-leveldb" unless the config carries a
// non-empty path. When a config is supplied, the write buffer size (given in
// MB) and the write batch size are taken from it, and a block cache is created
// if enabled (capacity 1000 unless the config sets one explicitly).
//
// When `config` is empty, none of its fields are read: the size members keep
// whatever values they already have and no block cache is created. The
// previous code dereferenced the empty optional for the block-cache check,
// which is undefined behaviour.
ResLevelDB::ResLevelDB(std::optional<LevelDBInfo> config) {
  std::string path = "/tmp/nexres-leveldb";
  if (config.has_value()) {
    // Config expresses the buffer size in MB; LevelDB wants bytes.
    write_buffer_size_ = config->write_buffer_size_mb() << 20;
    write_batch_size_ = config->write_batch_size();
    if (!config->path().empty()) {
      LOG(ERROR) << "Custom path for ResLevelDB provided in config: "
                 << config->path();
      path = config->path();
    }
    if (config->enable_block_cache()) {
      uint32_t capacity = 1000;
      if (config->has_block_cache_capacity()) {
        capacity = config->block_cache_capacity();
      }
      block_cache_ =
          std::make_unique<LRUCache<std::string, std::string>>(capacity);
      LOG(ERROR) << "initialized block cache";
    }
  }
  global_stats_ = Stats::GetGlobalStats();
  CreateDB(path);
}
void ResLevelDB::CreateDB(const std::string& path) {
LOG(ERROR) << "ResLevelDB Create DB: path:" << path
<< " write buffer size:" << write_buffer_size_
<< " batch size:" << write_batch_size_;
leveldb::Options options;
options.create_if_missing = true;
options.write_buffer_size = write_buffer_size_;
leveldb::DB* db = nullptr;
leveldb::Status status = leveldb::DB::Open(options, path, &db);
if (status.ok()) {
db_ = std::unique_ptr<leveldb::DB>(db);
}
assert(status.ok());
LOG(ERROR) << "Successfully opened LevelDB";
}
// Releases the database handle, then flushes the block cache if one exists.
// Entries still sitting in batch_ are not written here; call Flush() first if
// they must be persisted.
ResLevelDB::~ResLevelDB() {
  // Resetting an already-empty handle is a no-op, so no null check is needed.
  db_.reset();

  if (block_cache_ != nullptr) {
    block_cache_->Flush();
  }
}
int ResLevelDB::SetValue(const std::string& key, const std::string& value) {
if (block_cache_) {
block_cache_->Put(key, value);
}
batch_.Put(key, value);
if (batch_.ApproximateSize() >= write_batch_size_) {
leveldb::Status status = db_->Write(leveldb::WriteOptions(), &batch_);
if (status.ok()) {
batch_.Clear();
UpdateMetrics();
return 0;
} else {
LOG(ERROR) << "flush buffer fail:" << status.ToString();
return -1;
}
}
return 0;
}
std::string ResLevelDB::GetValue(const std::string& key) {
std::string value;
bool found_in_cache = false;
if (block_cache_) {
value = block_cache_->Get(key);
found_in_cache = !value.empty();
}
if (!found_in_cache) {
leveldb::Status status = db_->Get(leveldb::ReadOptions(), key, &value);
if (!status.ok()) {
value.clear(); // Ensure value is empty if not found in DB
}
}
UpdateMetrics();
return value;
}
std::string ResLevelDB::GetAllValues(void) {
std::string values = "[";
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
bool first_iteration = true;
for (it->SeekToFirst(); it->Valid(); it->Next()) {
if (!first_iteration) values.append(",");
first_iteration = false;
values.append(it->value().ToString());
}
values.append("]");
delete it;
return values;
}
std::string ResLevelDB::GetRange(const std::string& min_key,
const std::string& max_key) {
std::string values = "[";
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
bool first_iteration = true;
for (it->Seek(min_key); it->Valid() && it->key().ToString() <= max_key;
it->Next()) {
if (!first_iteration) values.append(",");
first_iteration = false;
values.append(it->value().ToString());
}
values.append("]");
delete it;
return values;
}
// Publishes storage-engine metrics to the global stats object: the block
// cache hit ratio plus the "leveldb.stats" and
// "leveldb.approximate-memory-usage" database properties.
//
// Returns false (and publishes nothing) when no block cache is configured,
// true otherwise.
bool ResLevelDB::UpdateMetrics() {
  if (!block_cache_) {
    return false;
  }

  std::string stats;
  db_->GetProperty("leveldb.stats", &stats);

  std::string approximate_size;
  db_->GetProperty("leveldb.approximate-memory-usage", &approximate_size);

  const auto hit_ratio = block_cache_->GetCacheHitRatio();
  global_stats_->SetStorageEngineMetrics(hit_ratio, stats, approximate_size);
  return true;
}
bool ResLevelDB::Flush() {
leveldb::Status status = db_->Write(leveldb::WriteOptions(), &batch_);
if (status.ok()) {
batch_.Clear();
return true;
}
LOG(ERROR) << "flush buffer fail:" << status.ToString();
return false;
}
int ResLevelDB::SetValueWithVersion(const std::string& key,
const std::string& value, int version) {
std::string value_str = GetValue(key);
ValueHistory history;
if (!history.ParseFromString(value_str)) {
LOG(ERROR) << "old_value parse fail";
return -2;
}
int last_v = 0;
if (history.value_size() > 0) {
last_v = history.value(history.value_size() - 1).version();
}
if (last_v != version) {
LOG(ERROR) << "version does not match:" << version
<< " old version:" << last_v;
return -2;
}
Value* new_value = history.add_value();
new_value->set_value(value);
new_value->set_version(version + 1);
history.SerializeToString(&value_str);
return SetValue(key, value_str);
}
std::pair<std::string, int> ResLevelDB::GetValueWithVersion(
const std::string& key, int version) {
std::string value_str = GetValue(key);
ValueHistory history;
if (!history.ParseFromString(value_str)) {
LOG(ERROR) << "old_value parse fail";
return std::make_pair("", 0);
}
if (history.value_size() == 0) {
return std::make_pair("", 0);
}
if (version > 0) {
for (int i = history.value_size() - 1; i >= 0; --i) {
if (history.value(i).version() == version) {
return std::make_pair(history.value(i).value(),
history.value(i).version());
}
if (history.value(i).version() < version) {
break;
}
}
}
int last_idx = history.value_size() - 1;
return std::make_pair(history.value(last_idx).value(),
history.value(last_idx).version());
}
// Return a map of <key, <value, version>>
std::map<std::string, std::pair<std::string, int>> ResLevelDB::GetAllItems() {
std::map<std::string, std::pair<std::string, int>> resp;
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
ValueHistory history;
if (!history.ParseFromString(it->value().ToString()) ||
history.value_size() == 0) {
LOG(ERROR) << "old_value parse fail";
continue;
}
const Value& value = history.value(history.value_size() - 1);
resp.insert(std::make_pair(it->key().ToString(),
std::make_pair(value.value(), value.version())));
}
delete it;
return resp;
}
std::map<std::string, std::pair<std::string, int>> ResLevelDB::GetKeyRange(
const std::string& min_key, const std::string& max_key) {
std::map<std::string, std::pair<std::string, int>> resp;
leveldb::Iterator* it = db_->NewIterator(leveldb::ReadOptions());
for (it->Seek(min_key); it->Valid() && it->key().ToString() <= max_key;
it->Next()) {
ValueHistory history;
if (!history.ParseFromString(it->value().ToString()) ||
history.value_size() == 0) {
LOG(ERROR) << "old_value parse fail";
continue;
}
const Value& value = history.value(history.value_size() - 1);
resp.insert(std::make_pair(it->key().ToString(),
std::make_pair(value.value(), value.version())));
}
delete it;
return resp;
}
// Return a list of <value, version>
std::vector<std::pair<std::string, int>> ResLevelDB::GetHistory(
const std::string& key, int min_version, int max_version) {
std::vector<std::pair<std::string, int>> resp;
std::string value_str = GetValue(key);
ValueHistory history;
if (!history.ParseFromString(value_str)) {
LOG(ERROR) << "old_value parse fail";
return resp;
}
for (int i = history.value_size() - 1; i >= 0; --i) {
if (history.value(i).version() < min_version) {
break;
}
if (history.value(i).version() <= max_version) {
resp.push_back(
std::make_pair(history.value(i).value(), history.value(i).version()));
}
}
return resp;
}
// Return a list of <value, version>
std::vector<std::pair<std::string, int>> ResLevelDB::GetTopHistory(
const std::string& key, int top_number) {
std::vector<std::pair<std::string, int>> resp;
std::string value_str = GetValue(key);
ValueHistory history;
if (!history.ParseFromString(value_str)) {
LOG(ERROR) << "old_value parse fail";
return resp;
}
for (int i = history.value_size() - 1;
i >= 0 && resp.size() < static_cast<size_t>(top_number); --i) {
resp.push_back(
std::make_pair(history.value(i).value(), history.value(i).version()));
}
return resp;
}
} // namespace storage
} // namespace resdb