in src/core/fishstore.h [1469:1529]
inline OperationStatus FishStore<D, A>::InternalFullScan(C& pending_context,
uint64_t begin_addr, uint64_t end_addr) {
assert(begin_addr < end_addr);
typedef C pending_full_scan_context_t;
// Scan starts from the current address, go reverse page by page.
Address to_address = hlog.GetTailAddress();
if(end_addr < to_address.control()) to_address = end_addr;
uint32_t current_page = to_address.page();
Address start_address{begin_addr};
while(current_page >= 0) {
// If the page trying to read is below head page, ready to go into disk.
Address address(current_page, 0);
if(current_page < hlog.head_address.load().page()) break;
// Scan the page, record by record to the end of the page or current tail.
while(address < to_address) {
record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
if(record->header.IsNull()) {
address += sizeof(RecordInfo);
continue;
}
if(address <= start_address) {
address += record->size();
continue;
}
if(!record->header.invalid &&
pending_context.check(record->payload(), record->payload_size())) {
pending_context.Touch(record);
}
address += record->size();
}
if(current_page == start_address.page() ||
current_page == hlog.begin_address.load().page()) {
// Finish scanning the whole log in memory.
pending_context.Finalize();
return OperationStatus::SUCCESS;
}
// Do a refresh after finish reading a page so that we can release the
// protection for the page. Note that head address may change after
// Refresh().
Refresh();
--current_page;
to_address = Address{current_page, Address::kMaxOffset};
}
#ifdef _NULL_DISK
pending_context.Finalize();
return OperationStatus::SUCCESS;
#else
// Kick async to disk to scan log live on disk.
pending_context.go_async(thread_ctx().phase, thread_ctx().version, Address{current_page, 0},
begin_addr, end_addr);
return OperationStatus::SCAN_IN_PROGRESS;
#endif
}