Status FishStore::RecoverHybridLogFromSnapshotFile()

in src/core/fishstore.h [2551:2638]


/// Recovers the hybrid log from the "snapshot.dat" file of the checkpoint identified by
/// checkpoint_.hybrid_log_token.
///
/// The page range starts at the page of log_metadata.flushed_address and ends at the page of
/// log_metadata.final_address (that page is included only when final_address has a non-zero
/// offset). Pages are handled in order:
///   1. poll until the page's status is ReadDone;
///   2. call RecoverFromPage() on it when it extends past
///      index_metadata.checkpoint_start_address;
///   3. issue AsyncFlushPage() for it, passing a callback that requests the read of the page
///      hlog.buffer_size() pages ahead whenever that page is still inside the range.
/// Afterwards the function polls until every page in the range is FlushDone.
///
/// @return Status::Ok once all pages are flushed. Statuses from opening the snapshot file,
///         issuing reads/flushes and RecoverFromPage() are passed through RETURN_NOT_OK.
Status FishStore<D, A>::RecoverHybridLogFromSnapshotFile() {
  /// Everything the flush callback needs in order to request one more snapshot page.
  class ReadAheadContext : public IAsyncContext {
   public:
    ReadAheadContext(hlog_t& log_, file_t& snapshot_, uint32_t first_file_page_,
                     uint32_t target_page_, RecoveryStatus& status_)
      : log{ &log_ }
      , snapshot{ &snapshot_ }
      , first_file_page{ first_file_page_ }
      , target_page{ target_page_ }
      , status{ &status_ } {
    }
    /// The deep-copy constructor
    ReadAheadContext(const ReadAheadContext& other)
      : log{ other.log }
      , snapshot{ other.snapshot }
      , first_file_page{ other.first_file_page }
      , target_page{ other.target_page }
      , status{ other.status } {
    }

   protected:
    Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
      return IAsyncContext::DeepCopy_Internal(*this, context_copy);
    }

   public:
    /// Log that receives the page.
    hlog_t* log;
    /// Snapshot file the page is read from.
    file_t* snapshot;
    /// First page contained in the snapshot file.
    uint32_t first_file_page;
    /// Page whose read is requested by the callback.
    uint32_t target_page;
    /// Per-page recovery state shared with the driving loop below.
    RecoveryStatus* status;
  };

  // Handed to AsyncFlushPage(): requests the read of exactly one further page.
  // The returned status is only written to the by-value `result` parameter.
  auto read_next_page = [](IAsyncContext* ctxt, Status result) {
    CallbackContext<ReadAheadContext> ctx{ ctxt };
    result = ctx->log->AsyncReadPagesFromSnapshot(*ctx->snapshot, ctx->first_file_page,
             ctx->target_page, 1, *ctx->status);
  };

  Address snapshot_begin = checkpoint_.log_metadata.flushed_address;
  Address replay_begin = checkpoint_.index_metadata.checkpoint_start_address;
  Address log_end = checkpoint_.log_metadata.final_address;

  uint32_t first_page = snapshot_begin.page();
  // One past the last page to process; a partially filled final page counts as well.
  uint32_t page_limit = log_end.page();
  if(log_end.offset() > 0) {
    page_limit = log_end.page() + 1;
  }
  uint32_t buffer_pages = hlog.buffer_size();
  RecoveryStatus recovery_status{ first_page, page_limit };

  checkpoint_.snapshot_file = disk.NewFile(
                                disk.relative_cpr_checkpoint_path(checkpoint_.hybrid_log_token) +
                                "snapshot.dat");
  RETURN_NOT_OK(checkpoint_.snapshot_file.Open(&disk.handler()));

  // Polls (driving I/O completions in between) until `pg` reports the `wanted` status.
  auto await_page_status = [&](uint32_t pg, PageRecoveryStatus wanted) {
    while(recovery_status.page_status(pg) != wanted) {
      disk.TryComplete();
      std::this_thread::sleep_for(10ms);
    }
  };

  // Start by requesting as many pages as the in-memory buffer can hold.
  uint32_t page_count = page_limit - first_page;
  uint32_t initial_batch = std::min(buffer_pages, page_count);
  RETURN_NOT_OK(hlog.AsyncReadPagesFromSnapshot(checkpoint_.snapshot_file, first_page,
                first_page, initial_batch, recovery_status));

  for(uint32_t page = first_page; page < page_limit; ++page) {
    await_page_status(page, PageRecoveryStatus::ReadDone);

    // Only pages reaching into the fuzzy portion of the log need to be replayed.
    if(Address{ page + 1, 0 } > replay_begin) {
      // The replayed range may begin and end in the middle of a page.
      Address range_begin = (page == replay_begin.page()) ? replay_begin : Address{ page, 0 };
      Address range_end = (page + 1 == page_limit) ? log_end :
                          Address{ page, Address::kMaxOffset };
      RETURN_NOT_OK(RecoverFromPage(range_begin, range_end));
    }

    // Flush the page just handled; if a page `buffer_pages` ahead still has to be read,
    // the flush is given the callback that requests it.
    uint32_t read_ahead_page = page + buffer_pages;
    if(read_ahead_page >= page_limit) {
      RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, nullptr, nullptr));
    } else {
      ReadAheadContext context{ hlog, checkpoint_.snapshot_file, first_page, read_ahead_page,
                                recovery_status };
      RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, read_next_page, &context));
    }
  }

  // Block until the flush of every page has completed.
  for(uint32_t page = first_page; page < page_limit; ++page) {
    await_page_status(page, PageRecoveryStatus::FlushDone);
  }
  return Status::Ok;
}