error_code mutation_log::open()

in src/replica/mutation_log.cpp [526:708]


// Opens this mutation log: makes sure `_dir` exists, loads every log file found
// in it, optionally skips leading files according to `replay_condition`, and
// replays the remaining files through `read_callback`.
//
// Parameters:
//   read_callback        - invoked for every replayed mutation; may be null, in
//                          which case `_dir` is required (CHECK) to contain no files.
//   write_error_callback - stored into `_io_error_callback`.
//   replay_condition     - per-replica decree bound used to decide which leading
//                          log files can be skipped; when empty, every loaded
//                          file is replayed.
//
// Returns:
//   ERR_OK on success; `_global_start_offset`, `_global_end_offset`,
//   `_last_file_index` and `_is_opened` are then updated.
//   ERR_FILE_OPERATION_FAILED if `_dir` cannot be created or listed.
//   Otherwise the error reported by log_file::open_read() or replay(). In that
//   case every log file loaded so far is closed, `_log_files` is cleared and
//   init_states() is called, so no half-loaded state is left behind.
error_code mutation_log::open(replay_callback read_callback,
                              io_failure_callback write_error_callback,
                              const std::map<gpid, decree> &replay_condition)
{
    CHECK(!_is_opened, "cannot open a opened mutation_log");
    CHECK(nullptr == _current_log_file, "");

    // create dir if necessary
    if (!dsn::utils::filesystem::path_exists(_dir)) {
        if (!dsn::utils::filesystem::create_directory(_dir)) {
            LOG_ERROR("open mutation_log: create log path failed");
            return ERR_FILE_OPERATION_FAILED;
        }
    }

    // load the existing logs
    _log_files.clear();
    _io_error_callback = write_error_callback;

    std::vector<std::string> file_list;
    if (!dsn::utils::filesystem::get_subfiles(_dir, file_list, false)) {
        LOG_ERROR("open mutation_log: get subfiles failed.");
        return ERR_FILE_OPERATION_FAILED;
    }

    if (nullptr == read_callback) {
        // without a replay callback there must be nothing to replay
        CHECK(file_list.empty(), "");
    }

    // process the files in lexicographic path order
    std::sort(file_list.begin(), file_list.end());

    // Single cleanup routine shared by every failure exit that happens after log
    // files may have been loaded: close them, forget them and reset the state.
    const auto close_all_and_reset = [this]() {
        for (auto &kv : _log_files) {
            kv.second->close();
        }
        _log_files.clear();
        init_states();
    };

    error_code err = ERR_OK;
    for (const auto &fpath : file_list) {
        log_file_ptr log = log_file::open_read(fpath.c_str(), err);
        if (log == nullptr) {
            if (err == ERR_HANDLE_EOF || err == ERR_INCOMPLETE_DATA ||
                err == ERR_INVALID_PARAMETERS) {
                // these errors only invalidate this one file, keep loading the others
                LOG_WARNING("skip file {} during log init, err = {}", fpath, err);
                continue;
            }

            // Any other error aborts the open. Release the files loaded by the
            // previous iterations instead of leaving them open in `_log_files`.
            close_all_and_reset();
            return err;
        }

        if (_is_private) {
            LOG_INFO("open private log {} succeed, start_offset = {}, end_offset = {}, size = "
                     "{}, previous_max_decree = {}",
                     fpath,
                     log->start_offset(),
                     log->end_offset(),
                     log->end_offset() - log->start_offset(),
                     log->previous_log_max_decree(_private_gpid));
        } else {
            LOG_INFO("open shared log {} succeed, start_offset = {}, end_offset = {}, size = {}",
                     fpath,
                     log->start_offset(),
                     log->end_offset(),
                     log->end_offset() - log->start_offset());
        }

        // two files must never report the same index
        CHECK(_log_files.find(log->index()) == _log_files.end(),
              "invalid log_index, index = {}",
              log->index());
        _log_files[log->index()] = log;
    }

    file_list.clear();

    // filter useless log
    // [replay_begin, replay_end) is the range of loaded files that will be replayed;
    // by default it covers everything.
    auto replay_begin = _log_files.begin();
    auto replay_end = _log_files.end();
    if (!replay_condition.empty()) {
        if (_is_private) {
            auto find = replay_condition.find(_private_gpid);
            CHECK(find != replay_condition.end(), "invalid gpid({})", _private_gpid);
            // Advance replay_begin to the last file (in index order) whose
            // previous_log_max_decree() does not exceed the condition; stop at the
            // first file that exceeds it.
            for (auto it = _log_files.begin(); it != _log_files.end(); ++it) {
                if (it->second->previous_log_max_decree(_private_gpid) <= find->second) {
                    // previous logs can be ignored
                    replay_begin = it;
                } else {
                    break;
                }
            }
        } else {
            // find the largest file which can be ignored.
            // after iterate, the 'mark_it' will point to the largest file which can be ignored.
            std::map<int, log_file_ptr>::reverse_iterator mark_it;
            // replicas for which the condition has already been satisfied
            std::set<gpid> kickout_replicas;
            replica_log_info_map max_decrees; // max_decrees for log file at mark_it.
            for (mark_it = _log_files.rbegin(); mark_it != _log_files.rend(); ++mark_it) {
                bool ignore_this = true;

                if (mark_it == _log_files.rbegin()) {
                    // the last file should not be ignored
                    ignore_this = false;
                }

                if (ignore_this) {
                    for (auto &kv : replay_condition) {
                        if (kickout_replicas.find(kv.first) != kickout_replicas.end()) {
                            // no need to consider this replica
                            continue;
                        }

                        auto find = max_decrees.find(kv.first);
                        if (find == max_decrees.end() || find->second.max_decree <= kv.second) {
                            // can ignore for this replica
                            kickout_replicas.insert(kv.first);
                        } else {
                            // this replica still needs the file at mark_it
                            ignore_this = false;
                            break;
                        }
                    }
                }

                if (ignore_this) {
                    // found the largest file which can be ignored
                    break;
                }

                // update max_decrees for the next log file
                max_decrees = mark_it->second->previous_log_max_decrees();
            }

            if (mark_it != _log_files.rend()) {
                // set replay_begin to the next position of mark_it.
                replay_begin = _log_files.find(mark_it->first);
                CHECK(replay_begin != _log_files.end(),
                      "invalid log_index, index = {}",
                      mark_it->first);
                ++replay_begin;
                // the last file is never ignored, so there is always a next position
                CHECK(replay_begin != _log_files.end(),
                      "invalid log_index, index = {}",
                      mark_it->first);
            }
        }

        for (auto it = _log_files.begin(); it != replay_begin; ++it) {
            LOG_INFO("ignore log {}", it->second->path());
        }
    }

    // replay with the found files
    std::map<int, log_file_ptr> replay_logs(replay_begin, replay_end);
    int64_t end_offset = 0;
    err = replay(
        replay_logs,
        [this, read_callback](int log_length, mutation_ptr &mu) {
            bool ret = true;

            if (read_callback) {
                ret = read_callback(log_length,
                                    mu); // actually replica::replay_mutation(mu, true|false);
            }

            // only mutations accepted by the callback (or all of them when there
            // is no callback) advance the recorded decrees
            if (ret) {
                this->update_max_decree_no_lock(mu->data.header.pid, mu->data.header.decree);
                if (this->_is_private) {
                    this->update_max_commit_on_disk_no_lock(mu->data.header.last_committed_decree);
                }
            }

            return ret;
        },
        end_offset);

    if (ERR_OK == err) {
        // the global range starts at the first loaded file (even if it was skipped
        // for replay) and ends where replay() reported
        _global_start_offset = !_log_files.empty() ? _log_files.begin()->second->start_offset() : 0;
        _global_end_offset = end_offset;
        _last_file_index = !_log_files.empty() ? _log_files.rbegin()->first : 0;
        _is_opened = true;
    } else {
        // clear
        close_all_and_reset();
    }

    return err;
}