Status Storage::Open()

in src/storage/storage.cc [279:399]


Status Storage::Open(DBOpenMode mode) {
  auto guard = WriteLockGuard();
  db_closing_ = false;

  bool cache_index_and_filter_blocks = config_->rocks_db.cache_index_and_filter_blocks;
  size_t block_cache_size = config_->rocks_db.block_cache_size * MiB;
  size_t metadata_block_cache_size = config_->rocks_db.metadata_block_cache_size * MiB;
  size_t subkey_block_cache_size = config_->rocks_db.subkey_block_cache_size * MiB;
  if (block_cache_size == 0) {
    block_cache_size = metadata_block_cache_size + subkey_block_cache_size;
  }

  rocksdb::Options options = InitRocksDBOptions();
  if (mode == kDBOpenModeDefault) {
    if (auto s = CreateColumnFamilies(options); !s.IsOK()) {
      return s.Prefixed("failed to create column families");
    }
  }

  if (config_->rocks_db.block_cache_type == BlockCacheType::kCacheTypeLRU) {
    shared_block_cache_ = rocksdb::NewLRUCache(block_cache_size, kRocksdbLRUAutoAdjustShardBits,
                                               kRocksdbCacheStrictCapacityLimit, kRocksdbLRUBlockCacheHighPriPoolRatio);
  } else {
    rocksdb::HyperClockCacheOptions hcc_cache_options(block_cache_size, kRockdbHCCAutoAdjustCharge);
    shared_block_cache_ = hcc_cache_options.MakeSharedCache();
  }

  rocksdb::BlockBasedTableOptions metadata_table_opts = InitTableOptions();
  metadata_table_opts.block_cache = shared_block_cache_;
  metadata_table_opts.pin_l0_filter_and_index_blocks_in_cache = true;
  metadata_table_opts.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
  metadata_table_opts.cache_index_and_filter_blocks_with_high_priority = true;

  rocksdb::ColumnFamilyOptions metadata_opts(options);
  metadata_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(metadata_table_opts));
  metadata_opts.compaction_filter_factory = std::make_shared<MetadataFilterFactory>(this);
  metadata_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
  // Enable whole key bloom filter in memtable
  metadata_opts.memtable_whole_key_filtering = true;
  metadata_opts.memtable_prefix_bloom_size_ratio = 0.1;
  metadata_opts.table_properties_collector_factories.emplace_back(
      NewCompactOnExpiredTableCollectorFactory(std::string(kMetadataColumnFamilyName), 0.3));
  SetBlobDB(&metadata_opts);

  rocksdb::BlockBasedTableOptions subkey_table_opts = InitTableOptions();
  subkey_table_opts.block_cache = shared_block_cache_;
  subkey_table_opts.pin_l0_filter_and_index_blocks_in_cache = true;
  subkey_table_opts.cache_index_and_filter_blocks = cache_index_and_filter_blocks;
  subkey_table_opts.cache_index_and_filter_blocks_with_high_priority = true;
  rocksdb::ColumnFamilyOptions subkey_opts(options);
  subkey_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(subkey_table_opts));
  subkey_opts.compaction_filter_factory = std::make_shared<SubKeyFilterFactory>(this);
  subkey_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
  subkey_opts.table_properties_collector_factories.emplace_back(
      NewCompactOnExpiredTableCollectorFactory(std::string(kPrimarySubkeyColumnFamilyName), 0.3));
  SetBlobDB(&subkey_opts);

  rocksdb::BlockBasedTableOptions pubsub_table_opts = InitTableOptions();
  rocksdb::ColumnFamilyOptions pubsub_opts(options);
  pubsub_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(pubsub_table_opts));
  pubsub_opts.compaction_filter_factory = std::make_shared<PubSubFilterFactory>();
  pubsub_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
  SetBlobDB(&pubsub_opts);

  rocksdb::BlockBasedTableOptions propagate_table_opts = InitTableOptions();
  rocksdb::ColumnFamilyOptions propagate_opts(options);
  propagate_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(propagate_table_opts));
  propagate_opts.compaction_filter_factory = std::make_shared<PropagateFilterFactory>();
  propagate_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
  SetBlobDB(&propagate_opts);

  rocksdb::BlockBasedTableOptions search_table_opts = InitTableOptions();
  rocksdb::ColumnFamilyOptions search_opts(options);
  search_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(search_table_opts));
  search_opts.compaction_filter_factory = std::make_shared<SearchFilterFactory>();
  search_opts.disable_auto_compactions = config_->rocks_db.disable_auto_compactions;
  SetBlobDB(&search_opts);

  std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
  // Caution: don't change the order of column family, or the handle will be mismatched
  column_families.emplace_back(rocksdb::kDefaultColumnFamilyName, subkey_opts);
  column_families.emplace_back(std::string(kMetadataColumnFamilyName), metadata_opts);
  column_families.emplace_back(std::string(kSecondarySubkeyColumnFamilyName), subkey_opts);
  column_families.emplace_back(std::string(kPubSubColumnFamilyName), pubsub_opts);
  column_families.emplace_back(std::string(kPropagateColumnFamilyName), propagate_opts);
  column_families.emplace_back(std::string(kStreamColumnFamilyName), subkey_opts);
  column_families.emplace_back(std::string(kSearchColumnFamilyName), search_opts);

  std::vector<std::string> old_column_families;
  auto s = rocksdb::DB::ListColumnFamilies(options, config_->db_dir, &old_column_families);
  if (!s.ok()) return {Status::NotOK, s.ToString()};

  auto start = std::chrono::high_resolution_clock::now();
  switch (mode) {
    case DBOpenMode::kDBOpenModeDefault: {
      db_ = GET_OR_RET(util::DBOpen(options, config_->db_dir, column_families, &cf_handles_));
      break;
    }
    case DBOpenMode::kDBOpenModeForReadOnly: {
      db_ = GET_OR_RET(util::DBOpenForReadOnly(options, config_->db_dir, column_families, &cf_handles_));
      break;
    }
    case DBOpenMode::kDBOpenModeAsSecondaryInstance: {
      db_ = GET_OR_RET(
          util::DBOpenAsSecondaryInstance(options, config_->db_dir, config_->dir, column_families, &cf_handles_));
      break;
    }
    default:
      unreachable();
  }
  auto end = std::chrono::high_resolution_clock::now();
  int64_t duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count();

  if (!db_) {
    info("[storage] Failed to load the data from disk: {} ms", duration);
    return {Status::DBOpenErr};
  }
  info("[storage] Success to load the data from disk: {} ms", duration);

  return Status::OK();
}