bool FishStore::CheckpointHybridLog()

in src/core/fishstore.h [3444:3512]


bool FishStore<D, A>::CheckpointHybridLog(const std::function<void(Status, uint64_t, uint32_t)>&
    hybrid_log_persistence_callback, Guid& token) {
#ifdef _TIMER
  checkpoint_start = std::chrono::high_resolution_clock::now();
#endif
  // Only one thread can initiate a checkpoint at a time.
  SystemState expected{ Action::None, Phase::REST, system_state_.load().version };
  SystemState desired{ Action::CheckpointHybridLog, Phase::REST, expected.version };
  if(!system_state_.compare_exchange_strong(expected, desired)) {
    // Can't start a new checkpoint while a checkpoint or recovery is already in progress.
    return false;
  }
  // We are going to start a checkpoint.
  epoch_.ResetPhaseFinished();
  // Initialize all contexts
  token = Guid::Create();
  disk.CreateCprCheckpointDirectory(token);

  FILE* naming_file = fopen(disk.naming_checkpoint_path(token).c_str(), "w");
  fprintf(naming_file, "%zu\n", field_names.size());
  for(auto& field: field_names) {
    fprintf(naming_file, "%s\n", field.c_str());
  }
  fprintf(naming_file, "%zu\n", libs.size());
  for(auto& lib : libs) {
    fprintf(naming_file, "%s\n", lib.path.string().c_str());
  }
  fprintf(naming_file, "%zu\n", general_psf_map.size());
  for(const GeneralPSF<A>& psf : general_psf_map) {
    fprintf(naming_file, "%zu", psf.fields.size());
    for(auto& field : psf.fields) {
      fprintf(naming_file, " %u",field);
    }
    fprintf(naming_file, " %zd %s\n", psf.lib_id,
            psf.func_name.c_str());
  }
  fprintf(naming_file, "%zu\n", inline_psf_map.size());
  for(const InlinePSF<A>& psf : inline_psf_map) {
    fprintf(naming_file, "%zu", psf.fields.size());
    for(auto& field : psf.fields) {
      fprintf(naming_file, " %u",field);
    }
    fprintf(naming_file, " %zd %s\n", psf.lib_id,
            psf.func_name.c_str());
  }
  const ParserState& parser_state = parser_states[system_parser_no_.load()];
  fprintf(naming_file, "%zu\n", parser_state.ptr_general_psf.size());
  for(const auto& psf_id : parser_state.ptr_general_psf) {
    fprintf(naming_file, "%u\n", psf_id);
  }
  fprintf(naming_file, "%zu\n", parser_state.ptr_inline_psf.size());
  for(const auto& psf_id : parser_state.ptr_inline_psf) {
    fprintf(naming_file, "%u\n", psf_id);
  }
  fclose(naming_file);

  // Obtain tail address for fuzzy index checkpoint
  if(!fold_over_snapshot) {
    checkpoint_.InitializeHybridLogCheckpoint(token, desired.version, true,
        hlog.flushed_until_address.load(), hybrid_log_persistence_callback);
  } else {
    checkpoint_.InitializeHybridLogCheckpoint(token, desired.version, false,
        Address::kInvalidAddress, hybrid_log_persistence_callback);
  }
  InitializeCheckpointLocks();
  // Let other threads know that the checkpoint has started.
  system_state_.store(desired.GetNextState());
  return true;
}