in src/server/server.cc [153:245]
// Brings the server up. In order: loads namespaces, attaches to the configured
// master (or shifts the replication id when no master is configured), prepares
// either the cluster slot-migration objects or the per-namespace indexes, and
// finally starts the worker, task-runner, cron and compaction-checker threads.
//
// Returns Status::OK() once all of that has been started. A failing step hands
// its status back to the caller (some with a descriptive prefix). The one
// exception is the task runner: a failure to start it is only logged as a warning.
// NOTE(review): GET_OR_RET is defined elsewhere; presumably it returns early
// with the error when the wrapped call fails -- confirm against its definition.
Status Server::Start() {
  auto status = namespace_.LoadAndRewrite();
  if (!status.IsOK()) return status;

  const bool has_master = !config_->master_host.empty();
  if (has_master) {
    status = AddMaster(config_->master_host, static_cast<uint32_t>(config_->master_port), false);
    if (!status.IsOK()) return status;
  } else {
    // No master configured, so this node is not a replica: shift the replication id.
    engine::Context ctx(storage);
    status = storage->ShiftReplId(ctx);
    if (!status.IsOK()) return status.Prefixed("failed to shift replication id");
  }

  // Cluster and non-cluster setup are mutually exclusive.
  if (config_->cluster_enabled) {
    // Objects used for slot migration.
    slot_migrator = std::make_unique<SlotMigrator>(this);

    if (config_->persist_cluster_nodes_enabled) {
      if (auto nodes_status = cluster->LoadClusterNodes(config_->NodesFilePath()); !nodes_status.IsOK()) {
        return nodes_status.Prefixed("failed to load cluster nodes info");
      }
    }

    if (auto migration_status = slot_migrator->CreateMigrationThread(); !migration_status.IsOK()) {
      return migration_status.Prefixed("failed to create migration thread");
    }

    slot_import = std::make_unique<SlotImport>(this);
  } else {
    // Load indexes for the default namespace first, then for every listed namespace.
    engine::Context no_txn_ctx = engine::Context::NoTransactionContext(storage);
    GET_OR_RET(index_mgr.Load(no_txn_ctx, kDefaultNamespace));
    for (const auto &[_, ns_name] : namespace_.List()) {
      GET_OR_RET(index_mgr.Load(no_txn_ctx, ns_name));
    }
  }

  for (const auto &worker_thread : worker_threads_) {
    worker_thread->Start();
  }

  // A task runner that fails to start does not abort startup.
  if (auto runner_status = task_runner_.Start(); !runner_status) {
    warn("Failed to start task runner: {}", runner_status.Msg());
  }

  // Server cron thread.
  cron_thread_ = GET_OR_RET(util::CreateThread("server-cron", [this] { cron(); }));

  // Compaction checker thread: wakes every 100ms and does real work on every 600th
  // wake-up that happens while the server is not loading (roughly once a minute).
  compaction_checker_thread_ = GET_OR_RET(util::CreateThread("compact-check", [this] {
    uint64_t ticks = 0;
    int64_t last_compact_date = 0;
    CompactionChecker checker{this->storage};

    while (!stop_) {
      // Sleep before doing anything else in this iteration.
      std::this_thread::sleep_for(std::chrono::milliseconds(100));

      // Hold the storage read lock for the rest of the iteration so the DB is accessed safely.
      auto guard = storage->ReadLockGuard();
      if (storage->IsClosing()) continue;

      // The tick counter only advances while the server is not loading.
      if (is_loading_) continue;
      if (++ticks % 600 != 0) continue;
      if (!config_->compaction_checker_cron.IsEnabled()) continue;

      auto now_secs = static_cast<time_t>(util::GetTimeStamp());
      std::tm local_now{};
      localtime_r(&now_secs, &local_now);

      if (config_->compaction_checker_cron.IsTimeMatch(&local_now)) {
        for (const auto &cf : engine::ColumnFamilyConfigs::ListAllColumnFamilies()) {
          checker.PickCompactionFilesForCf(cf);
        }
      }

      // Compact once per day: run when the day number differs from the last recorded one.
      auto hours_now = now_secs / 3600;
      auto day_now = hours_now / 24;
      if (hours_now != 0 && last_compact_date != day_now) {
        last_compact_date = day_now;
        checker.CompactPropagateAndPubSubFiles();
      }
    }
  }));

  // Remember the value reported by Stats::GetMemoryRSS() at the end of startup.
  memory_startup_use_.store(Stats::GetMemoryRSS(), std::memory_order_relaxed);
  info("[server] Ready to accept connections");

  return Status::OK();
}