extensions/rocksdb-repos/DatabaseContentRepository.cpp (307 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "DatabaseContentRepository.h"
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <cinttypes>
#include "encryption/RocksDbEncryptionProvider.h"
#include "RocksDbStream.h"
#include "utils/gsl.h"
#include "Exception.h"
#include "database/RocksDbUtils.h"
#include "database/StringAppender.h"
#include "core/Resource.h"
#include "core/TypedValues.h"
namespace org::apache::nifi::minifi::core::repository {
/**
 * Configures the repository from the agent configuration and opens the RocksDB database.
 *
 * Settings read here:
 *  - repository directory (falls back to "<home>/dbcontentrepository" when unset or empty),
 *  - purge period (defaults to "1 s"; the literal "0" yields a zero period; malformed values fall back to 1 second),
 *  - compaction period (see setCompactionPeriod),
 *  - column family compression and RocksDB option overrides,
 *  - synchronous writes (enabled unless the configured value is exactly "false"),
 *  - checksum verification on reads (defaults to false).
 *
 * @param configuration agent configuration the settings are read from
 * @return true if the database could be opened, false otherwise
 */
bool DatabaseContentRepository::initialize(const std::shared_ptr<minifi::Configure> &configuration) {
  std::string value;
  if (configuration->get(Configure::nifi_dbcontent_repository_directory_default, value) && !value.empty()) {
    directory_ = value;
  } else {
    directory_ = (configuration->getHome() / "dbcontentrepository").string();
  }
  auto purge_period_str = utils::string::trim(configuration->get(Configure::nifi_dbcontent_repository_purge_period).value_or("1 s"));
  if (purge_period_str == "0") {
    // a bare "0" (without a time unit) is accepted as a zero purge period
    purge_period_ = std::chrono::seconds{0};
  } else if (auto purge_period_val = core::TimePeriodValue::fromString(purge_period_str)) {
    purge_period_ = purge_period_val->getMilliseconds();
  } else {
    logger_->log_error("Malformed delete period value, expected time format: '{}'", purge_period_str);
    purge_period_ = std::chrono::seconds{1};
  }
  const auto encrypted_env = createEncryptingEnv(utils::crypto::EncryptionManager{configuration->getHome()}, DbEncryptionOptions{directory_, ENCRYPTION_KEY_NAME});
  logger_->log_info("Using {} DatabaseContentRepository", encrypted_env ? "encrypted" : "plaintext");
  setCompactionPeriod(configuration);
  auto set_db_opts = [encrypted_env] (minifi::internal::Writable<rocksdb::DBOptions>& db_opts) {
    minifi::internal::setCommonRocksDbOptions(db_opts);
    if (encrypted_env) {
      db_opts.set(&rocksdb::DBOptions::env, encrypted_env.get(), EncryptionEq{});
    } else {
      db_opts.set(&rocksdb::DBOptions::env, rocksdb::Env::Default());
    }
  };
  // Capture the configuration by value: this callback is handed over to RocksDatabase::create and
  // may be invoked after initialize() has returned (NOTE(review): confirm RocksDatabase retains it),
  // at which point a reference to the `configuration` parameter would dangle.
  auto set_cf_opts = [configuration] (rocksdb::ColumnFamilyOptions& cf_opts) {
    cf_opts.OptimizeForPointLookup(4);
    cf_opts.merge_operator = std::make_shared<StringAppender>();
    cf_opts.max_successive_merges = 0;
    if (auto compression_type = minifi::internal::readConfiguredCompressionType(configuration, Configure::nifi_content_repository_rocksdb_compression)) {
      cf_opts.compression = *compression_type;
    }
  };
  db_ = minifi::internal::RocksDatabase::create(set_db_opts, set_cf_opts, directory_,
    minifi::internal::getRocksDbOptionsToOverride(configuration, Configure::nifi_content_repository_rocksdb_options));
  if (db_->open()) {
    logger_->log_debug("NiFi Content DB Repository database open {} success", directory_);
    is_valid_ = true;
  } else {
    logger_->log_error("NiFi Content DB Repository database open {} fail", directory_);
    is_valid_ = false;
  }
  // anything other than the exact string "false" keeps synchronous writes enabled
  use_synchronous_writes_ = configuration->get(Configure::nifi_content_repository_rocksdb_use_synchronous_writes).value_or("true") != "false";
  verify_checksums_in_rocksdb_reads_ = (configuration->get(Configure::nifi_content_repository_rocksdb_read_verify_checksums) | utils::andThen(&utils::string::toBool)).value_or(false);
  logger_->log_debug("{} checksum verification in DatabaseContentRepository", verify_checksums_in_rocksdb_reads_ ? "Using" : "Not using");
  return is_valid_;
}
/**
 * Determines the forced-compaction period.
 *
 * Starts from DEFAULT_COMPACTION_PERIOD and replaces it with the configured value when that
 * value is present and parses as a time period. A parsed period of zero is accepted and logged
 * as disabling forced compaction; a malformed value is logged and the default is kept.
 *
 * @param configuration agent configuration the compaction period is read from
 */
void DatabaseContentRepository::setCompactionPeriod(const std::shared_ptr<minifi::Configure> &configuration) {
  compaction_period_ = DEFAULT_COMPACTION_PERIOD;

  auto configured_period = configuration->get(Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
  if (!configured_period) {
    logger_->log_debug("Using default compaction period of {}", compaction_period_);
    return;
  }

  auto parsed_period = TimePeriodValue::fromString(configured_period.value());
  if (!parsed_period) {
    logger_->log_error("Malformed property '{}', expected time period, using default", Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
    return;
  }

  compaction_period_ = parsed_period->getMilliseconds();
  if (compaction_period_.count() == 0) {
    logger_->log_warn("Setting '{}' to 0 disables forced compaction", Configure::nifi_dbcontent_repository_rocksdb_compaction_period);
  }
}
/**
 * Body of the compaction thread: runs a compaction immediately, then again after every
 * compaction period, until a stop request is observed while waiting.
 */
void DatabaseContentRepository::runCompaction() {
  while (true) {
    // the database handle is scoped to this statement so it is released before waiting
    if (auto opendb = db_->open(); !opendb) {
      logger_->log_error("Failed to open database for compaction");
    } else {
      auto compaction_status = opendb->RunCompaction();
      logger_->log_trace("Compaction triggered: {}", compaction_status.ToString());
    }

    if (utils::StoppableThread::waitForStopRequest(compaction_period_)) {
      break;
    }
  }
}
/**
 * Launches the background threads of the repository.
 *
 * Does nothing when the database handle is missing or the repository is not valid.
 * The compaction thread is only started for a non-zero compaction period, and the
 * garbage collection thread only for a non-zero purge period.
 */
void DatabaseContentRepository::start() {
  const bool repository_usable = db_ && is_valid_;
  if (!repository_usable) {
    return;
  }

  const bool compaction_enabled = compaction_period_.count() != 0;
  if (compaction_enabled) {
    compaction_thread_ = std::make_unique<utils::StoppableThread>([this] { runCompaction(); });
  }

  const bool async_purge_enabled = purge_period_.count() != 0;
  if (async_purge_enabled) {
    gc_thread_ = std::make_unique<utils::StoppableThread>([this] { runGc(); });
  }
}
/**
 * Flushes the write-ahead log (if the database can be opened) and then releases the
 * compaction and garbage collection threads. Does nothing without a database handle.
 */
void DatabaseContentRepository::stop() {
  if (!db_) {
    return;
  }

  // the handle intentionally stays alive until the end of the function, past the thread resets
  auto opendb = db_->open();
  if (opendb) {
    opendb->FlushWAL(true);
  }

  compaction_thread_.reset();
  gc_thread_.reset();
}
DatabaseContentRepository::Session::Session(std::shared_ptr<ContentRepository> repository, bool use_synchronous_writes)
: BufferedContentSession(std::move(repository)),
use_synchronous_writes_(use_synchronous_writes) {}
/**
 * Creates a new session on this repository, carrying over the repository's
 * synchronous-write setting.
 *
 * @return the newly created session
 */
std::shared_ptr<ContentSession> DatabaseContentRepository::createSession() {
  auto self = sharedFromThis<ContentRepository>();
  return std::make_shared<Session>(std::move(self), use_synchronous_writes_);
}
void DatabaseContentRepository::Session::commit() {
auto dbContentRepository = std::dynamic_pointer_cast<DatabaseContentRepository>(repository_);
auto opendb = dbContentRepository->db_->open();
if (!opendb) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open rocksdb database to commit content changes");
}
auto batch = opendb->createWriteBatch();
for (const auto& resource : managed_resources_) {
auto outStream = dbContentRepository->write(*resource.first, false, &batch);
if (outStream == nullptr) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for write: " + resource.first->getContentFullPath());
}
const auto size = resource.second->size();
if (outStream->write(resource.second->getBuffer()) != size) {
throw Exception(REPOSITORY_EXCEPTION, "Failed to write new resource: " + resource.first->getContentFullPath());
}
}
for (const auto& resource : append_state_) {
auto outStream = dbContentRepository->write(*resource.first, true, &batch);
if (outStream == nullptr) {
throw Exception(REPOSITORY_EXCEPTION, "Couldn't open the underlying resource for append: " + resource.first->getContentFullPath());
}
const auto size = resource.second.stream->size();
if (outStream->write(resource.second.stream->getBuffer()) != size) {
throw Exception(REPOSITORY_EXCEPTION, "Failed to append to resource: " + resource.first->getContentFullPath());
}
}
rocksdb::WriteOptions options;
options.sync = use_synchronous_writes_;
rocksdb::Status status = opendb->Write(options, &batch);
if (!status.ok()) {
throw Exception(REPOSITORY_EXCEPTION, "Batch write failed: " + status.ToString());
}
managed_resources_.clear();
append_state_.clear();
}
/**
 * Opens a write stream for the claim that is not attached to any write batch.
 *
 * @param claim resource claim identifying the content
 * @param append forwarded unchanged to the batch-aware overload
 * @return the stream produced by the batch-aware overload (may be nullptr)
 */
std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim &claim, bool append) {
  minifi::internal::WriteBatch* const no_batch = nullptr;
  return write(claim, append, no_batch);
}
/**
 * Opens a read stream for the content identified by the claim.
 *
 * @param claim resource claim identifying the content
 * @return a stream over the stored content, or nullptr when the repository is not valid
 *         or has no database handle
 */
std::shared_ptr<io::BaseStream> DatabaseContentRepository::read(const minifi::ResourceClaim &claim) {
  // Historically an invalid stream signalled failure by returning -1 from its operations; the API
  // also accepts a null stream, so an unusable repository is reported that way instead.
  if (is_valid_ && db_) {
    auto database = gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get());
    return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), database, false, nullptr, true, verify_checksums_in_rocksdb_reads_);
  }
  return nullptr;
}
/**
 * Checks whether content is stored for the given claim.
 *
 * @param streamId resource claim identifying the content
 * @return true if the lookup of the claim's content path succeeds; false if the lookup
 *         does not succeed, the repository is not valid or has no database handle, or
 *         the database cannot be opened
 */
