in src/core/fishstore.h [1380:1465]
inline OperationStatus FishStore<D, A>::InternalScan(C& pending_context,
    uint64_t start_addr, uint64_t end_addr) const {
  // Walks the in-memory part of the hash chain selected by
  // `pending_context.get_hash()` and calls `pending_context.Touch()` on the
  // records it visits whose address lies in [start_addr, end_addr].
  //
  // Returns:
  //  - SUCCESS (after `pending_context.Finalize()`) when there is no hash
  //    entry, or when the walk drops below `start_addr` or the log's begin
  //    address (or always, once the in-memory part is done, if _NULL_DISK is
  //    defined).
  //  - CPR_SHIFT_DETECTED when this thread is in the PREPARE phase and the
  //    first record examined carries a newer checkpoint version.
  //  - SCAN_IN_PROGRESS when the chain continues below the head address; the
  //    context is handed off through `go_async()` to continue from there.
  assert(start_addr < end_addr);

  KeyHash hash = pending_context.get_hash();
  if(thread_ctx().phase != Phase::REST) {
    const_cast<fishstore_t*>(this)->HeavyEnter();
  }

  const AtomicHashBucketEntry* atomic_entry = FindEntry(hash);
  if(!atomic_entry) {
    // Nothing found. There is no NOT_FOUND status for Scan, so the context is
    // still finalized and the scan reports success.
    pending_context.Finalize();
    return OperationStatus::SUCCESS;
  }

  HashBucketEntry entry = atomic_entry->load();
  Address address = entry.address();
  Address head_address = hlog.head_address.load();
  uint64_t latest_record_version = 0;

  // Skip the in-memory chain entries that lie above the requested range. The
  // address is resolved to a pointer only after the loop guard has confirmed
  // it is at or above head_address, so nothing below the head is ever passed
  // to hlog.Get().
  while(address > end_addr && address >= head_address) {
    const KeyPointer* skipped =
      reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    address = skipped->prev_address;
  }

  // Only a single hash chain is considered for now.
  if(address >= head_address) {
    const KeyPointer* first_kpt =
      reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    record_t* first_record = first_kpt->get_record();
    latest_record_version = first_record->header.checkpoint_version;
    if(first_record->header.invalid || !pending_context.check(first_kpt)) {
      // The first candidate is invalid or rejected by the context: move on to
      // whatever TraceBackForMatch() reports further down the chain.
      address = TraceBackForMatch(pending_context, first_kpt->prev_address,
                                  head_address);
    }
  }

  // Reading old version (v).
  if(thread_ctx().phase == Phase::PREPARE &&
      latest_record_version > thread_ctx().version) {
    // CPR shift detected: we are in the "PREPARE" phase, and a record has a
    // version later than what we've seen. The scan range is recorded as well,
    // exactly as in the disk hand-off at the end of this function, so the
    // pending context carries [start_addr, end_addr] on both deferral paths.
    pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
                             start_addr, end_addr);
    return OperationStatus::CPR_SHIFT_DETECTED;
  }

  // Explore the hash chain until it leaves the requested range or drops below
  // head_address, so every in-memory record visited in range is touched.
  while(address >= start_addr && address >= head_address) {
#ifdef _SCAN_BENCH
    visited_address.emplace_back(address.control());
#endif
    const KeyPointer* current =
      reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    pending_context.Touch(current->get_record());
    address = TraceBackForMatch(pending_context, current->prev_address,
                                head_address);
  }

  // begin_address is re-read here (not snapshotted earlier) so the decision
  // uses the value current at the end of the in-memory walk.
  if(address < start_addr || address < hlog.begin_address.load()) {
    // Hitting the end of hash chain, scan is complete.
    pending_context.Finalize();
    return OperationStatus::SUCCESS;
  }

#ifdef _NULL_DISK
  // No disk part to explore in this configuration: finish right away.
  pending_context.Finalize();
  return OperationStatus::SUCCESS;
#else
  // Kick async to explore the disk part of the hash chain.
  pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
                           start_addr, end_addr);
  return OperationStatus::SCAN_IN_PROGRESS;
#endif
}