in src/replica/mutation_log.cpp [1406:1712]
int mutation_log::garbage_collection(const replica_log_info_map &gc_condition,
int file_count_limit,
std::set<gpid> &prevent_gc_replicas)
{
CHECK(!_is_private, "this method is only valid for shared log");
std::map<int, log_file_ptr> files;
replica_log_info_map max_decrees;
int current_log_index = -1;
int64_t total_log_size = 0;
{
zauto_lock l(_lock);
files = _log_files;
max_decrees = _shared_log_info_map;
if (_current_log_file != nullptr)
current_log_index = _current_log_file->index();
total_log_size = total_size_no_lock();
}
if (files.size() <= 1) {
// nothing to do
LOG_INFO("gc_shared: too few files to delete, file_count_limit = {}, reserved_log_count "
"= {}, reserved_log_size = {}, current_log_index = {}",
file_count_limit,
files.size(),
total_log_size,
current_log_index);
return (int)files.size();
} else {
// the last one should be the current log file
CHECK(-1 == current_log_index || files.rbegin()->first == current_log_index,
"invalid current_log_index, index = {}",
current_log_index);
}
int reserved_log_count = files.size();
int64_t reserved_log_size = total_log_size;
int reserved_smallest_log = files.begin()->first;
int reserved_largest_log = current_log_index;
// find the largest file which can be deleted.
// after iterate, the 'mark_it' will point to the largest file which can be deleted.
std::map<int, log_file_ptr>::reverse_iterator mark_it;
std::set<gpid> kickout_replicas;
gpid stop_gc_replica;
int stop_gc_log_index = 0;
decree stop_gc_decree_gap = 0;
decree stop_gc_garbage_max_decree = 0;
decree stop_gc_log_max_decree = 0;
int file_count = 0;
for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
log_file_ptr log = mark_it->second;
CHECK_EQ(mark_it->first, log->index());
file_count++;
bool delete_ok = true;
// skip current file
if (current_log_index == log->index()) {
delete_ok = false;
}
if (delete_ok) {
std::set<gpid> prevent_gc_replicas_for_this_log;
for (auto &kv : gc_condition) {
if (kickout_replicas.find(kv.first) != kickout_replicas.end()) {
// no need to consider this replica
continue;
}
gpid gpid = kv.first;
decree garbage_max_decree = kv.second.max_decree;
int64_t valid_start_offset = kv.second.valid_start_offset;
bool delete_ok_for_this_replica = false;
bool kickout_this_replica = false;
auto it3 = max_decrees.find(gpid);
// log not found for this replica, ok to delete
if (it3 == max_decrees.end()) {
// valid_start_offset may be reset to 0 if initialize_on_load() returns
// ERR_INCOMPLETE_DATA
CHECK(valid_start_offset == 0 || valid_start_offset >= log->end_offset(),
"valid start offset must be 0 or greater than the end of this log file");
LOG_DEBUG(
"gc @ {}: max_decree for {} is missing vs {} as garbage max decree, it's "
"safe to delete this and all older logs for this replica",
gpid,
log->path(),
garbage_max_decree);
delete_ok_for_this_replica = true;
kickout_this_replica = true;
}
// log is invalid for this replica, ok to delete
else if (log->end_offset() <= valid_start_offset) {
LOG_DEBUG(
"gc @ {}: log is invalid for {}, as valid start offset vs log end offset = "
"{} vs {}, it is therefore safe to delete this and all older logs for this "
"replica",
gpid,
log->path(),
valid_start_offset,
log->end_offset());
delete_ok_for_this_replica = true;
kickout_this_replica = true;
}
// all decrees are no more than garbage max decree, ok to delete
else if (it3->second.max_decree <= garbage_max_decree) {
LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it is "
"therefore safe to delete this and all older logs for this replica",
gpid,
log->path(),
it3->second.max_decree,
garbage_max_decree);
delete_ok_for_this_replica = true;
kickout_this_replica = true;
}
else // it3->second.max_decree > garbage_max_decree
{
// should not delete this file
LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it "
"is therefore not allowed to delete this and all older logs",
gpid,
log->path(),
it3->second.max_decree,
garbage_max_decree);
prevent_gc_replicas_for_this_log.insert(gpid);
decree gap = it3->second.max_decree - garbage_max_decree;
if (log->index() < stop_gc_log_index || gap > stop_gc_decree_gap) {
// record the max gap replica for the smallest log
stop_gc_replica = gpid;
stop_gc_log_index = log->index();
stop_gc_decree_gap = gap;
stop_gc_garbage_max_decree = garbage_max_decree;
stop_gc_log_max_decree = it3->second.max_decree;
}
}
if (kickout_this_replica) {
// files before this file is useless for this replica,
// so from now on, this replica will not be considered anymore
kickout_replicas.insert(gpid);
}
if (!delete_ok_for_this_replica) {
// can not delete this file, mark it, and continue to check other replicas
delete_ok = false;
}
}
// update prevent_gc_replicas
if (file_count > file_count_limit && !prevent_gc_replicas_for_this_log.empty()) {
prevent_gc_replicas.insert(prevent_gc_replicas_for_this_log.begin(),
prevent_gc_replicas_for_this_log.end());
}
}
if (delete_ok) {
// found the largest file which can be deleted
break;
}
// update max_decrees for the next log file
max_decrees = log->previous_log_max_decrees();
}
if (mark_it == files.rend()) {
// no file to delete
if (stop_gc_decree_gap > 0) {
LOG_INFO("gc_shared: no file can be deleted, file_count_limit = {}, "
"reserved_log_count = {}, reserved_log_size = {}, "
"reserved_smallest_log = {}, reserved_largest_log = {}, "
"stop_gc_log_index = {}, stop_gc_replica_count = {}, "
"stop_gc_replica = {}, stop_gc_decree_gap = {}, "
"stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = {}",
file_count_limit,
reserved_log_count,
reserved_log_size,
reserved_smallest_log,
reserved_largest_log,
stop_gc_log_index,
prevent_gc_replicas.size(),
stop_gc_replica,
stop_gc_decree_gap,
stop_gc_garbage_max_decree,
stop_gc_log_max_decree);
} else {
LOG_INFO("gc_shared: no file can be deleted, file_count_limit = {}, "
"reserved_log_count = {}, reserved_log_size = {}, "
"reserved_smallest_log = {}, reserved_largest_log = {}",
file_count_limit,
reserved_log_count,
reserved_log_size,
reserved_smallest_log,
reserved_largest_log);
}
return reserved_log_count;
}
// ok, let's delete files in increasing order of file index
// to avoid making a hole in the file list
int largest_log_to_delete = mark_it->second->index();
int to_delete_log_count = 0;
int64_t to_delete_log_size = 0;
int deleted_log_count = 0;
int64_t deleted_log_size = 0;
int deleted_smallest_log = 0;
int deleted_largest_log = 0;
for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_log_to_delete;
++it) {
log_file_ptr log = it->second;
CHECK_EQ(it->first, log->index());
to_delete_log_count++;
to_delete_log_size += log->end_offset() - log->start_offset();
// close first
log->close();
// delete file
auto &fpath = log->path();
if (!dsn::utils::filesystem::remove_path(fpath)) {
LOG_ERROR("gc_shared: fail to remove {}, stop current gc cycle ...", fpath);
break;
}
// delete succeed
LOG_INFO("gc_shared: log file {} is removed", fpath);
deleted_log_count++;
deleted_log_size += log->end_offset() - log->start_offset();
if (deleted_smallest_log == 0)
deleted_smallest_log = log->index();
deleted_largest_log = log->index();
// erase from _log_files
{
zauto_lock l(_lock);
_log_files.erase(it->first);
_global_start_offset =
_log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0;
reserved_log_count = _log_files.size();
reserved_log_size = total_size_no_lock();
if (reserved_log_count > 0) {
reserved_smallest_log = _log_files.begin()->first;
reserved_largest_log = _log_files.rbegin()->first;
} else {
reserved_smallest_log = -1;
reserved_largest_log = -1;
}
}
}
if (stop_gc_decree_gap > 0) {
LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, "
"reserved_log_count = {}, reserved_log_size = {}, "
"reserved_smallest_log = {}, reserved_largest_log = {}, "
"to_delete_log_count = {}, to_delete_log_size = {}, "
"deleted_log_count = {}, deleted_log_size = {}, "
"deleted_smallest_log = {}, deleted_largest_log = {}, "
"stop_gc_log_index = {}, stop_gc_replica_count = {}, "
"stop_gc_replica = {}, stop_gc_decree_gap = {}, "
"stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = {}",
file_count_limit,
reserved_log_count,
reserved_log_size,
reserved_smallest_log,
reserved_largest_log,
to_delete_log_count,
to_delete_log_size,
deleted_log_count,
deleted_log_size,
deleted_smallest_log,
deleted_largest_log,
stop_gc_log_index,
prevent_gc_replicas.size(),
stop_gc_replica,
stop_gc_decree_gap,
stop_gc_garbage_max_decree,
stop_gc_log_max_decree);
} else {
LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, "
"reserved_log_count = {}, reserved_log_size = {}, "
"reserved_smallest_log = {}, reserved_largest_log = {}, "
"to_delete_log_count = {}, to_delete_log_size = {}, "
"deleted_log_count = {}, deleted_log_size = {}, "
"deleted_smallest_log = {}, deleted_largest_log = {}",
file_count_limit,
reserved_log_count,
reserved_log_size,
reserved_smallest_log,
reserved_largest_log,
to_delete_log_count,
to_delete_log_size,
deleted_log_count,
deleted_log_size,
deleted_smallest_log,
deleted_largest_log);
}
return reserved_log_count;
}