bool DatabaseContentRepository::exists(const minifi::ResourceClaim &streamId) {
  // same guard as read/write/removeKeySync: never dereference a missing database handle
  if (!is_valid_ || !db_) {
    return false;
  }
  auto opendb = db_->open();
  if (!opendb) {
    return false;
  }
  std::string value;
  rocksdb::ReadOptions options;
  options.verify_checksums = verify_checksums_in_rocksdb_reads_;
  const rocksdb::Status status = opendb->Get(options, streamId.getContentFullPath(), &value);
  if (status.ok()) {
    logger_->log_debug("{} exists", streamId.getContentFullPath());
    return true;
  }
  // any non-ok status (not only "not found") is reported as absent
  logger_->log_debug("{} does not exist", streamId.getContentFullPath());
  return false;
}
bool DatabaseContentRepository::removeKeySync(const std::string &content_path) {
if (!is_valid_ || !db_)
return false;
// synchronous deletion
auto opendb = db_->open();
if (!opendb) {
return false;
}
rocksdb::Status status = opendb->Delete(rocksdb::WriteOptions(), content_path);
if (status.ok()) {
logger_->log_debug("Deleting resource {}", content_path);
return true;
} else if (status.IsNotFound()) {
logger_->log_debug("Resource {} was not found", content_path);
return true;
} else {
logger_->log_debug("Attempted, but could not delete {}", content_path);
return false;
}
}
/**
 * Removes the content stored under the given key.
 *
 * With a zero purge period the key is deleted immediately through removeKeySync; otherwise
 * it is only staged in keys_to_delete_ and true is returned.
 *
 * @param content_path key of the content to delete
 * @return the result of removeKeySync for immediate deletion, true when the key was staged
 */
