void DBImpl::PurgeObsoleteFiles()

in db/db_impl/db_impl_files.cc [375:652]


void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
  // we'd better have sth to delete
  assert(state.HaveSomethingToDelete());

  // FindObsoleteFiles() should've populated this so nonzero
  assert(state.manifest_file_number != 0);

  // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
  std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
                                            state.sst_live.end());
  std::unordered_set<uint64_t> blob_live_set(state.blob_live.begin(),
                                             state.blob_live.end());
  std::unordered_set<uint64_t> log_recycle_files_set(
      state.log_recycle_files.begin(), state.log_recycle_files.end());

  auto candidate_files = state.full_scan_candidate_files;
  candidate_files.reserve(
      candidate_files.size() + state.sst_delete_files.size() +
      state.blob_delete_files.size() + state.log_delete_files.size() +
      state.manifest_delete_files.size());
  // We may ignore the dbname when generating the file names.
  for (auto& file : state.sst_delete_files) {
    candidate_files.emplace_back(
        MakeTableFileName(file.metadata->fd.GetNumber()), file.path);
    if (file.metadata->table_reader_handle) {
      table_cache_->Release(file.metadata->table_reader_handle);
    }
    file.DeleteMetadata();
  }

  for (const auto& blob_file : state.blob_delete_files) {
    candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()),
                                 blob_file.GetPath());
  }

  auto wal_dir = immutable_db_options_.GetWalDir();
  for (auto file_num : state.log_delete_files) {
    if (file_num > 0) {
      candidate_files.emplace_back(LogFileName(file_num), wal_dir);
    }
  }
  for (const auto& filename : state.manifest_delete_files) {
    candidate_files.emplace_back(filename, dbname_);
  }

  // dedup state.candidate_files so we don't try to delete the same
  // file twice
  std::sort(candidate_files.begin(), candidate_files.end(),
            CompareCandidateFile);
  candidate_files.erase(
      std::unique(candidate_files.begin(), candidate_files.end()),
      candidate_files.end());

  if (state.prev_total_log_size > 0) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "[JOB %d] Try to delete WAL files size %" PRIu64
                   ", prev total WAL file size %" PRIu64
                   ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
                   state.job_id, state.size_log_to_delete,
                   state.prev_total_log_size, state.num_alive_log_files);
  }

  std::vector<std::string> old_info_log_files;
  InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
                                dbname_);

  // File numbers of most recent two OPTIONS file in candidate_files (found in
  // previos FindObsoleteFiles(full_scan=true))
  // At this point, there must not be any duplicate file numbers in
  // candidate_files.
  uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
  uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
  for (const auto& candidate_file : candidate_files) {
    const std::string& fname = candidate_file.file_name;
    uint64_t number;
    FileType type;
    if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
        type != kOptionsFile) {
      continue;
    }
    if (number > optsfile_num1) {
      optsfile_num2 = optsfile_num1;
      optsfile_num1 = number;
    } else if (number > optsfile_num2) {
      optsfile_num2 = number;
    }
  }

  // Close WALs before trying to delete them.
  for (const auto w : state.logs_to_free) {
    // TODO: maybe check the return value of Close.
    auto s = w->Close();
    s.PermitUncheckedError();
  }

  bool own_files = OwnTablesAndLogs();
  std::unordered_set<uint64_t> files_to_del;
  for (const auto& candidate_file : candidate_files) {
    const std::string& to_delete = candidate_file.file_name;
    uint64_t number;
    FileType type;
    // Ignore file if we cannot recognize it.
    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
      continue;
    }

    bool keep = true;
    switch (type) {
      case kWalFile:
        keep = ((number >= state.log_number) ||
                (number == state.prev_log_number) ||
                (log_recycle_files_set.find(number) !=
                 log_recycle_files_set.end()));
        break;
      case kDescriptorFile:
        // Keep my manifest file, and any newer incarnations'
        // (can happen during manifest roll)
        keep = (number >= state.manifest_file_number);
        break;
      case kTableFile:
        // If the second condition is not there, this makes
        // DontDeletePendingOutputs fail
        keep = (sst_live_set.find(number) != sst_live_set.end()) ||
               number >= state.min_pending_output;
        if (!keep) {
          files_to_del.insert(number);
        }
        break;
      case kBlobFile:
        keep = number >= state.min_pending_output ||
               (blob_live_set.find(number) != blob_live_set.end());
        if (!keep) {
          files_to_del.insert(number);
        }
        break;
      case kTempFile:
        // Any temp files that are currently being written to must
        // be recorded in pending_outputs_, which is inserted into "live".
        // Also, SetCurrentFile creates a temp file when writing out new
        // manifest, which is equal to state.pending_manifest_file_number. We
        // should not delete that file
        //
        // TODO(yhchiang): carefully modify the third condition to safely
        //                 remove the temp options files.
        keep = (sst_live_set.find(number) != sst_live_set.end()) ||
               (blob_live_set.find(number) != blob_live_set.end()) ||
               (number == state.pending_manifest_file_number) ||
               (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
        break;
      case kInfoLogFile:
        keep = true;
        if (number != 0) {
          old_info_log_files.push_back(to_delete);
        }
        break;
      case kOptionsFile:
        keep = (number >= optsfile_num2);
        break;
      case kCurrentFile:
      case kDBLockFile:
      case kIdentityFile:
      case kMetaDatabase:
        keep = true;
        break;
    }

    if (keep) {
      continue;
    }

    std::string fname;
    std::string dir_to_sync;
    if (type == kTableFile) {
      // evict from cache
      TableCache::Evict(table_cache_.get(), number);
      fname = MakeTableFileName(candidate_file.file_path, number);
      dir_to_sync = candidate_file.file_path;
    } else if (type == kBlobFile) {
      fname = BlobFileName(candidate_file.file_path, number);
      dir_to_sync = candidate_file.file_path;
    } else {
      dir_to_sync = (type == kWalFile) ? wal_dir : dbname_;
      fname = dir_to_sync +
              ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
                       (!to_delete.empty() && to_delete.front() == '/')
                   ? ""
                   : "/") +
              to_delete;
    }

