in src/core/fishstore.h [2145:2205]
void FishStore<D, A>::AsyncFullScanFromDiskCallback(IAsyncContext* ctxt,
Status result,
size_t bytes_transferred) {
CallbackContext<AsyncIOContext> context{ctxt};
fishstore_t* fishstore = reinterpret_cast<fishstore_t*>(context->fishstore);
auto pending_context =
static_cast<async_pending_full_scan_context_t*>(context->caller_context);
--fishstore->num_pending_ios;
context.async = true;
pending_context->result = result;
if(result == Status::Ok) {
// Always have the full page.
assert(context->record.available_bytes >= Address::kMaxOffset + 1);
uintptr_t from_address, to_address;
if(context->address.page() == pending_context->start_addr.page()) {
from_address =
reinterpret_cast<uintptr_t>(context->record.GetValidPointer() +
pending_context->start_addr.offset());
} else
from_address =
reinterpret_cast<uintptr_t>(context->record.GetValidPointer());
if(context->address.page() == pending_context->end_addr.page()) {
to_address =
reinterpret_cast<uintptr_t>(context->record.GetValidPointer() +
pending_context->end_addr.offset());
} else
to_address = reinterpret_cast<uintptr_t>(
context->record.GetValidPointer() + +Address::kMaxOffset);
uintptr_t address;
// Iterate through all records in the page.
for(address = from_address; address <= to_address;) {
record_t* record = reinterpret_cast<record_t*>(address);
if(record->header.IsNull()) {
address += sizeof(RecordInfo);
continue;
}
if(!record->header.invalid &&
pending_context->check(record->payload(), record->payload_size())) {
// Got a hit.
pending_context->Touch(record);
}
address += record->size();
}
if(context->address.page() == fishstore->hlog.begin_address.load().page() ||
context->address.page() == pending_context->start_addr.page()) {
// Hitting the begin page, finish full scan.
context->thread_io_responses->push(context.get());
} else {
// Issue an IO to retrieve the previous page.
context->address = Address{context->address.page() - 1, 0};
fishstore->AsyncGetFromDisk(context->address, Address::kMaxOffset + 1,
AsyncFullScanFromDiskCallback, *context.get());
context.async = true;
}
}
}