in src/core/fishstore.h [3515:3695]
Status FishStore<D, A>::Recover(const Guid& index_token, const Guid& hybrid_log_token,
uint32_t& version,
std::vector<Guid>& session_ids) {
version = 0;
session_ids.clear();
SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version };
if(!system_state_.compare_exchange_strong(expected,
SystemState{ Action::Recover, Phase::REST, expected.version })) {
return Status::Aborted;
}
checkpoint_.InitializeRecover(index_token, hybrid_log_token);
Status status;
#define BREAK_NOT_OK(s) \
status = (s); \
if (status != Status::Ok) break
do {
// Index and log metadata.
BREAK_NOT_OK(ReadIndexMetadata(index_token));
BREAK_NOT_OK(ReadCprMetadata(hybrid_log_token));
if(checkpoint_.index_metadata.version != checkpoint_.log_metadata.version) {
// Index and hybrid-log checkpoints should have the same version.
status = Status::Corruption;
break;
}
system_state_.store(SystemState{ Action::Recover, Phase::REST,
checkpoint_.log_metadata.version + 1 });
BREAK_NOT_OK(ReadCprContexts(hybrid_log_token, checkpoint_.log_metadata.guids));
// The index itself (including overflow buckets).
BREAK_NOT_OK(RecoverFuzzyIndex());
BREAK_NOT_OK(RecoverFuzzyIndexComplete(true));
// Any changes made to the log while the index was being fuzzy-checkpointed.
if(fold_over_snapshot) {
BREAK_NOT_OK(RecoverHybridLog());
} else {
BREAK_NOT_OK(RecoverHybridLogFromSnapshotFile());
}
BREAK_NOT_OK(RestoreHybridLog());
} while(false);
std::ifstream naming_file(disk.naming_checkpoint_path(hybrid_log_token));
size_t n_fields;
naming_file >> n_fields;
for(size_t i = 0; i < n_fields; ++i) {
std::string field_name;
naming_file >> field_name;
field_names.push_back(field_name);
field_lookup_map.emplace(std::make_pair(field_name, static_cast<uint16_t>(i)));
}
size_t n_libs;
naming_file >> n_libs;
for(size_t i = 0; i < n_libs; ++i) {
std::string path;
naming_file >> path;
LibraryHandle lib;
lib.path = std::experimental::filesystem::absolute(path);
#ifdef _WIN32
lib.handle = LoadLibrary(lib.path.string().c_str());
#else
lib.handle = dlopen(lib.path.string().c_str(), RTLD_LAZY);
#endif
assert(lib.handle);
libs.emplace_back(lib);
}
size_t n_general_psf;
naming_file >> n_general_psf;
for(size_t i = 0; i < n_general_psf; ++i) {
int64_t lib_id;
size_t field_cnt;
std::string func_name;
GeneralPSF<A> psf;
naming_file >> field_cnt;
for(size_t i = 0; i < field_cnt; ++i) {
uint16_t field_id;
naming_file >> field_id;
psf.fields.push_back(field_id);
}
naming_file >> lib_id >> func_name;
psf.lib_id = lib_id;
psf.func_name = func_name;
if (psf.lib_id != -1) {
#ifdef _WIN32
psf.eval_ = (general_psf_t<A>)GetProcAddress(libs[lib_id].handle, func_name.c_str());
#else
psf.eval_ = (general_psf_t<A>)dlsym(libs[lib_id].handle, func_name.c_str());
#endif
} else {
psf.eval_ = projection<A>;
}
assert(psf.eval_);
general_psf_map.push_back(psf);
}
size_t n_inline_psf;
naming_file >> n_inline_psf;
for(size_t i = 0; i < n_inline_psf; ++i) {
int64_t lib_id;
size_t field_cnt;
std::string func_name;
InlinePSF<A> psf;
naming_file >> field_cnt;
for(size_t i = 0; i < field_cnt; ++i) {
uint16_t field_id;
naming_file >> field_id;
psf.fields.push_back(field_id);
}
naming_file >> lib_id >> func_name;
psf.lib_id = lib_id;
psf.func_name = func_name;
if (psf.lib_id != -1) {
#ifdef _WIN32
psf.eval_ = (inline_psf_t<A>)GetProcAddress(libs[lib_id].handle, func_name.c_str());
#else
psf.eval_ = (inline_psf_t<A>)dlsym(libs[lib_id].handle, func_name.c_str());
#endif
}
assert(psf.eval_);
inline_psf_map.push_back(psf);
}
ParserState parser_state;
size_t n_reg_general;
naming_file >> n_reg_general;
for(size_t i = 0; i < n_reg_general; ++i) {
uint16_t general_psf_id;
naming_file >> general_psf_id;
parser_state.ptr_general_psf.emplace(general_psf_id);
}
size_t n_reg_inline;
naming_file >> n_reg_inline;
for(size_t i = 0; i < n_reg_inline; ++i) {
uint32_t inline_psf_id;
naming_file >> inline_psf_id;
parser_state.ptr_inline_psf.emplace(inline_psf_id);
}
// Construct parser essentials
std::unordered_set<uint16_t> field_ids;
for(auto psf_id : parser_state.ptr_general_psf) {
for(auto field_id : general_psf_map[psf_id].fields) {
field_ids.insert(field_id);
}
}
for(auto psf_id : parser_state.ptr_inline_psf) {
for(auto field_id : inline_psf_map[psf_id].fields) {
field_ids.insert(field_id);
}
}
parser_state.main_parser_fields.clear();
parser_state.main_parser_field_ids.clear();
for(auto field_id : field_ids) {
parser_state.main_parser_fields.push_back(field_names[field_id]);
parser_state.main_parser_field_ids.push_back(field_id);
}
parser_states[0] = parser_state;
parser_states[1] = parser_state;
system_parser_no_ = 0;
naming_file.close();
if(status == Status::Ok) {
for(const auto& token : checkpoint_.continue_tokens) {
session_ids.push_back(token.first);
}
version = checkpoint_.log_metadata.version;
}
checkpoint_.RecoverDone();
system_state_.store(SystemState{ Action::None, Phase::REST,
checkpoint_.log_metadata.version + 1 });
return status;
#undef BREAK_NOT_OK
}