in src/server/server.cc [749:857]
void Server::cron() {
uint64_t counter = 0;
while (!stop_) {
// Sleep first
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// To guarantee accessing DB safely
auto guard = storage->ReadLockGuard();
if (storage->IsClosing()) continue;
updateCachedTime();
counter++;
if (is_loading_) {
// We need to skip the cron operations since `is_loading_` means the db is restoring,
// and the db pointer will be modified after that. It will panic if we use the db pointer
// before the new db was reopened.
continue;
}
// check every 20s (use 20s instead of 60s so that cron will execute in critical condition)
if (counter != 0 && counter % 200 == 0) {
auto t = static_cast<time_t>(util::GetTimeStamp());
std::tm now{};
localtime_r(&t, &now);
// disable compaction cron when the compaction checker was enabled
if (!config_->compaction_checker_cron.IsEnabled() && config_->compact_cron.IsEnabled() &&
config_->compact_cron.IsTimeMatch(&now)) {
Status s = AsyncCompactDB();
info("[server] Schedule to compact the db, result: {}", s.Msg());
}
if (config_->bgsave_cron.IsEnabled() && config_->bgsave_cron.IsTimeMatch(&now)) {
Status s = AsyncBgSaveDB();
info("[server] Schedule to bgsave the db, result: {}", s.Msg());
}
if (config_->dbsize_scan_cron.IsEnabled() && config_->dbsize_scan_cron.IsTimeMatch(&now)) {
auto tokens = namespace_.List();
std::vector<std::string> namespaces;
// Number of namespaces (custom namespaces + default one)
namespaces.reserve(tokens.size() + 1);
for (auto &token : tokens) {
namespaces.emplace_back(token.second); // namespace
}
// add default namespace as fallback
namespaces.emplace_back(kDefaultNamespace);
for (auto &ns : namespaces) {
Status s = AsyncScanDBSize(ns);
info("[server] Schedule to recalculate the db size on namespace: {}, result: {}", ns, s.Msg());
}
}
}
// check every 10s
if (counter != 0 && counter % 100 == 0) {
Status s = AsyncPurgeOldBackups(config_->max_backup_to_keep, config_->max_backup_keep_hours);
// Purge backup if needed, it will cost much disk space if we keep backup and full sync
// checkpoints at the same time
if (config_->purge_backup_on_fullsync && (storage->ExistCheckpoint() || storage->ExistSyncCheckpoint())) {
s = AsyncPurgeOldBackups(0, 0);
}
}
// No replica uses this checkpoint, we can remove it.
if (counter != 0 && counter % 100 == 0) {
int64_t create_time_secs = storage->GetCheckpointCreateTimeSecs();
int64_t access_time_secs = storage->GetCheckpointAccessTimeSecs();
if (storage->ExistCheckpoint()) {
// TODO(shooterit): support to config the alive time of checkpoint
int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
if ((GetFetchFileThreadNum() == 0 && now_secs - access_time_secs > 30) ||
(now_secs - create_time_secs > 24 * 60 * 60)) {
auto s = rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
if (!s.ok()) {
warn("[server] Fail to clean checkpoint, error: {}", s.ToString());
} else {
info("[server] Clean checkpoint successfully");
}
}
}
}
// check if DB need to be resumed every minute
// Rocksdb has auto resume feature after retryable io error, earlier version(before v6.22.1) had
// bug when encounter no space error. The current version fixes the no space error issue, but it
// does not completely resolve, which still exists when encountered disk quota exceeded error.
// In order to properly handle all possible situations on rocksdb, we manually resume here
// when encountering no space error and disk quota exceeded error.
if (counter != 0 && counter % 600 == 0 && storage->IsDBInRetryableIOError()) {
auto s = storage->GetDB()->Resume();
if (s.ok()) {
warn("[server] Successfully resumed DB after retryable IO error");
} else {
error("[server] Failed to resume DB after retryable IO error: {}", s.ToString());
}
storage->SetDBInRetryableIOError(false);
}
// check if we need to clean up exited worker threads every 5s
if (counter != 0 && counter % 50 == 0) {
cleanupExitedWorkerThreads(false);
}
CleanupExitedSlaves();
recordInstantaneousMetrics();
}
}