Status Server::Start()

in src/server/server.cc [153:245]


Status Server::Start() {
  auto s = namespace_.LoadAndRewrite();
  if (!s.IsOK()) {
    return s;
  }
  if (!config_->master_host.empty()) {
    s = AddMaster(config_->master_host, static_cast<uint32_t>(config_->master_port), false);
    if (!s.IsOK()) return s;
  } else {
    // Generate new replication id if not a replica
    engine::Context ctx(storage);
    s = storage->ShiftReplId(ctx);
    if (!s.IsOK()) {
      return s.Prefixed("failed to shift replication id");
    }
  }

  if (!config_->cluster_enabled) {
    engine::Context no_txn_ctx = engine::Context::NoTransactionContext(storage);
    GET_OR_RET(index_mgr.Load(no_txn_ctx, kDefaultNamespace));
    for (const auto &[_, ns] : namespace_.List()) {
      GET_OR_RET(index_mgr.Load(no_txn_ctx, ns));
    }
  }

  if (config_->cluster_enabled) {
    // Create objects used for slot migration
    slot_migrator = std::make_unique<SlotMigrator>(this);

    if (config_->persist_cluster_nodes_enabled) {
      auto s = cluster->LoadClusterNodes(config_->NodesFilePath());
      if (!s.IsOK()) {
        return s.Prefixed("failed to load cluster nodes info");
      }
    }

    auto s = slot_migrator->CreateMigrationThread();
    if (!s.IsOK()) {
      return s.Prefixed("failed to create migration thread");
    }

    slot_import = std::make_unique<SlotImport>(this);
  }

  for (const auto &worker : worker_threads_) {
    worker->Start();
  }

  if (auto s = task_runner_.Start(); !s) {
    warn("Failed to start task runner: {}", s.Msg());
  }
  // setup server cron thread
  cron_thread_ = GET_OR_RET(util::CreateThread("server-cron", [this] { this->cron(); }));

  compaction_checker_thread_ = GET_OR_RET(util::CreateThread("compact-check", [this] {
    uint64_t counter = 0;
    int64_t last_compact_date = 0;
    CompactionChecker compaction_checker{this->storage};

    while (!stop_) {
      // Sleep first
      std::this_thread::sleep_for(std::chrono::milliseconds(100));

      // To guarantee accessing DB safely
      auto guard = storage->ReadLockGuard();
      if (storage->IsClosing()) continue;

      if (!is_loading_ && ++counter % 600 == 0  // check every minute
          && config_->compaction_checker_cron.IsEnabled()) {
        auto t_now = static_cast<time_t>(util::GetTimeStamp());
        std::tm now{};
        localtime_r(&t_now, &now);
        if (config_->compaction_checker_cron.IsTimeMatch(&now)) {
          const auto &column_family_list = engine::ColumnFamilyConfigs::ListAllColumnFamilies();
          for (auto &column_family : column_family_list) {
            compaction_checker.PickCompactionFilesForCf(column_family);
          }
        }
        // compact once per day
        auto now_hours = t_now / 3600;
        if (now_hours != 0 && last_compact_date != now_hours / 24) {
          last_compact_date = now_hours / 24;
          compaction_checker.CompactPropagateAndPubSubFiles();
        }
      }
    }
  }));

  memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed);
  info("[server] Ready to accept connections");

  return Status::OK();
}