in src/replica/replica_stub.cpp [1771:1971]
void replica_stub::on_gc()
{
    uint64_t start = dsn_now_ns();
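
    // Per-replica state captured for this gc pass, so the pass can run without holding the
    // replicas lock.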
    struct gc_info
    {
        replica_ptr rep;
        partition_status::type status;
        mutation_log_ptr plog;
        decree last_durable_decree;
        int64_t init_offset_in_shared_log;
    };

    std::unordered_map<gpid, gc_info> rs;
    {
        zauto_read_lock l(_replicas_lock);
        // collect info under the lock to prevent a replica from being closed by replica::close()
        for (auto &kv : _replicas) {
            const replica_ptr &rep = kv.second;
            gc_info &info = rs[kv.first];
            info.rep = rep;
            info.status = rep->status();
            info.plog = rep->private_log();
            info.last_durable_decree = rep->last_durable_decree();
            info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log;
        }
    }
LOG_INFO("start to garbage collection, replica_count = {}", rs.size());

    // gc shared prepare log
    //
    // Since checkpoints are what allow log files to be reclaimed, we must be able to trigger a
    // checkpoint (i.e. a memtable flush) when necessary.
    //
    // How do we trigger a memtable flush?
    // dsn_app_async_checkpoint() takes an `is_emergency' parameter; when it is set to true, the
    // underlying storage engine should flush its memtable as soon as possible.
    //
    // When do we trigger a memtable flush?
    // 1. Via the `[replication].checkpoint_max_interval_hours' option, which sets the max
    //    interval between two adjacent checkpoints; if that interval has elapsed, an emergency
    //    checkpoint is triggered.
    // 2. Via the `[replication].log_shared_file_count_limit' option, which sets the max file
    //    count of the shared log; if the limit is exceeded, an emergency checkpoint is
    //    triggered. Instead of asking all replicas to checkpoint, we only ask the few replicas
    //    that block garbage collection of the oldest log file.
    //
    if (_log != nullptr) {
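        // Build the gc condition for the shared log: for each replica, the max decree whose
        // mutations no longer need to be kept in the shared log, plus the offset from which
        // its shared-log entries are valid.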
        replica_log_info_map gc_condition;
        for (auto &kv : rs) {
            replica_log_info ri;
            replica_ptr &rep = kv.second.rep;
            mutation_log_ptr &plog = kv.second.plog;
            if (plog) {
                // flush private log to update plog_max_commit_on_disk,
                // and just flush once to avoid flushing infinitely
                plog->flush_once();
                decree plog_max_commit_on_disk = plog->max_commit_on_disk();
                ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk);
LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
"last_durable_decree= {}, plog_max_commit_on_disk = {}",
rep->name(),
enum_to_string(kv.second.status),
ri.max_decree,
kv.second.last_durable_decree,
plog_max_commit_on_disk);
            } else {
                ri.max_decree = kv.second.last_durable_decree;
                LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
                         "last_durable_decree = {}",
                         rep->name(),
                         enum_to_string(kv.second.status),
                         ri.max_decree,
                         kv.second.last_durable_decree);
            }
            ri.valid_start_offset = kv.second.init_offset_in_shared_log;
            gc_condition[kv.first] = ri;
        }
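
        // garbage_collection() reclaims shared-log files that no replica still needs; it
        // returns how many files remain afterwards and fills `prevent_gc_replicas' with the
        // replicas that keep the oldest files from being reclaimed.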
        std::set<gpid> prevent_gc_replicas;
        int reserved_log_count = _log->garbage_collection(
            gc_condition, FLAGS_log_shared_file_count_limit, prevent_gc_replicas);
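
        // Two-tier emergency checkpoint policy: if far more files remain than the configured
        // limit (over 2x), ask every replica to checkpoint; if the limit is only mildly
        // exceeded, ask just the replicas that block gc.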
        if (reserved_log_count > FLAGS_log_shared_file_count_limit * 2) {
            LOG_INFO(
                "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, "
                "file_count_limit = {}, reserved_log_count = {}, trigger all replicas to do "
                "checkpoint",
                FLAGS_log_shared_file_count_limit,
                reserved_log_count);
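            // spread the checkpoint tasks over a random delay within half a gc interval so
            // that not all replicas flush at the same time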
            for (auto &kv : rs) {
                tasking::enqueue(
                    LPC_PER_REPLICA_CHECKPOINT_TIMER,
                    kv.second.rep->tracker(),
                    std::bind(&replica_stub::trigger_checkpoint, this, kv.second.rep, true),
                    kv.first.thread_hash(),
                    std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
            }
        } else if (reserved_log_count > FLAGS_log_shared_file_count_limit) {
            std::ostringstream oss;
            int c = 0;
            for (auto &i : prevent_gc_replicas) {
                if (c != 0)
                    oss << ", ";
                oss << i.to_string();
                c++;
            }
            LOG_INFO(
                "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, "
                "file_count_limit = {}, reserved_log_count = {}, prevent_gc_replica_count = "
                "{}, trigger them to do checkpoint: {}",
                FLAGS_log_shared_file_count_limit,
                reserved_log_count,
                prevent_gc_replicas.size(),
                oss.str());
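            // only the replicas that currently block gc of the oldest shared-log files are
            // asked to checkpoint; ids that no longer map to an open replica are skipped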
            for (auto &id : prevent_gc_replicas) {
                auto find = rs.find(id);
                if (find != rs.end()) {
                    tasking::enqueue(
                        LPC_PER_REPLICA_CHECKPOINT_TIMER,
                        find->second.rep->tracker(),
                        std::bind(
                            &replica_stub::trigger_checkpoint, this, find->second.rep, true),
                        id.thread_hash(),
                        std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
                }
            }
        }

        _counter_shared_log_size->set(_log->total_size() / (1024 * 1024));
    }

    // collect statistics for learning, cold backup, bulk load and partition split
    uint64_t learning_count = 0;
    uint64_t learning_max_duration_time_ms = 0;
    uint64_t learning_max_copy_file_size = 0;
    uint64_t cold_backup_running_count = 0;
    uint64_t cold_backup_max_duration_time_ms = 0;
    uint64_t cold_backup_max_upload_file_size = 0;
    uint64_t bulk_load_running_count = 0;
    uint64_t bulk_load_max_ingestion_time_ms = 0;
    uint64_t bulk_load_max_duration_time_ms = 0;
    uint64_t splitting_count = 0;
    uint64_t splitting_max_duration_time_ms = 0;
    uint64_t splitting_max_async_learn_time_ms = 0;
    uint64_t splitting_max_copy_file_size = 0;
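
    // aggregate per-replica values: counts are summed and maxima taken across all replicas,
    // then published to the perf counters below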
    for (auto &kv : rs) {
        replica_ptr &rep = kv.second.rep;
        if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
            learning_count++;
            learning_max_duration_time_ms = std::max(
                learning_max_duration_time_ms, rep->_potential_secondary_states.duration_ms());
            learning_max_copy_file_size =
                std::max(learning_max_copy_file_size,
                         rep->_potential_secondary_states.learning_copy_file_size);
        }
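        // cold backup and bulk load statistics are only collected from primaries and secondaries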
        if (rep->status() == partition_status::PS_PRIMARY ||
            rep->status() == partition_status::PS_SECONDARY) {
            cold_backup_running_count += rep->_cold_backup_running_count.load();
            cold_backup_max_duration_time_ms = std::max(
                cold_backup_max_duration_time_ms, rep->_cold_backup_max_duration_time_ms.load());
            cold_backup_max_upload_file_size = std::max(
                cold_backup_max_upload_file_size, rep->_cold_backup_max_upload_file_size.load());
            if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) {
                bulk_load_running_count++;
                bulk_load_max_ingestion_time_ms =
                    std::max(bulk_load_max_ingestion_time_ms, rep->ingestion_duration_ms());
                bulk_load_max_duration_time_ms =
                    std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms());
            }
        }
        // partition split statistics
        if (rep->status() == partition_status::PS_PARTITION_SPLIT) {
            splitting_count++;
            splitting_max_duration_time_ms =
                std::max(splitting_max_duration_time_ms, rep->_split_states.total_ms());
            splitting_max_async_learn_time_ms =
                std::max(splitting_max_async_learn_time_ms, rep->_split_states.async_learn_ms());
            splitting_max_copy_file_size =
                std::max(splitting_max_copy_file_size,
                         rep->_split_states.splitting_copy_file_size);
        }
    }

    _counter_replicas_learning_count->set(learning_count);
    _counter_replicas_learning_max_duration_time_ms->set(learning_max_duration_time_ms);
    _counter_replicas_learning_max_copy_file_size->set(learning_max_copy_file_size);
    _counter_cold_backup_running_count->set(cold_backup_running_count);
    _counter_cold_backup_max_duration_time_ms->set(cold_backup_max_duration_time_ms);
    _counter_cold_backup_max_upload_file_size->set(cold_backup_max_upload_file_size);
    _counter_bulk_load_running_count->set(bulk_load_running_count);
    _counter_bulk_load_max_ingestion_time_ms->set(bulk_load_max_ingestion_time_ms);
    _counter_bulk_load_max_duration_time_ms->set(bulk_load_max_duration_time_ms);
    _counter_replicas_splitting_count->set(splitting_count);
    _counter_replicas_splitting_max_duration_time_ms->set(splitting_max_duration_time_ms);
    _counter_replicas_splitting_max_async_learn_time_ms->set(splitting_max_async_learn_time_ms);
    _counter_replicas_splitting_max_copy_file_size->set(splitting_max_copy_file_size);
LOG_INFO("finish to garbage collection, time_used_ns = {}", dsn_now_ns() - start);
}