in extensions/rocksdb-repos/database/RocksDbInstance.cpp [101:225]
std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) {
std::lock_guard<std::mutex> db_guard{mtx_};
if (!impl_) {
gsl_Expects(columns_.empty());
// database needs to be (re)opened
rocksdb::DB* db_instance = nullptr;
rocksdb::Status result;
std::vector<DBOptionsPatch> dbo_patches;
rocksdb::ConfigOptions conf_options;
conf_options.sanity_level = rocksdb::ConfigOptions::kSanityLevelLooselyCompatible;
{
// we have to extract the encryptor environment otherwise
// we won't be able to read the options file
rocksdb::DBOptions dummy_opts;
dummy_opts.env = nullptr; // manually clear it, for the patcher to explicitly set it
for (auto& [col_name, config] : column_configs_) {
if (auto& dbo_patch = config.dbo_patch) {
dbo_patches.push_back(dbo_patch);
Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
dbo_patch(db_options_writer);
if (dummy_opts.env) {
conf_options.env = dummy_opts.env;
}
}
}
// we need to reapply the DBOptions changes to check for conflicts
for (auto& [col_name, config] : column_configs_) {
if (auto& dbo_patch = config.dbo_patch) {
Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
dbo_patch(db_options_writer);
if (db_options_writer.isModified()) {
logger_->log_error("Conflicting database options requested for '{}'", db_name_);
return std::nullopt;
}
}
}
}
db_options_ = rocksdb::DBOptions{};
std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors;
rocksdb::Status latest_option_status = rocksdb::LoadLatestOptions(conf_options, db_name_, &db_options_, &cf_descriptors);
{
// apply the database options patchers
Writable<rocksdb::DBOptions> db_options_writer(db_options_);
for (auto& [col_name, config] : column_configs_) {
if (auto& dbo_patch = config.dbo_patch) {
dbo_patch(db_options_writer);
}
}
}
// apply requested ColumnFamilyOptions for each already existing ColumnFamily
for (auto& cf_descr : cf_descriptors) {
if (auto it = column_configs_.find(cf_descr.name); it != column_configs_.end()) {
if (auto& cfo_patch = it->second.cfo_patch) {
cfo_patch(cf_descr.options);
}
}
}
auto db_config_override_status = rocksdb::GetDBOptionsFromMap(conf_options, db_options_, db_config_override_, &db_options_);
if (!db_config_override_status.ok()) {
logger_->log_error("Failed to override RocksDB options from minifi.properties file: {}", db_config_override_status.ToString());
return std::nullopt;
}
if (latest_option_status.ok()) {
logger_->log_trace("Found existing database '{}', checking compatibility", db_name_);
rocksdb::Status compat_status = rocksdb::CheckOptionsCompatibility(conf_options, db_name_, db_options_, cf_descriptors);
if (!compat_status.ok()) {
logger_->log_error("Incompatible database options: {}", compat_status.ToString());
return std::nullopt;
}
} else if (latest_option_status.IsNotFound()) {
logger_->log_trace("Database at '{}' not found, creating", db_name_);
rocksdb::ColumnFamilyOptions default_cf_options;
if (auto it = column_configs_.find("default"); it != column_configs_.end()) {
if (auto& cfo_patch = it->second.cfo_patch) {
cfo_patch(default_cf_options);
}
}
cf_descriptors.emplace_back("default", default_cf_options);
} else if (!latest_option_status.ok()) {
logger_->log_error("Couldn't query database '{}' for options: '{}'", db_name_, latest_option_status.ToString());
return std::nullopt;
}
std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
switch (mode_) {
case RocksDbMode::ReadWrite:
result = rocksdb::DB::Open(db_options_, db_name_, cf_descriptors, &column_handles, &db_instance);
if (!result.ok()) {
logger_->log_error("Cannot open writable rocksdb database '{}', error: '{}'", db_name_, result.ToString());
}
break;
case RocksDbMode::ReadOnly:
result = rocksdb::DB::OpenForReadOnly(db_options_, db_name_, cf_descriptors, &column_handles, &db_instance);
if (!result.ok()) {
logger_->log_error("Cannot open read-only rocksdb database '{}', error: '{}'", db_name_, result.ToString());
}
break;
}
if (!result.ok()) {
// we failed to open the database
return std::nullopt;
}
gsl_Expects(db_instance);
// the patches could have internal resources that we need to keep alive
// as long as the database is open (e.g. custom environment)
impl_ = std::make_shared<DbHandle>(std::unique_ptr<rocksdb::DB>(db_instance), std::move(dbo_patches));
for (size_t cf_idx{0}; cf_idx < column_handles.size(); ++cf_idx) {
ColumnFamilyOptionsPatch cfo_patch;
if (auto it = column_configs_.find(column_handles[cf_idx]->GetName()); it != column_configs_.end()) {
cfo_patch = it->second.cfo_patch;
}
columns_[column_handles[cf_idx]->GetName()]
= std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(column_handles[cf_idx]), cfo_patch);
}
}
std::shared_ptr<ColumnHandle> column_handle = getOrCreateColumnFamily(column, db_guard);
if (!column_handle) {
// error is already logged by the method
return std::nullopt;
}
return OpenRocksDb(
*this,
gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(std::shared_ptr<rocksdb::DB>(impl_, impl_->handle.get())),
gsl::make_not_null<std::shared_ptr<ColumnHandle>>(column_handle));
}