in src/replica/mutation_log.cpp [526:708]
// Opens the mutation log kept in `_dir` and replays the mutations stored in it.
//
// Steps:
//   1. create `_dir` if it does not exist yet;
//   2. open every file found directly under `_dir` with log_file::open_read() and
//      register it in `_log_files`, keyed by its log index;
//   3. use `replay_condition` to decide which leading files can be skipped;
//   4. replay the remaining files through replay(), updating the max decree (and,
//      for a private log, the max committed decree on disk) for every mutation
//      accepted by `read_callback`.
//
// Parameters:
//   read_callback        - invoked for each replayed mutation; when it returns false
//                          the decree bookkeeping is skipped and false is handed back
//                          to replay(). May be null only when `_dir` contains no files.
//   write_error_callback - stored in `_io_error_callback`.
//   replay_condition     - per-replica decree used to skip log files; an empty map
//                          means every loaded file is replayed. For a private log it
//                          must contain an entry for `_private_gpid`.
//
// Returns:
//   ERR_OK on success (the log is then marked as opened);
//   ERR_FILE_OPERATION_FAILED if `_dir` cannot be created or listed;
//   otherwise the error reported by log_file::open_read() or replay().
//   On an open_read()/replay() failure every loaded file is closed and the log
//   states are reset through init_states().
error_code mutation_log::open(replay_callback read_callback,
                              io_failure_callback write_error_callback,
                              const std::map<gpid, decree> &replay_condition)
{
    CHECK(!_is_opened, "cannot open a opened mutation_log");
    CHECK(nullptr == _current_log_file, "");

    // Common cleanup for every failure that can happen once log files have been
    // loaded, so that no failure exit leaves half-loaded files behind.
    const auto reset_after_failure = [this]() {
        for (auto &kv : _log_files) {
            kv.second->close();
        }
        _log_files.clear();
        init_states();
    };

    // create dir if necessary
    if (!dsn::utils::filesystem::path_exists(_dir)) {
        if (!dsn::utils::filesystem::create_directory(_dir)) {
            LOG_ERROR("open mutation_log: create log path failed");
            return ERR_FILE_OPERATION_FAILED;
        }
    }

    // load the existing logs
    _log_files.clear();
    _io_error_callback = write_error_callback;

    std::vector<std::string> file_list;
    if (!dsn::utils::filesystem::get_subfiles(_dir, file_list, false)) {
        LOG_ERROR("open mutation_log: get subfiles failed.");
        return ERR_FILE_OPERATION_FAILED;
    }

    // without a replay callback there must be nothing to replay
    if (nullptr == read_callback) {
        CHECK(file_list.empty(), "");
    }

    std::sort(file_list.begin(), file_list.end());

    error_code err = ERR_OK;
    for (const auto &fpath : file_list) {
        log_file_ptr log = log_file::open_read(fpath.c_str(), err);
        if (log == nullptr) {
            if (err == ERR_HANDLE_EOF || err == ERR_INCOMPLETE_DATA ||
                err == ERR_INVALID_PARAMETERS) {
                // these errors only disqualify this single file
                LOG_WARNING("skip file {} during log init, err = {}", fpath, err);
                continue;
            }

            // Any other error aborts the open. Release the files loaded so far,
            // exactly as the replay-failure path below does, instead of leaving
            // them opened inside `_log_files`.
            reset_after_failure();
            return err;
        }

        if (_is_private) {
            LOG_INFO("open private log {} succeed, start_offset = {}, end_offset = {}, size = "
                     "{}, previous_max_decree = {}",
                     fpath,
                     log->start_offset(),
                     log->end_offset(),
                     log->end_offset() - log->start_offset(),
                     log->previous_log_max_decree(_private_gpid));
        } else {
            LOG_INFO("open shared log {} succeed, start_offset = {}, end_offset = {}, size = {}",
                     fpath,
                     log->start_offset(),
                     log->end_offset(),
                     log->end_offset() - log->start_offset());
        }

        // log indexes must be unique among the loaded files
        CHECK(_log_files.find(log->index()) == _log_files.end(),
              "invalid log_index, index = {}",
              log->index());
        _log_files[log->index()] = log;
    }
    file_list.clear();

    // filter useless log
    std::map<int, log_file_ptr>::iterator replay_begin = _log_files.begin();
    std::map<int, log_file_ptr>::iterator replay_end = _log_files.end();
    if (!replay_condition.empty()) {
        if (_is_private) {
            auto find = replay_condition.find(_private_gpid);
            CHECK(find != replay_condition.end(), "invalid gpid({})", _private_gpid);
            // Advance replay_begin while the decree a file reports through
            // previous_log_max_decree() does not exceed the condition. The file
            // replay_begin ends up on is still replayed; only the files in front of
            // it are skipped.
            for (auto it = _log_files.begin(); it != _log_files.end(); ++it) {
                if (it->second->previous_log_max_decree(_private_gpid) <= find->second) {
                    // previous logs can be ignored
                    replay_begin = it;
                } else {
                    break;
                }
            }
        } else {
            // find the largest file which can be ignored.
            // after iterate, the 'mark_it' will point to the largest file which can be ignored.
            std::map<int, log_file_ptr>::reverse_iterator mark_it;
            std::set<gpid> kickout_replicas;
            replica_log_info_map max_decrees; // max_decrees for log file at mark_it.
            for (mark_it = _log_files.rbegin(); mark_it != _log_files.rend(); ++mark_it) {
                // the last file should not be ignored
                bool ignore_this = (mark_it != _log_files.rbegin());

                if (ignore_this) {
                    for (const auto &kv : replay_condition) {
                        if (kickout_replicas.count(kv.first) > 0) {
                            // no need to consider this replica
                            continue;
                        }

                        auto find = max_decrees.find(kv.first);
                        if (find == max_decrees.end() || find->second.max_decree <= kv.second) {
                            // can ignore for this replica
                            kickout_replicas.insert(kv.first);
                        } else {
                            ignore_this = false;
                            break;
                        }
                    }
                }

                if (ignore_this) {
                    // found the largest file which can be ignored
                    break;
                }

                // update max_decrees for the next log file
                max_decrees = mark_it->second->previous_log_max_decrees();
            }

            if (mark_it != _log_files.rend()) {
                // set replay_begin to the next position of mark_it.
                replay_begin = _log_files.find(mark_it->first);
                CHECK(replay_begin != _log_files.end(),
                      "invalid log_index, index = {}",
                      mark_it->first);
                ++replay_begin;
                // the last file is never ignored, so a following file must exist
                CHECK(replay_begin != _log_files.end(),
                      "invalid log_index, index = {}",
                      mark_it->first);
            }
        }

        for (auto it = _log_files.begin(); it != replay_begin; ++it) {
            LOG_INFO("ignore log {}", it->second->path());
        }
    }

    // replay with the found files
    std::map<int, log_file_ptr> replay_logs(replay_begin, replay_end);
    int64_t end_offset = 0;
    err = replay(
        replay_logs,
        [this, read_callback](int log_length, mutation_ptr &mu) {
            bool ret = true;

            if (read_callback) {
                ret = read_callback(log_length,
                                    mu); // actually replica::replay_mutation(mu, true|false);
            }

            // only mutations accepted by the callback advance the decree bookkeeping
            if (ret) {
                this->update_max_decree_no_lock(mu->data.header.pid, mu->data.header.decree);
                if (this->_is_private) {
                    this->update_max_commit_on_disk_no_lock(mu->data.header.last_committed_decree);
                }
            }

            return ret;
        },
        end_offset);

    if (ERR_OK == err) {
        // the offsets and the last index cover all loaded files, including ignored ones
        _global_start_offset = _log_files.empty() ? 0 : _log_files.begin()->second->start_offset();
        _global_end_offset = end_offset;
        _last_file_index = _log_files.empty() ? 0 : _log_files.rbegin()->first;
        _is_opened = true;
    } else {
        // clear
        reset_after_failure();
    }

    return err;
}