Status FishStore::Recover()

in src/core/fishstore.h [3515:3695]


/// Recovers the store from an index checkpoint and a hybrid-log checkpoint, then
/// reloads the naming metadata (field names, PSF libraries, PSF definitions and the
/// registered-PSF sets) stored alongside the hybrid-log checkpoint.
///
/// @param index_token       Token of the index checkpoint to recover from.
/// @param hybrid_log_token  Token of the hybrid-log checkpoint to recover from; also
///                          selects the naming checkpoint file.
/// @param version           [out] Reset to 0 on entry; set to the recovered log
///                          checkpoint version when the call returns Status::Ok.
/// @param session_ids       [out] Cleared on entry; on Status::Ok, receives the key of
///                          every entry in the checkpoint's continue tokens.
/// @return Status::Aborted if the system state is not (Action::None, Phase::REST) when
///         called; Status::Corruption if the index and log checkpoint versions differ,
///         or if the naming checkpoint file is missing, truncated, references an
///         out-of-range id, or names a library/symbol that cannot be loaded; otherwise
///         the first non-Ok status reported by a recovery step, or Status::Ok.
Status FishStore<D, A>::Recover(const Guid& index_token, const Guid& hybrid_log_token,
                                uint32_t& version,
                                std::vector<Guid>& session_ids) {
  version = 0;
  session_ids.clear();
  // Only one action may run at a time: claim the Recover action or give up.
  SystemState expected = SystemState{ Action::None, Phase::REST, system_state_.load().version };
  if(!system_state_.compare_exchange_strong(expected,
      SystemState{ Action::Recover, Phase::REST, expected.version })) {
    return Status::Aborted;
  }
  checkpoint_.InitializeRecover(index_token, hybrid_log_token);
  Status status;
#define BREAK_NOT_OK(s) \
    status = (s); \
    if (status != Status::Ok) break

  do {
    // Index and log metadata.
    BREAK_NOT_OK(ReadIndexMetadata(index_token));
    BREAK_NOT_OK(ReadCprMetadata(hybrid_log_token));
    if(checkpoint_.index_metadata.version != checkpoint_.log_metadata.version) {
      // Index and hybrid-log checkpoints should have the same version.
      status = Status::Corruption;
      break;
    }

    system_state_.store(SystemState{ Action::Recover, Phase::REST,
                                     checkpoint_.log_metadata.version + 1 });

    BREAK_NOT_OK(ReadCprContexts(hybrid_log_token, checkpoint_.log_metadata.guids));
    // The index itself (including overflow buckets).
    BREAK_NOT_OK(RecoverFuzzyIndex());
    BREAK_NOT_OK(RecoverFuzzyIndexComplete(true));
    // Any changes made to the log while the index was being fuzzy-checkpointed.
    if(fold_over_snapshot) {
      BREAK_NOT_OK(RecoverHybridLog());
    } else {
      BREAK_NOT_OK(RecoverHybridLogFromSnapshotFile());
    }
    BREAK_NOT_OK(RestoreHybridLog());
  } while(false);
#undef BREAK_NOT_OK

  // Load the naming metadata. As before, this runs regardless of the outcome of the
  // steps above. Every extraction and every index read from the file is validated:
  // an unchecked failed read would leave the counts below uninitialized and turn them
  // into garbage loop bounds. Returns false at the first missing or malformed entry;
  // entries accepted before that point stay appended to the member containers.
  const bool naming_ok = [&]() -> bool {
    std::ifstream naming_file(disk.naming_checkpoint_path(hybrid_log_token));
    if(!naming_file.is_open()) {
      return false;
    }

    // Field names, in field-id order.
    size_t n_fields = 0;
    if(!(naming_file >> n_fields)) {
      return false;
    }
    for(size_t i = 0; i < n_fields; ++i) {
      std::string field_name;
      if(!(naming_file >> field_name)) {
        return false;
      }
      field_names.push_back(field_name);
      field_lookup_map.emplace(std::make_pair(field_name, static_cast<uint16_t>(i)));
    }

    // Shared libraries that provide PSF implementations.
    size_t n_libs = 0;
    if(!(naming_file >> n_libs)) {
      return false;
    }
    for(size_t i = 0; i < n_libs; ++i) {
      std::string path;
      if(!(naming_file >> path)) {
        return false;
      }
      LibraryHandle lib;
      lib.path = std::experimental::filesystem::absolute(path);
#ifdef _WIN32
      lib.handle = LoadLibrary(lib.path.string().c_str());
#else
      lib.handle = dlopen(lib.path.string().c_str(), RTLD_LAZY);
#endif
      // Checked at runtime (not only via assert) so release builds never keep a
      // null library handle.
      if(!lib.handle) {
        return false;
      }
      libs.emplace_back(lib);
    }

    // General PSFs: field list, then owning library id (-1 selects the built-in
    // projection) and the symbol name.
    size_t n_general_psf = 0;
    if(!(naming_file >> n_general_psf)) {
      return false;
    }
    for(size_t i = 0; i < n_general_psf; ++i) {
      GeneralPSF<A> psf;
      size_t field_cnt = 0;
      if(!(naming_file >> field_cnt)) {
        return false;
      }
      for(size_t j = 0; j < field_cnt; ++j) {
        uint16_t field_id;
        if(!(naming_file >> field_id)) {
          return false;
        }
        psf.fields.push_back(field_id);
      }
      int64_t lib_id = 0;
      std::string func_name;
      if(!(naming_file >> lib_id >> func_name)) {
        return false;
      }
      psf.lib_id = lib_id;
      psf.func_name = func_name;
      if(psf.lib_id != -1) {
        if(lib_id < 0 || static_cast<size_t>(lib_id) >= libs.size()) {
          return false;
        }
#ifdef _WIN32
        psf.eval_ = (general_psf_t<A>)GetProcAddress(libs[lib_id].handle, func_name.c_str());
#else
        psf.eval_ = (general_psf_t<A>)dlsym(libs[lib_id].handle, func_name.c_str());
#endif
      } else {
        psf.eval_ = projection<A>;
      }
      if(!psf.eval_) {
        return false;
      }
      general_psf_map.push_back(psf);
    }

    // Inline PSFs: same on-disk layout as general PSFs.
    size_t n_inline_psf = 0;
    if(!(naming_file >> n_inline_psf)) {
      return false;
    }
    for(size_t i = 0; i < n_inline_psf; ++i) {
      InlinePSF<A> psf;
      size_t field_cnt = 0;
      if(!(naming_file >> field_cnt)) {
        return false;
      }
      for(size_t j = 0; j < field_cnt; ++j) {
        uint16_t field_id;
        if(!(naming_file >> field_id)) {
          return false;
        }
        psf.fields.push_back(field_id);
      }
      int64_t lib_id = 0;
      std::string func_name;
      if(!(naming_file >> lib_id >> func_name)) {
        return false;
      }
      psf.lib_id = lib_id;
      psf.func_name = func_name;
      if(psf.lib_id != -1) {
        if(lib_id < 0 || static_cast<size_t>(lib_id) >= libs.size()) {
          return false;
        }
#ifdef _WIN32
        psf.eval_ = (inline_psf_t<A>)GetProcAddress(libs[lib_id].handle, func_name.c_str());
#else
        psf.eval_ = (inline_psf_t<A>)dlsym(libs[lib_id].handle, func_name.c_str());
#endif
      }
      // NOTE(review): with lib_id == -1 nothing here assigns eval_, so this relies on
      // whatever InlinePSF's default construction leaves in it -- confirm that an
      // inline PSF without a library is never checkpointed.
      if(!psf.eval_) {
        return false;
      }
      inline_psf_map.push_back(psf);
    }

    // Ids of the PSFs that were registered with the parser.
    ParserState parser_state;
    size_t n_reg_general = 0;
    if(!(naming_file >> n_reg_general)) {
      return false;
    }
    for(size_t i = 0; i < n_reg_general; ++i) {
      uint16_t general_psf_id;
      if(!(naming_file >> general_psf_id)) {
        return false;
      }
      parser_state.ptr_general_psf.emplace(general_psf_id);
    }
    size_t n_reg_inline = 0;
    if(!(naming_file >> n_reg_inline)) {
      return false;
    }
    for(size_t i = 0; i < n_reg_inline; ++i) {
      uint32_t inline_psf_id;
      if(!(naming_file >> inline_psf_id)) {
        return false;
      }
      parser_state.ptr_inline_psf.emplace(inline_psf_id);
    }

    // Construct parser essentials: the union of fields used by registered PSFs.
    std::unordered_set<uint16_t> field_ids;
    for(auto psf_id : parser_state.ptr_general_psf) {
      if(static_cast<size_t>(psf_id) >= general_psf_map.size()) {
        return false;
      }
      for(auto field_id : general_psf_map[psf_id].fields) {
        field_ids.insert(field_id);
      }
    }
    for(auto psf_id : parser_state.ptr_inline_psf) {
      if(static_cast<size_t>(psf_id) >= inline_psf_map.size()) {
        return false;
      }
      for(auto field_id : inline_psf_map[psf_id].fields) {
        field_ids.insert(field_id);
      }
    }

    parser_state.main_parser_fields.clear();
    parser_state.main_parser_field_ids.clear();
    for(auto field_id : field_ids) {
      if(static_cast<size_t>(field_id) >= field_names.size()) {
        return false;
      }
      parser_state.main_parser_fields.push_back(field_names[field_id]);
      parser_state.main_parser_field_ids.push_back(field_id);
    }

    // Install the parser state only once the whole file has been validated.
    parser_states[0] = parser_state;
    parser_states[1] = parser_state;
    system_parser_no_ = 0;
    return true;
  }();

  // Keep an earlier recovery error if there is one; otherwise report the bad naming
  // checkpoint instead of returning Ok with partially loaded PSF metadata.
  if(!naming_ok && status == Status::Ok) {
    status = Status::Corruption;
  }

  if(status == Status::Ok) {
    for(const auto& token : checkpoint_.continue_tokens) {
      session_ids.push_back(token.first);
    }
    version = checkpoint_.log_metadata.version;
  }

  checkpoint_.RecoverDone();
  // Release the Recover action whether or not recovery succeeded.
  system_state_.store(SystemState{ Action::None, Phase::REST,
                                   checkpoint_.log_metadata.version + 1 });
  return status;
}