in src/core/fishstore.h [2480:2548]
Status FishStore<D, A>::RecoverHybridLog() {
class Context : public IAsyncContext {
public:
Context(hlog_t& hlog_, uint32_t page_, RecoveryStatus& recovery_status_)
: hlog{ &hlog_}
, page{ page_ }
, recovery_status{ &recovery_status_ } {
}
/// The deep-copy constructor
Context(const Context& other)
: hlog{ other.hlog }
, page{ other.page }
, recovery_status{ other.recovery_status } {
}
protected:
Status DeepCopy_Internal(IAsyncContext*& context_copy) final {
return IAsyncContext::DeepCopy_Internal(*this, context_copy);
}
public:
hlog_t* hlog;
uint32_t page;
RecoveryStatus* recovery_status;
};
auto callback = [](IAsyncContext* ctxt, Status result) {
CallbackContext<Context> context{ ctxt };
result = context->hlog->AsyncReadPagesFromLog(context->page, 1, *context->recovery_status);
};
Address from_address = checkpoint_.index_metadata.checkpoint_start_address;
Address to_address = checkpoint_.log_metadata.final_address;
uint32_t start_page = from_address.page();
uint32_t end_page = to_address.offset() > 0 ? to_address.page() + 1 : to_address.page();
uint32_t capacity = hlog.buffer_size();
RecoveryStatus recovery_status{ start_page, end_page };
// Initially issue read request for all pages that can be held in memory
uint32_t total_pages_to_read = end_page - start_page;
uint32_t pages_to_read_first = std::min(capacity, total_pages_to_read);
RETURN_NOT_OK(hlog.AsyncReadPagesFromLog(start_page, pages_to_read_first, recovery_status));
for(uint32_t page = start_page; page < end_page; ++page) {
while(recovery_status.page_status(page) != PageRecoveryStatus::ReadDone) {
disk.TryComplete();
std::this_thread::sleep_for(10ms);
}
// handle start and end at non-page boundaries
RETURN_NOT_OK(RecoverFromPage(page == start_page ? from_address : Address{ page, 0 },
page + 1 == end_page ? to_address :
Address{ page, Address::kMaxOffset }));
// OS thread flushes current page and issues a read request if necessary
if(page + capacity < end_page) {
Context context{ hlog, page + capacity, recovery_status };
RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, callback, &context));
} else {
RETURN_NOT_OK(hlog.AsyncFlushPage(page, recovery_status, nullptr, nullptr));
}
}
// Wait until all pages have been flushed
for(uint32_t page = start_page; page < end_page; ++page) {
while(recovery_status.page_status(page) != PageRecoveryStatus::FlushDone) {
disk.TryComplete();
std::this_thread::sleep_for(10ms);
}
}
return Status::Ok;
}