std::optional<OpenRocksDb> RocksDbInstance::open()

in extensions/rocksdb-repos/database/RocksDbInstance.cpp [101:225]


std::optional<OpenRocksDb> RocksDbInstance::open(const std::string& column) {
  std::lock_guard<std::mutex> db_guard{mtx_};
  if (!impl_) {
    gsl_Expects(columns_.empty());
    // database needs to be (re)opened
    rocksdb::DB* db_instance = nullptr;
    rocksdb::Status result;

    std::vector<DBOptionsPatch> dbo_patches;
    rocksdb::ConfigOptions conf_options;
    conf_options.sanity_level = rocksdb::ConfigOptions::kSanityLevelLooselyCompatible;
    {
      // we have to extract the encryptor environment otherwise
      // we won't be able to read the options file
      rocksdb::DBOptions dummy_opts;
      dummy_opts.env = nullptr;  // manually clear it, for the patcher to explicitly set it
      for (auto& [col_name, config] : column_configs_) {
        if (auto& dbo_patch = config.dbo_patch) {
          dbo_patches.push_back(dbo_patch);
          Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
          dbo_patch(db_options_writer);
          if (dummy_opts.env) {
            conf_options.env = dummy_opts.env;
          }
        }
      }
      // we need to reapply the DBOptions changes to check for conflicts
      for (auto& [col_name, config] : column_configs_) {
        if (auto& dbo_patch = config.dbo_patch) {
          Writable<rocksdb::DBOptions> db_options_writer(dummy_opts);
          dbo_patch(db_options_writer);
          if (db_options_writer.isModified()) {
            logger_->log_error("Conflicting database options requested for '{}'", db_name_);
            return std::nullopt;
          }
        }
      }
    }
    db_options_ = rocksdb::DBOptions{};
    std::vector<rocksdb::ColumnFamilyDescriptor> cf_descriptors;
    rocksdb::Status latest_option_status = rocksdb::LoadLatestOptions(conf_options, db_name_, &db_options_, &cf_descriptors);
    {
      // apply the database options patchers
      Writable<rocksdb::DBOptions> db_options_writer(db_options_);
      for (auto& [col_name, config] : column_configs_) {
        if (auto& dbo_patch = config.dbo_patch) {
          dbo_patch(db_options_writer);
        }
      }
    }
    // apply requested ColumnFamilyOptions for each already existing ColumnFamily
    for (auto& cf_descr : cf_descriptors) {
      if (auto it = column_configs_.find(cf_descr.name); it != column_configs_.end()) {
        if (auto& cfo_patch = it->second.cfo_patch) {
          cfo_patch(cf_descr.options);
        }
      }
    }
    auto db_config_override_status = rocksdb::GetDBOptionsFromMap(conf_options, db_options_, db_config_override_, &db_options_);
    if (!db_config_override_status.ok()) {
      logger_->log_error("Failed to override RocksDB options from minifi.properties file: {}", db_config_override_status.ToString());
      return std::nullopt;
    }
    if (latest_option_status.ok()) {
      logger_->log_trace("Found existing database '{}', checking compatibility", db_name_);
      rocksdb::Status compat_status = rocksdb::CheckOptionsCompatibility(conf_options, db_name_, db_options_, cf_descriptors);
      if (!compat_status.ok()) {
        logger_->log_error("Incompatible database options: {}", compat_status.ToString());
        return std::nullopt;
      }
    } else if (latest_option_status.IsNotFound()) {
      logger_->log_trace("Database at '{}' not found, creating", db_name_);
      rocksdb::ColumnFamilyOptions default_cf_options;
      if (auto it = column_configs_.find("default"); it != column_configs_.end()) {
        if (auto& cfo_patch = it->second.cfo_patch) {
          cfo_patch(default_cf_options);
        }
      }
      cf_descriptors.emplace_back("default", default_cf_options);
    } else if (!latest_option_status.ok()) {
      logger_->log_error("Couldn't query database '{}' for options: '{}'", db_name_, latest_option_status.ToString());
      return std::nullopt;
    }
    std::vector<rocksdb::ColumnFamilyHandle*> column_handles;
    switch (mode_) {
      case RocksDbMode::ReadWrite:
        result = rocksdb::DB::Open(db_options_, db_name_, cf_descriptors, &column_handles, &db_instance);
        if (!result.ok()) {
          logger_->log_error("Cannot open writable rocksdb database '{}', error: '{}'", db_name_, result.ToString());
        }
        break;
      case RocksDbMode::ReadOnly:
        result = rocksdb::DB::OpenForReadOnly(db_options_, db_name_, cf_descriptors, &column_handles, &db_instance);
        if (!result.ok()) {
          logger_->log_error("Cannot open read-only rocksdb database '{}', error: '{}'", db_name_, result.ToString());
        }
        break;
    }
    if (!result.ok()) {
      // we failed to open the database
      return std::nullopt;
    }
    gsl_Expects(db_instance);
    // the patches could have internal resources that we need to keep alive
    // as long as the database is open (e.g. custom environment)
    impl_ = std::make_shared<DbHandle>(std::unique_ptr<rocksdb::DB>(db_instance), std::move(dbo_patches));
    for (size_t cf_idx{0}; cf_idx < column_handles.size(); ++cf_idx) {
      ColumnFamilyOptionsPatch cfo_patch;
      if (auto it = column_configs_.find(column_handles[cf_idx]->GetName()); it != column_configs_.end()) {
        cfo_patch = it->second.cfo_patch;
      }
      columns_[column_handles[cf_idx]->GetName()]
          = std::make_shared<ColumnHandle>(std::unique_ptr<rocksdb::ColumnFamilyHandle>(column_handles[cf_idx]), cfo_patch);
    }
  }
  std::shared_ptr<ColumnHandle> column_handle = getOrCreateColumnFamily(column, db_guard);
  if (!column_handle) {
    // error is already logged by the method
    return std::nullopt;
  }
  return OpenRocksDb(
      *this,
      gsl::make_not_null<std::shared_ptr<rocksdb::DB>>(std::shared_ptr<rocksdb::DB>(impl_, impl_->handle.get())),
      gsl::make_not_null<std::shared_ptr<ColumnHandle>>(column_handle));
}