// replica_stub::on_gc()
// Extracted from src/replica/replica_stub.cpp, lines [1771:1971].

// Periodic garbage-collection task for the whole stub. It does three things in order:
//   1. snapshots per-replica info under the read lock,
//   2. garbage-collects the shared prepare log, triggering emergency checkpoints
//      when the reserved file count exceeds the configured limit,
//   3. aggregates learning / cold-backup / bulk-load / split statistics into
//      perf counters.
// Runs every FLAGS_gc_interval_ms (presumably — the timer wiring is outside this
// view; confirm against the caller).
void replica_stub::on_gc()
{
    uint64_t start = dsn_now_ns();

    // Per-replica snapshot taken while holding _replicas_lock, so the rest of
    // the function can work without the lock.
    struct gc_info
    {
        replica_ptr rep;                   // keeps the replica alive for the duration of this GC pass
        partition_status::type status;     // status at snapshot time (may change afterwards)
        mutation_log_ptr plog;             // private log; may be null
        decree last_durable_decree;        // last decree made durable by checkpoint
        int64_t init_offset_in_shared_log; // replica's valid start offset within the shared log
    };

    std::unordered_map<gpid, gc_info> rs;
    {
        zauto_read_lock l(_replicas_lock);
        // collect info in lock to prevent the case that the replica is closed in replica::close()
        for (auto &kv : _replicas) {
            const replica_ptr &rep = kv.second;
            gc_info &info = rs[kv.first];
            info.rep = rep;
            info.status = rep->status();
            info.plog = rep->private_log();
            info.last_durable_decree = rep->last_durable_decree();
            info.init_offset_in_shared_log = rep->get_app()->init_info().init_offset_in_shared_log;
        }
    }

    LOG_INFO("start to garbage collection, replica_count = {}", rs.size());

    // gc shared prepare log
    //
    // Now that checkpoint is very important for gc, we must be able to trigger checkpoint when
    // necessary.
    // that is, we should be able to trigger memtable flush when necessary.
    //
    // How to trigger memtable flush?
    //   we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true,
    //   the undering storage system should flush memtable as soon as possiable.
    //
    // When to trigger memtable flush?
    //   1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time
    //   of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint
    //   will be triggered.
    //   2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of
    //   shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead
    //   of triggering all replicas to do checkpoint, we will only trigger a few of necessary
    //   replicas which block garbage collection of the oldest log file.
    //
    if (_log != nullptr) {
        // Build, per replica, the decree up to which the shared log may be collected.
        replica_log_info_map gc_condition;
        for (auto &kv : rs) {
            replica_log_info ri;
            replica_ptr &rep = kv.second.rep;
            mutation_log_ptr &plog = kv.second.plog;
            if (plog) {
                // flush private log to update plog_max_commit_on_disk,
                // and just flush once to avoid flushing infinitely
                plog->flush_once();

                // A mutation is garbage only once it is BOTH durable in the app
                // (checkpointed) and persisted in the private log, hence the min().
                decree plog_max_commit_on_disk = plog->max_commit_on_disk();
                ri.max_decree = std::min(kv.second.last_durable_decree, plog_max_commit_on_disk);
                LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
                         "last_durable_decree= {}, plog_max_commit_on_disk = {}",
                         rep->name(),
                         enum_to_string(kv.second.status),
                         ri.max_decree,
                         kv.second.last_durable_decree,
                         plog_max_commit_on_disk);
            } else {
                // No private log: only the checkpoint bounds what is garbage.
                ri.max_decree = kv.second.last_durable_decree;
                LOG_INFO("gc_shared: gc condition for {}, status = {}, garbage_max_decree = {}, "
                         "last_durable_decree = {}",
                         rep->name(),
                         enum_to_string(kv.second.status),
                         ri.max_decree,
                         kv.second.last_durable_decree);
            }
            ri.valid_start_offset = kv.second.init_offset_in_shared_log;
            gc_condition[kv.first] = ri;
        }

        // Collect the shared log; prevent_gc_replicas receives the replicas that
        // block collection of the oldest remaining file(s).
        std::set<gpid> prevent_gc_replicas;
        int reserved_log_count = _log->garbage_collection(
            gc_condition, FLAGS_log_shared_file_count_limit, prevent_gc_replicas);
        if (reserved_log_count > FLAGS_log_shared_file_count_limit * 2) {
            // Far beyond the limit (> 2x): escalate by asking EVERY replica to
            // checkpoint, not just the blocking ones.
            LOG_INFO(
                "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, "
                "file_count_limit = {}, reserved_log_count = {}, trigger all replicas to do "
                "checkpoint",
                FLAGS_log_shared_file_count_limit,
                reserved_log_count);
            for (auto &kv : rs) {
                // Random delay within half a GC interval spreads the checkpoint
                // load instead of flushing all replicas at once.
                tasking::enqueue(
                    LPC_PER_REPLICA_CHECKPOINT_TIMER,
                    kv.second.rep->tracker(),
                    std::bind(&replica_stub::trigger_checkpoint, this, kv.second.rep, true),
                    kv.first.thread_hash(),
                    std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
            }
        } else if (reserved_log_count > FLAGS_log_shared_file_count_limit) {
            // Moderately over the limit: checkpoint only the replicas that block
            // GC of the oldest log file. Join their ids for the log message.
            std::ostringstream oss;
            int c = 0;
            for (auto &i : prevent_gc_replicas) {
                if (c != 0)
                    oss << ", ";
                oss << i.to_string();
                c++;
            }
            LOG_INFO(
                "gc_shared: trigger emergency checkpoint by FLAGS_log_shared_file_count_limit, "
                "file_count_limit = {}, reserved_log_count = {}, prevent_gc_replica_count = "
                "{}, trigger them to do checkpoint: {}",
                FLAGS_log_shared_file_count_limit,
                reserved_log_count,
                prevent_gc_replicas.size(),
                oss.str());
            for (auto &id : prevent_gc_replicas) {
                // The replica may have been closed since the snapshot; skip it then.
                auto find = rs.find(id);
                if (find != rs.end()) {
                    tasking::enqueue(
                        LPC_PER_REPLICA_CHECKPOINT_TIMER,
                        find->second.rep->tracker(),
                        std::bind(&replica_stub::trigger_checkpoint, this, find->second.rep, true),
                        id.thread_hash(),
                        std::chrono::milliseconds(rand::next_u32(0, FLAGS_gc_interval_ms / 2)));
                }
            }
        }

        // Report shared log size in MB.
        _counter_shared_log_size->set(_log->total_size() / (1024 * 1024));
    }

    // statistic learning info
    uint64_t learning_count = 0;
    uint64_t learning_max_duration_time_ms = 0;
    uint64_t learning_max_copy_file_size = 0;
    uint64_t cold_backup_running_count = 0;
    uint64_t cold_backup_max_duration_time_ms = 0;
    uint64_t cold_backup_max_upload_file_size = 0;
    uint64_t bulk_load_running_count = 0;
    uint64_t bulk_load_max_ingestion_time_ms = 0;
    uint64_t bulk_load_max_duration_time_ms = 0;
    uint64_t splitting_count = 0;
    uint64_t splitting_max_duration_time_ms = 0;
    uint64_t splitting_max_async_learn_time_ms = 0;
    uint64_t splitting_max_copy_file_size = 0;
    for (auto &kv : rs) {
        replica_ptr &rep = kv.second.rep;
        // NOTE(review): this loop re-reads rep->status() live rather than using the
        // kv.second.status snapshot taken under the lock above — the two can differ
        // if a status change raced in between; presumably acceptable for statistics.
        if (rep->status() == partition_status::PS_POTENTIAL_SECONDARY) {
            learning_count++;
            learning_max_duration_time_ms = std::max(
                learning_max_duration_time_ms, rep->_potential_secondary_states.duration_ms());
            learning_max_copy_file_size =
                std::max(learning_max_copy_file_size,
                         rep->_potential_secondary_states.learning_copy_file_size);
        }
        if (rep->status() == partition_status::PS_PRIMARY ||
            rep->status() == partition_status::PS_SECONDARY) {
            cold_backup_running_count += rep->_cold_backup_running_count.load();
            cold_backup_max_duration_time_ms = std::max(
                cold_backup_max_duration_time_ms, rep->_cold_backup_max_duration_time_ms.load());
            cold_backup_max_upload_file_size = std::max(
                cold_backup_max_upload_file_size, rep->_cold_backup_max_upload_file_size.load());

            if (rep->get_bulk_loader()->get_bulk_load_status() != bulk_load_status::BLS_INVALID) {
                bulk_load_running_count++;
                bulk_load_max_ingestion_time_ms =
                    std::max(bulk_load_max_ingestion_time_ms, rep->ingestion_duration_ms());
                bulk_load_max_duration_time_ms =
                    std::max(bulk_load_max_duration_time_ms, rep->get_bulk_loader()->duration_ms());
            }
        }
        // splitting_max_copy_file_size, rep->_split_states.copy_file_size
        if (rep->status() == partition_status::PS_PARTITION_SPLIT) {
            splitting_count++;
            splitting_max_duration_time_ms =
                std::max(splitting_max_duration_time_ms, rep->_split_states.total_ms());
            splitting_max_async_learn_time_ms =
                std::max(splitting_max_async_learn_time_ms, rep->_split_states.async_learn_ms());
            splitting_max_copy_file_size =
                std::max(splitting_max_copy_file_size, rep->_split_states.splitting_copy_file_size);
        }
    }

    // Publish the aggregated statistics to the perf counters.
    _counter_replicas_learning_count->set(learning_count);
    _counter_replicas_learning_max_duration_time_ms->set(learning_max_duration_time_ms);
    _counter_replicas_learning_max_copy_file_size->set(learning_max_copy_file_size);
    _counter_cold_backup_running_count->set(cold_backup_running_count);
    _counter_cold_backup_max_duration_time_ms->set(cold_backup_max_duration_time_ms);
    _counter_cold_backup_max_upload_file_size->set(cold_backup_max_upload_file_size);
    _counter_bulk_load_running_count->set(bulk_load_running_count);
    _counter_bulk_load_max_ingestion_time_ms->set(bulk_load_max_ingestion_time_ms);
    _counter_bulk_load_max_duration_time_ms->set(bulk_load_max_duration_time_ms);
    _counter_replicas_splitting_count->set(splitting_count);
    _counter_replicas_splitting_max_duration_time_ms->set(splitting_max_duration_time_ms);
    _counter_replicas_splitting_max_async_learn_time_ms->set(splitting_max_async_learn_time_ms);
    _counter_replicas_splitting_max_copy_file_size->set(splitting_max_copy_file_size);

    LOG_INFO("finish to garbage collection, time_used_ns = {}", dsn_now_ns() - start);
}