#ifndef ROCKSDB_LITE
    if (type == kWalFile && (immutable_db_options_.WAL_ttl_seconds > 0 ||
                             immutable_db_options_.WAL_size_limit_MB > 0)) {
      wal_manager_.ArchiveWALFile(fname, number);
      continue;
    }
#endif  // !ROCKSDB_LITE

    // If I do not own these files, e.g. secondary instance with max_open_files
    // = -1, then no need to delete or schedule delete these files since they
    // will be removed by their owner, e.g. the primary instance.
    if (!own_files) {
      continue;
    }
    if (schedule_only) {
      InstrumentedMutexLock guard_lock(&mutex_);
      SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
    } else {
      DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
    }
  }

  {
    // After purging obsolete files, remove them from files_grabbed_for_purge_.
    InstrumentedMutexLock guard_lock(&mutex_);
    autovector<uint64_t> to_be_removed;
    for (auto fn : files_grabbed_for_purge_) {
      if (files_to_del.count(fn) != 0) {
        to_be_removed.emplace_back(fn);
      }
    }
    for (auto fn : to_be_removed) {
      files_grabbed_for_purge_.erase(fn);
    }
  }

  // Delete old info log files.
  size_t old_info_log_file_count = old_info_log_files.size();
  if (old_info_log_file_count != 0 &&
      old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
    std::sort(old_info_log_files.begin(), old_info_log_files.end());
    size_t end =
        old_info_log_file_count - immutable_db_options_.keep_log_file_num;
    for (unsigned int i = 0; i <= end; i++) {
      std::string& to_delete = old_info_log_files.at(i);
      std::string full_path_to_delete =
          (immutable_db_options_.db_log_dir.empty()
               ? dbname_
               : immutable_db_options_.db_log_dir) +
          "/" + to_delete;
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[JOB %d] Delete info log file %s\n", state.job_id,
                     full_path_to_delete.c_str());
      Status s = env_->DeleteFile(full_path_to_delete);
      if (!s.ok()) {
        if (env_->FileExists(full_path_to_delete).IsNotFound()) {
          ROCKS_LOG_INFO(
              immutable_db_options_.info_log,
              "[JOB %d] Tried to delete non-existing info log file %s FAILED "
              "-- %s\n",
              state.job_id, to_delete.c_str(), s.ToString().c_str());
        } else {
          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
                          "[JOB %d] Delete info log file %s FAILED -- %s\n",
                          state.job_id, to_delete.c_str(),
                          s.ToString().c_str());
        }
      }
    }
  }
#ifndef ROCKSDB_LITE
  wal_manager_.PurgeObsoleteWALFiles();
#endif  // ROCKSDB_LITE
  LogFlush(immutable_db_options_.info_log);
  InstrumentedMutexLock l(&mutex_);
  --pending_purge_obsolete_files_;
  assert(pending_purge_obsolete_files_ >= 0);
  if (schedule_only) {
    // Must change from pending_purge_obsolete_files_ to bg_purge_scheduled_
    // while holding mutex (for GetSortedWalFiles() etc.)
    SchedulePurge();
  }
  if (pending_purge_obsolete_files_ == 0) {
    bg_cv_.SignalAll();
  }
  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
}