bool DatabaseContentRepository::removeKey(const std::string& content_path) {
  const bool delete_immediately = (purge_period_ == std::chrono::seconds(0));
  if (delete_immediately) {
    return removeKeySync(content_path);
  }

  // deferred deletion: record the key under the lock
  {
    std::lock_guard guard(keys_mtx_);
    logger_->log_debug("Staging resource for deletion {}", content_path);
    keys_to_delete_.push_back(content_path);
  }
  return true;
}
void DatabaseContentRepository::runGc() {
while (!utils::StoppableThread::waitForStopRequest(purge_period_)) {
auto opendb = db_->open();
if (!opendb) {
continue;
}
// keys_to_delete_ is not persisted, in memory only, and is lost on restart
// the clearOrphans method is executed during agent startup making sure that this
// does not cause a content leak
std::vector<std::string> keys;
{
std::lock_guard guard(keys_mtx_);
keys = std::exchange(keys_to_delete_, std::vector<std::string>{});
}
auto batch = opendb->createWriteBatch();
for (auto& key : keys) {
batch.Delete(key);
}
rocksdb::Status status;
status = opendb->Write(rocksdb::WriteOptions(), &batch);
if (status.ok()) {
for (auto& key : keys) {
logger_->log_debug("Deleted resource async {}", key);
}
} else {
for (auto& key : keys) {
logger_->log_debug("Failed to delete resource async {}", key);
}
// move keys we could not delete back to the list for a retry
std::lock_guard guard(keys_mtx_);
keys_to_delete_.insert(keys_to_delete_.end(), keys.begin(), keys.end());
}
}
}
/**
 * Opens a write stream for the content identified by the claim, optionally attached to a
 * write batch.
 *
 * The append flag is not consulted here; the same kind of stream is created either way.
 *
 * @param claim resource claim identifying the content
 * @param batch write batch handed to the stream, may be nullptr
 * @return the stream, or nullptr when the repository is not valid or has no database handle
 */
