void Server::cron()

in src/server/server.cc [749:857]


void Server::cron() {
  uint64_t counter = 0;
  while (!stop_) {
    // Sleep first
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // To guarantee accessing DB safely
    auto guard = storage->ReadLockGuard();
    if (storage->IsClosing()) continue;

    updateCachedTime();
    counter++;

    if (is_loading_) {
      // We need to skip the cron operations since `is_loading_` means the db is restoring,
      // and the db pointer will be modified after that. It will panic if we use the db pointer
      // before the new db was reopened.
      continue;
    }

    // check every 20s (use 20s instead of 60s so that cron will execute in critical condition)
    if (counter != 0 && counter % 200 == 0) {
      auto t = static_cast<time_t>(util::GetTimeStamp());
      std::tm now{};
      localtime_r(&t, &now);
      // disable compaction cron when the compaction checker was enabled
      if (!config_->compaction_checker_cron.IsEnabled() && config_->compact_cron.IsEnabled() &&
          config_->compact_cron.IsTimeMatch(&now)) {
        Status s = AsyncCompactDB();
        info("[server] Schedule to compact the db, result: {}", s.Msg());
      }
      if (config_->bgsave_cron.IsEnabled() && config_->bgsave_cron.IsTimeMatch(&now)) {
        Status s = AsyncBgSaveDB();
        info("[server] Schedule to bgsave the db, result: {}", s.Msg());
      }
      if (config_->dbsize_scan_cron.IsEnabled() && config_->dbsize_scan_cron.IsTimeMatch(&now)) {
        auto tokens = namespace_.List();
        std::vector<std::string> namespaces;

        // Number of namespaces (custom namespaces + default one)
        namespaces.reserve(tokens.size() + 1);
        for (auto &token : tokens) {
          namespaces.emplace_back(token.second);  // namespace
        }

        // add default namespace as fallback
        namespaces.emplace_back(kDefaultNamespace);

        for (auto &ns : namespaces) {
          Status s = AsyncScanDBSize(ns);
          info("[server] Schedule to recalculate the db size on namespace: {}, result: {}", ns, s.Msg());
        }
      }
    }
    // check every 10s
    if (counter != 0 && counter % 100 == 0) {
      Status s = AsyncPurgeOldBackups(config_->max_backup_to_keep, config_->max_backup_keep_hours);

      // Purge backup if needed, it will cost much disk space if we keep backup and full sync
      // checkpoints at the same time
      if (config_->purge_backup_on_fullsync && (storage->ExistCheckpoint() || storage->ExistSyncCheckpoint())) {
        s = AsyncPurgeOldBackups(0, 0);
      }
    }

    // No replica uses this checkpoint, we can remove it.
    if (counter != 0 && counter % 100 == 0) {
      int64_t create_time_secs = storage->GetCheckpointCreateTimeSecs();
      int64_t access_time_secs = storage->GetCheckpointAccessTimeSecs();

      if (storage->ExistCheckpoint()) {
        // TODO(shooterit): support to config the alive time of checkpoint
        int64_t now_secs = util::GetTimeStamp<std::chrono::seconds>();
        if ((GetFetchFileThreadNum() == 0 && now_secs - access_time_secs > 30) ||
            (now_secs - create_time_secs > 24 * 60 * 60)) {
          auto s = rocksdb::DestroyDB(config_->checkpoint_dir, rocksdb::Options());
          if (!s.ok()) {
            warn("[server] Fail to clean checkpoint, error: {}", s.ToString());
          } else {
            info("[server] Clean checkpoint successfully");
          }
        }
      }
    }
    // check if DB need to be resumed every minute
    // Rocksdb has auto resume feature after retryable io error, earlier version(before v6.22.1) had
    // bug when encounter no space error. The current version fixes the no space error issue, but it
    // does not completely resolve, which still exists when encountered disk quota exceeded error.
    // In order to properly handle all possible situations on rocksdb, we manually resume here
    // when encountering no space error and disk quota exceeded error.
    if (counter != 0 && counter % 600 == 0 && storage->IsDBInRetryableIOError()) {
      auto s = storage->GetDB()->Resume();
      if (s.ok()) {
        warn("[server] Successfully resumed DB after retryable IO error");
      } else {
        error("[server] Failed to resume DB after retryable IO error: {}", s.ToString());
      }
      storage->SetDBInRetryableIOError(false);
    }

    // check if we need to clean up exited worker threads every 5s
    if (counter != 0 && counter % 50 == 0) {
      cleanupExitedWorkerThreads(false);
    }

    CleanupExitedSlaves();
    recordInstantaneousMetrics();
  }
}