int mutation_log::garbage_collection()

in src/replica/mutation_log.cpp [1406:1712]


// Garbage-collects obsolete files of the shared mutation log.
//
// Walks the shared log files from the newest (largest index) to the oldest and stops at
// the first file that may be deleted. That file and every older file are then closed,
// removed from disk and erased from `_log_files`, in increasing index order.
//
// A file may be deleted only if it is not the current log file and, for every replica in
// `gc_condition` that has not already been "kicked out", one of these holds:
//   - the replica has no entry in the max-decree map used for this file;
//   - the file's end offset is <= the replica's `valid_start_offset`;
//   - the replica's max decree in the map is <= its garbage max decree
//     (`gc_condition[replica].max_decree`).
// Once a replica satisfies one of these for some file, it is ignored for all older files.
//
// Parameters:
//   gc_condition         per-replica GC input: `max_decree` (garbage max decree) and
//                        `valid_start_offset` (compared against each file's end offset).
//   file_count_limit     once more than this many files (counted from the newest,
//                        including the current one) have been examined, replicas that
//                        block deletion of the examined file are added to
//                        `prevent_gc_replicas`.
//   prevent_gc_replicas  output; entries are only inserted, never cleared.
//
// Returns the number of files reserved after this call. This is the size of `_log_files`
// observed under `_lock` after the last successful removal, or the size of the snapshot
// taken at entry if nothing was removed.
//
// Only valid for the shared (non-private) log; enforced by CHECK.
int mutation_log::garbage_collection(const replica_log_info_map &gc_condition,
                                     int file_count_limit,
                                     std::set<gpid> &prevent_gc_replicas)
{
    CHECK(!_is_private, "this method is only valid for shared log");

    std::map<int, log_file_ptr> files;
    replica_log_info_map max_decrees;
    int current_log_index = -1;
    int64_t total_log_size = 0;

    // Snapshot the file list, per-replica max decrees, current file index and total size
    // under `_lock`. The scan below works on these copies without holding the lock.
    {
        zauto_lock l(_lock);
        files = _log_files;
        max_decrees = _shared_log_info_map;
        if (_current_log_file != nullptr)
            current_log_index = _current_log_file->index();
        total_log_size = total_size_no_lock();
    }

    if (files.size() <= 1) {
        // nothing to do
        LOG_INFO("gc_shared: too few files to delete, file_count_limit = {}, reserved_log_count "
                 "= {}, reserved_log_size = {}, current_log_index = {}",
                 file_count_limit,
                 files.size(),
                 total_log_size,
                 current_log_index);
        return (int)files.size();
    } else {
        // the last one should be the current log file
        CHECK(-1 == current_log_index || files.rbegin()->first == current_log_index,
              "invalid current_log_index, index = {}",
              current_log_index);
    }

    // Statistics reported in the log lines below. Until something is deleted they
    // describe the snapshot.
    int reserved_log_count = files.size();
    int64_t reserved_log_size = total_log_size;
    int reserved_smallest_log = files.begin()->first;
    int reserved_largest_log = current_log_index;

    // find the largest file which can be deleted.
    // after iterate, the 'mark_it' will point to the largest file which can be deleted.
    // (If no file qualifies, the loop runs to completion and 'mark_it' == files.rend().)
    std::map<int, log_file_ptr>::reverse_iterator mark_it;
    // Replicas already known not to need the file being examined nor any older file.
    std::set<gpid> kickout_replicas;
    // Diagnostics only, used in logging. They record, for the oldest (smallest-index)
    // file where some replica blocked deletion, the blocking replica with the largest
    // decree gap.
    gpid stop_gc_replica;
    int stop_gc_log_index = 0;
    decree stop_gc_decree_gap = 0;
    decree stop_gc_garbage_max_decree = 0;
    decree stop_gc_log_max_decree = 0;
    // Number of files examined so far, counted from the newest (current) file.
    int file_count = 0;
    for (mark_it = files.rbegin(); mark_it != files.rend(); ++mark_it) {
        log_file_ptr log = mark_it->second;
        CHECK_EQ(mark_it->first, log->index());
        file_count++;

        bool delete_ok = true;

        // skip current file
        if (current_log_index == log->index()) {
            delete_ok = false;
        }

        if (delete_ok) {
            std::set<gpid> prevent_gc_replicas_for_this_log;

            for (auto &kv : gc_condition) {
                if (kickout_replicas.find(kv.first) != kickout_replicas.end()) {
                    // no need to consider this replica
                    continue;
                }

                // NOTE: this local shadows the type name `gpid` for the rest of the loop body.
                gpid gpid = kv.first;
                decree garbage_max_decree = kv.second.max_decree;
                int64_t valid_start_offset = kv.second.valid_start_offset;

                bool delete_ok_for_this_replica = false;
                bool kickout_this_replica = false;
                auto it3 = max_decrees.find(gpid);

                // log not found for this replica, ok to delete
                if (it3 == max_decrees.end()) {
                    // valid_start_offset may be reset to 0 if initialize_on_load() returns
                    // ERR_INCOMPLETE_DATA
                    CHECK(valid_start_offset == 0 || valid_start_offset >= log->end_offset(),
                          "valid start offset must be 0 or greater than the end of this log file");

                    LOG_DEBUG(
                        "gc @ {}: max_decree for {} is missing vs {} as garbage max decree, it's "
                        "safe to delete this and all older logs for this replica",
                        gpid,
                        log->path(),
                        garbage_max_decree);
                    delete_ok_for_this_replica = true;
                    kickout_this_replica = true;
                }

                // log is invalid for this replica, ok to delete
                else if (log->end_offset() <= valid_start_offset) {
                    LOG_DEBUG(
                        "gc @ {}: log is invalid for {}, as valid start offset vs log end offset = "
                        "{} vs {}, it is therefore safe to delete this and all older logs for this "
                        "replica",
                        gpid,
                        log->path(),
                        valid_start_offset,
                        log->end_offset());
                    delete_ok_for_this_replica = true;
                    kickout_this_replica = true;
                }

                // all decrees are no more than garbage max decree, ok to delete
                else if (it3->second.max_decree <= garbage_max_decree) {
                    LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it is "
                              "therefore safe to delete this and all older logs for this replica",
                              gpid,
                              log->path(),
                              it3->second.max_decree,
                              garbage_max_decree);
                    delete_ok_for_this_replica = true;
                    kickout_this_replica = true;
                }

                else // it3->second.max_decree > garbage_max_decree
                {
                    // should not delete this file
                    LOG_DEBUG("gc @ {}: max_decree for {} is {} vs {} as garbage max decree, it "
                              "is therefore not allowed to delete this and all older logs",
                              gpid,
                              log->path(),
                              it3->second.max_decree,
                              garbage_max_decree);
                    prevent_gc_replicas_for_this_log.insert(gpid);
                    decree gap = it3->second.max_decree - garbage_max_decree;
                    // Files are visited in decreasing index order, so `log->index() <
                    // stop_gc_log_index` means an older file is now being examined. On the
                    // first hit (stop_gc_log_index == 0), gap > 0 always holds in this branch.
                    if (log->index() < stop_gc_log_index || gap > stop_gc_decree_gap) {
                        // record the max gap replica for the smallest log
                        stop_gc_replica = gpid;
                        stop_gc_log_index = log->index();
                        stop_gc_decree_gap = gap;
                        stop_gc_garbage_max_decree = garbage_max_decree;
                        stop_gc_log_max_decree = it3->second.max_decree;
                    }
                }

                if (kickout_this_replica) {
                    // files before this file is useless for this replica,
                    // so from now on, this replica will not be considered anymore
                    kickout_replicas.insert(gpid);
                }

                if (!delete_ok_for_this_replica) {
                    // can not delete this file, mark it, and continue to check other replicas
                    delete_ok = false;
                }
            }

            // update prevent_gc_replicas
            // (only once more than `file_count_limit` files have been examined)
            if (file_count > file_count_limit && !prevent_gc_replicas_for_this_log.empty()) {
                prevent_gc_replicas.insert(prevent_gc_replicas_for_this_log.begin(),
                                           prevent_gc_replicas_for_this_log.end());
            }
        }

        if (delete_ok) {
            // found the largest file which can be deleted
            break;
        }

        // update max_decrees for the next log file
        // NOTE(review): presumably the per-replica max decrees of all files older than
        // `log`, i.e. the correct map for the next (older) file examined. Confirm against
        // log_file::previous_log_max_decrees().
        max_decrees = log->previous_log_max_decrees();
    }

    if (mark_it == files.rend()) {
        // no file to delete
        if (stop_gc_decree_gap > 0) {
            LOG_INFO("gc_shared: no file can be deleted, file_count_limit = {}, "
                     "reserved_log_count = {}, reserved_log_size = {}, "
                     "reserved_smallest_log = {}, reserved_largest_log = {}, "
                     "stop_gc_log_index = {}, stop_gc_replica_count = {}, "
                     "stop_gc_replica = {}, stop_gc_decree_gap = {}, "
                     "stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = {}",
                     file_count_limit,
                     reserved_log_count,
                     reserved_log_size,
                     reserved_smallest_log,
                     reserved_largest_log,
                     stop_gc_log_index,
                     prevent_gc_replicas.size(),
                     stop_gc_replica,
                     stop_gc_decree_gap,
                     stop_gc_garbage_max_decree,
                     stop_gc_log_max_decree);
        } else {
            LOG_INFO("gc_shared: no file can be deleted, file_count_limit = {}, "
                     "reserved_log_count = {}, reserved_log_size = {}, "
                     "reserved_smallest_log = {}, reserved_largest_log = {}",
                     file_count_limit,
                     reserved_log_count,
                     reserved_log_size,
                     reserved_smallest_log,
                     reserved_largest_log);
        }

        return reserved_log_count;
    }

    // ok, let's delete files in increasing order of file index
    // to avoid making a hole in the file list
    int largest_log_to_delete = mark_it->second->index();
    // Counts files attempted so far, including one whose removal failed. It is not the
    // size of the full planned range.
    int to_delete_log_count = 0;
    int64_t to_delete_log_size = 0;
    int deleted_log_count = 0;
    int64_t deleted_log_size = 0;
    // 0 means "not set yet". NOTE(review): assumes log file indices start above 0.
    int deleted_smallest_log = 0;
    int deleted_largest_log = 0;
    for (auto it = files.begin(); it != files.end() && it->second->index() <= largest_log_to_delete;
         ++it) {
        log_file_ptr log = it->second;
        CHECK_EQ(it->first, log->index());
        to_delete_log_count++;
        to_delete_log_size += log->end_offset() - log->start_offset();

        // close first
        log->close();

        // delete file
        // On failure the cycle stops. This file has already been closed but stays in
        // `_log_files`. NOTE(review): confirm log_file::close() tolerates being called
        // again when the next GC cycle retries this file.
        auto &fpath = log->path();
        if (!dsn::utils::filesystem::remove_path(fpath)) {
            LOG_ERROR("gc_shared: fail to remove {}, stop current gc cycle ...", fpath);
            break;
        }

        // delete succeed
        LOG_INFO("gc_shared: log file {} is removed", fpath);
        deleted_log_count++;
        deleted_log_size += log->end_offset() - log->start_offset();
        if (deleted_smallest_log == 0)
            deleted_smallest_log = log->index();
        deleted_largest_log = log->index();

        // erase from _log_files
        // and refresh _global_start_offset and the reserved_* statistics from the live map
        {
            zauto_lock l(_lock);
            _log_files.erase(it->first);
            _global_start_offset =
                _log_files.size() > 0 ? _log_files.begin()->second->start_offset() : 0;
            reserved_log_count = _log_files.size();
            reserved_log_size = total_size_no_lock();
            if (reserved_log_count > 0) {
                reserved_smallest_log = _log_files.begin()->first;
                reserved_largest_log = _log_files.rbegin()->first;
            } else {
                reserved_smallest_log = -1;
                reserved_largest_log = -1;
            }
        }
    }

    if (stop_gc_decree_gap > 0) {
        LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, "
                 "reserved_log_count = {}, reserved_log_size = {}, "
                 "reserved_smallest_log = {}, reserved_largest_log = {}, "
                 "to_delete_log_count = {}, to_delete_log_size = {}, "
                 "deleted_log_count = {}, deleted_log_size = {}, "
                 "deleted_smallest_log = {}, deleted_largest_log = {}, "
                 "stop_gc_log_index = {}, stop_gc_replica_count = {}, "
                 "stop_gc_replica = {}, stop_gc_decree_gap = {}, "
                 "stop_gc_garbage_max_decree = {}, stop_gc_log_max_decree = {}",
                 file_count_limit,
                 reserved_log_count,
                 reserved_log_size,
                 reserved_smallest_log,
                 reserved_largest_log,
                 to_delete_log_count,
                 to_delete_log_size,
                 deleted_log_count,
                 deleted_log_size,
                 deleted_smallest_log,
                 deleted_largest_log,
                 stop_gc_log_index,
                 prevent_gc_replicas.size(),
                 stop_gc_replica,
                 stop_gc_decree_gap,
                 stop_gc_garbage_max_decree,
                 stop_gc_log_max_decree);
    } else {
        LOG_INFO("gc_shared: deleted some files, file_count_limit = {}, "
                 "reserved_log_count = {}, reserved_log_size = {}, "
                 "reserved_smallest_log = {}, reserved_largest_log = {}, "
                 "to_delete_log_count = {}, to_delete_log_size = {}, "
                 "deleted_log_count = {}, deleted_log_size = {}, "
                 "deleted_smallest_log = {}, deleted_largest_log = {}",
                 file_count_limit,
                 reserved_log_count,
                 reserved_log_size,
                 reserved_smallest_log,
                 reserved_largest_log,
                 to_delete_log_count,
                 to_delete_log_size,
                 deleted_log_count,
                 deleted_log_size,
                 deleted_smallest_log,
                 deleted_largest_log);
    }

    return reserved_log_count;
}