std::shared_ptr<io::BaseStream> DatabaseContentRepository::write(const minifi::ResourceClaim& claim, bool /*append*/, minifi::internal::WriteBatch* batch) {
  // Historically an invalid stream signalled failure by returning -1 from its operations; the API
  // also accepts a null stream, so an unusable repository is reported that way instead.
  if (is_valid_ && db_) {
    auto database = gsl::make_not_null<minifi::internal::RocksDatabase*>(db_.get());
    // no special handling for appends is needed: every mode already supports them
    return std::make_shared<io::RocksDbStream>(claim.getContentFullPath(), database, true, batch, true, verify_checksums_in_rocksdb_reads_);
  }
  return nullptr;
}
void DatabaseContentRepository::clearOrphans() {
if (!is_valid_ || !db_) {
logger_->log_error("Cannot delete orphan content entries, repository is invalid");
return;
}
auto opendb = db_->open();
if (!opendb) {
logger_->log_error("Cannot delete orphan content entries, could not open repository");
return;
}
std::vector<std::string> keys_to_be_deleted;
rocksdb::ReadOptions options;
options.verify_checksums = verify_checksums_in_rocksdb_reads_;
auto it = opendb->NewIterator(options);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
auto key = it->key().ToString();
std::lock_guard<std::mutex> lock(count_map_mutex_);
auto claim_it = count_map_.find(key);
if (claim_it == count_map_.end() || claim_it->second == 0) {
logger_->log_error("Deleting orphan resource {}", key);
keys_to_be_deleted.push_back(key);
}
}
auto batch = opendb->createWriteBatch();
for (auto& key : keys_to_be_deleted) {
batch.Delete(key);
}
rocksdb::Status status = opendb->Write(rocksdb::WriteOptions(), &batch);
if (!status.ok()) {
logger_->log_error("Could not delete orphan contents from rocksdb database: {}", status.ToString());
std::lock_guard<std::mutex> lock(purge_list_mutex_);
for (const auto& key : keys_to_be_deleted) {
purge_list_.push_back(key);
}
}
}
/**
 * Reports the approximate size of the repository as returned by the open database.
 *
 * @return the approximate size, or 0 when there is no database handle, the database cannot
 *         be opened, or no size is available
 */
uint64_t DatabaseContentRepository::getRepositorySize() const {
  if (!db_) {
    return 0;
  }
  auto opendb = db_->open();
  if (!opendb) {
    return 0;
  }
  return opendb->getApproximateSizes().value_or(0);
}
/**
 * Reports the number of entries in the repository, taken from the
 * "rocksdb.estimate-num-keys" database property.
 *
 * @return the parsed property value, or 0 when there is no database handle, the database
 *         cannot be opened, or the property value is empty
 */
uint64_t DatabaseContentRepository::getRepositoryEntryCount() const {
  if (!db_) {
    return 0;
  }
  auto opendb = db_->open();
  if (!opendb) {
    return 0;
  }

  std::string estimated_key_count;
  opendb->GetProperty("rocksdb.estimate-num-keys", &estimated_key_count);
  if (estimated_key_count.empty()) {
    return 0;
  }
  return std::stoull(estimated_key_count);
}
/**
 * Collects RocksDB statistics of the repository.
 *
 * @return the statistics reported by the open database, or default-constructed statistics
 *         when there is no database handle or the database cannot be opened
 */
std::optional<RepositoryMetricsSource::RocksDbStats> DatabaseContentRepository::getRocksDbStats() const {
  // tolerate a missing database handle like the other metrics getters do,
  // reporting the same empty statistics as for a database that cannot be opened
  if (!db_) {
    return RocksDbStats{};
  }
  auto opendb = db_->open();
  if (!opendb) {
    return RocksDbStats{};
  }
  return opendb->getStats();
}
// Registers DatabaseContentRepository as an InternalResource under the two listed names.
// NOTE(review): presumably these are the names the class can be looked up by at runtime —
// confirm against the macro's definition in core/Resource.h.
REGISTER_RESOURCE_AS(DatabaseContentRepository, InternalResource, ("DatabaseContentRepository", "databasecontentrepository"));
} // namespace org::apache::nifi::minifi::core::repository