in src/core/fishstore.h [1847:2142]
// Completion callback for an adaptive-prefetch scan IO against the on-disk
// portion of the log. Each completed IO block is mined for as many hash-chain
// key pointers as it contains; when the next link falls outside the block, a
// follow-up IO is issued (possibly at a new IO level, i.e., a different scan
// page size chosen from the observed gap between chain links). When the chain
// is exhausted (address drops below `start_addr` or the log's begin address),
// the context is handed back to the issuing thread via `thread_io_responses`.
void FishStore<D, A>::AsyncScanFromDiskCallback(IAsyncContext* ctxt, Status result,
    size_t bytes_transferred) {
  CallbackContext<AsyncIOContext> context{ctxt};
  fishstore_t* fishstore = reinterpret_cast<fishstore_t*>(context->fishstore);
  auto pending_context =
    static_cast<async_pending_scan_context_t*>(context->caller_context);
  // Get the current adaptive prefetch level.
  auto current_scan_offset_bit = pending_context->scan_offset_bit();
  --fishstore->num_pending_ios;
  // Mark the context async so it survives this callback: it is either carried
  // into a follow-up AsyncGetFromDisk() below or pushed onto
  // `thread_io_responses` for the issuing thread to complete.
  context.async = true;
  pending_context->result = result;
  if(result == Status::Ok) {
    // Common Utilities:
    // Term scan page indicates a page of the current IO level size.
    // `io_base` gives the base of IO, i.e., the base pointer to the requested
    // address. Note that request address is always the head of scan page. (when
    // IO level is 0, scan page head is exactly the original requested address)
    // `read_offset` gives the offset from `io_base` to the actual requested
    // address for a key pointer or record. Thus, the actual data we want starts
    // at `io_base + read_offset`. `scan_page` provides the current scan page
    // number, which can be used as an indicator to check if next record on the
    // chain is available in the current IO block. `available_bytes` gives the
    // available bytes starting from the actual requested address. Note that it
    // is not the available bytes starting from `io_base`. `valid_offset`
    // provides the number of bytes that is available before `io_base`.
    uint8_t* io_base = context->record.GetValidPointer();
    uint32_t read_offset =
      context->address.scan_offset(current_scan_offset_bit);
    uint64_t scan_page = context->address.scan_page(current_scan_offset_bit);
    uint32_t available_bytes = context->record.available_bytes - read_offset;
    uint32_t valid_offset = context->record.valid_offset;
    // Seek phase: the requested address is still beyond `end_addr`, so we only
    // walk the chain toward the scan range without touching records above it.
    if(context->address > pending_context->end_addr) {
      KeyPointer* kpt = reinterpret_cast<KeyPointer*>(io_base + read_offset);
      Address current_address = kpt->prev_address;
      // `gap` tracks the distance between consecutive chain links; it feeds
      // setIOLevel() when we have to change IO level below.
      uint64_t gap = (context->address - current_address).control();
      KeyPointer* kpt_now;
      // Explore the hash chain that has already been retrieved. Stop either
      // hitting the end of the chain or hitting something outside the
      // current IO block.
      // NOTE(review): `kpt_now` is only read in branches reachable after the
      // loop body ran at least once (the `break` below assigns it first), so
      // it is not used uninitialized despite the lack of an initializer.
      while(current_address >= pending_context->start_addr &&
            current_address >= fishstore->hlog.begin_address.load() &&
            current_address.scan_page(current_scan_offset_bit) == scan_page) {
        kpt_now = reinterpret_cast<KeyPointer*>(
                    io_base + current_address.scan_offset(current_scan_offset_bit));
        // Only links at or below `end_addr` are candidates for matching; above
        // it we merely follow `prev_address` without needing the record header.
        if(current_address <= pending_context->end_addr) {
          // If we got the key pointer but lose the header, break to issue
          // another IO block.
          if(current_address.scan_offset(current_scan_offset_bit) <
              record_t::ptr_offset(kpt_now->ptr_offset))
            break;
          record_t* record = kpt_now->get_record();
          if(!record->header.invalid && pending_context->check(kpt_now)) {
            // Got a hit on the chain.
            pending_context->Touch(record);
          }
        }
        gap = (current_address - kpt_now->prev_address).control();
        current_address = kpt_now->prev_address;
      }
      if(current_address < fishstore->hlog.begin_address.load() ||
          current_address < pending_context->start_addr) {
        // Hitting the end of chain, end of IOs.
        context->address = current_address;
        context->thread_io_responses->push(context.get());
      } else if(current_address.scan_page(current_scan_offset_bit) ==
                scan_page) {
        // Key pointer is in this IO block but lose the header.
        // Issuing another IO to get the header, meanwhile get the whole
        // scan page and hope for locality.
        context->address =
          current_address - record_t::ptr_offset(kpt_now->ptr_offset);
        context->kpt_offset = record_t::ptr_offset(kpt_now->ptr_offset);
        // IO size is the minimum to get the header while maintain the same
        // IO level.
        fishstore->AsyncGetFromDisk(
          context->address.scan_page_head(current_scan_offset_bit),
          std::max(context->kpt_offset + fishstore->MinIoRequestSize(),
                   Address::scan_page_size(current_scan_offset_bit) +
                   current_address.scan_offset(current_scan_offset_bit) +
                   fishstore->MinIoRequestSize()),
          AsyncScanFromDiskCallback, *context.get());
      } else {
        // Determine the new IO level based on the gap observed.
        pending_context->setIOLevel(gap);
        auto new_scan_offset_bit = pending_context->scan_offset_bit();
        // Issue the next IO with new scan page size, guarantees to get the
        // full key pointer at least.
        context->address = current_address;
        context->kpt_offset = 0;
        fishstore->AsyncGetFromDisk(
          current_address.scan_page_head(new_scan_offset_bit),
          std::max(Address::scan_page_size(new_scan_offset_bit),
                   current_address.scan_offset(new_scan_offset_bit) +
                   fishstore->MinIoRequestSize()),
          AsyncScanFromDiskCallback, *context.get());
      }
    } else if(context->kpt_offset == 0) {
      // The returned IO region starts at a KeyPointer.
      KeyPointer* kpt = reinterpret_cast<KeyPointer*>(io_base + read_offset);
      uint32_t ptr_offset = record_t::ptr_offset(kpt->ptr_offset);
      // Move context_address to the head of the record. Calculate the new scan
      // page head against the head of the record.
      context->address = context->address - ptr_offset;
#ifdef _SCAN_BENCH
      visited_address.emplace_back(context->address.control());
#endif
      Address scan_page_head =
        context->address.scan_page_head(current_scan_offset_bit);
      if(valid_offset + read_offset < ptr_offset) {
        // We lost the header, do the IO again so that I can get the header.
        // IO size at least includes the header up to the current key pointer.
        // Keep the IO level of next one same as the current IO.
        context->kpt_offset = ptr_offset;
        fishstore->AsyncGetFromDisk(
          scan_page_head,
          std::max(ptr_offset + fishstore->MinIoRequestSize(),
                   Address::scan_page_size(current_scan_offset_bit) +
                   read_offset + fishstore->MinIoRequestSize()),
          AsyncScanFromDiskCallback, *context.get());
        context.async = true;
      } else {
        // Now we have the header, we can infer the record size now.
        record_t* record = kpt->get_record();
        uint32_t available_bytes_for_record = ptr_offset + available_bytes;
        if(record->size() > available_bytes_for_record) {
          // We don't have the full record yet. Issue another IO for the full
          // record.
          context->kpt_offset = ptr_offset;
          fishstore->AsyncGetFromDisk(scan_page_head,
                                      read_offset - ptr_offset + record->size(),
                                      AsyncScanFromDiskCallback, *context.get());
          context.async = true;
        } else {
          // Check the first record see if it is valid.
          if(!record->header.invalid && pending_context->check(kpt)) {
            pending_context->Touch(record);
          }
          // Keep track of the gap between the current record and the previous
          // one in the chain.
          Address current_address = kpt->prev_address;
          uint64_t gap = (context->address - current_address).control();
          KeyPointer* kpt_now;
          // Explore the hash chain that has already been retrieved. Stop either
          // hitting the end of the chain or hitting something outside the
          // current IO block.
          while(current_address >= pending_context->start_addr &&
                current_address >= fishstore->hlog.begin_address.load() &&
                current_address.scan_page(current_scan_offset_bit) ==
                scan_page) {
            kpt_now = reinterpret_cast<KeyPointer*>(
                        io_base + current_address.scan_offset(current_scan_offset_bit));
            // If we got the key pointer but lose the header, break to issue
            // another IO block.
            if(current_address.scan_offset(current_scan_offset_bit) <
                record_t::ptr_offset(kpt_now->ptr_offset))
              break;
            record_t* record = kpt_now->get_record();
            if(!record->header.invalid && pending_context->check(kpt_now)) {
              // Got a hit on the chain.
              pending_context->Touch(record);
            }
            // calculating the gap.
            gap = (current_address - kpt_now->prev_address).control();
            current_address = kpt_now->prev_address;
          }
          if(current_address < pending_context->start_addr ||
              current_address < fishstore->hlog.begin_address.load()) {
            // Hitting the end of chain, end of IOs.
            context->address = current_address;
            context->thread_io_responses->push(context.get());
          } else if(current_address.scan_page(current_scan_offset_bit) ==
                    scan_page) {
            // Key pointer is in this IO block but lose the header.
            // Issuing another IO to get the header, meanwhile get the whole
            // scan page and hope for locality.
            context->address =
              current_address - record_t::ptr_offset(kpt_now->ptr_offset);
            context->kpt_offset = record_t::ptr_offset(kpt_now->ptr_offset);
            // IO size is the minimum to get the header while maintain the same
            // IO level.
            fishstore->AsyncGetFromDisk(
              context->address.scan_page_head(current_scan_offset_bit),
              std::max(
                context->kpt_offset + fishstore->MinIoRequestSize(),
                Address::scan_page_size(current_scan_offset_bit) +
                current_address.scan_offset(current_scan_offset_bit) +
                fishstore->MinIoRequestSize()),
              AsyncScanFromDiskCallback, *context.get());
          } else {
            // Determine the new IO level based on the gap observed.
            pending_context->setIOLevel(gap);
            auto new_scan_offset_bit = pending_context->scan_offset_bit();
            // Issue the next IO with new scan page size, guarantees to get the
            // full key pointer at least.
            context->address = current_address;
            context->kpt_offset = 0;
            fishstore->AsyncGetFromDisk(
              current_address.scan_page_head(new_scan_offset_bit),
              std::max(Address::scan_page_size(new_scan_offset_bit),
                       current_address.scan_offset(new_scan_offset_bit) +
                       fishstore->MinIoRequestSize()),
              AsyncScanFromDiskCallback, *context.get());
          }
        }
      }
    } else {
      // The returned IO region starts at a record header; the key pointer we
      // chased lives `kpt_offset` bytes past it.
      record_t* record = reinterpret_cast<record_t*>(io_base + read_offset);
      // The re-issued IO was sized to cover at least up to the key pointer.
      assert(available_bytes >= context->kpt_offset + sizeof(KeyPointer));
      if(record->size() > available_bytes) {
        // We don't have the full record yet.
        fishstore->AsyncGetFromDisk(
          context->address.scan_page_head(current_scan_offset_bit),
          read_offset + record->size(), AsyncScanFromDiskCallback,
          *context.get());
        context.async = true;
      } else {
        // Similar IO path as above.
        KeyPointer* kpt = reinterpret_cast<KeyPointer*>(io_base + read_offset +
                          context->kpt_offset);
        if(!record->header.invalid && pending_context->check(kpt)) {
          pending_context->Touch(record);
        }
        Address current_address = kpt->prev_address;
        uint64_t gap = (context->address - current_address).control();
        KeyPointer* kpt_now = nullptr;
        // Walk the remainder of the chain inside this IO block (same loop
        // shape as the two branches above).
        while(current_address >= pending_context->start_addr &&
              current_address >= fishstore->hlog.begin_address.load() &&
              current_address.scan_page(current_scan_offset_bit) == scan_page) {
          kpt_now = reinterpret_cast<KeyPointer*>(io_base + current_address.scan_offset(current_scan_offset_bit));
          // Key pointer present but its record header precedes the IO block:
          // stop and fetch the header with another IO.
          if(current_address.scan_offset(current_scan_offset_bit) < record_t::ptr_offset(kpt_now->ptr_offset))
            break;
          record_t* record = kpt_now->get_record();
          if(!record->header.invalid && pending_context->check(kpt_now)) {
            pending_context->Touch(record);
          }
          gap = (current_address - kpt_now->prev_address).control();
          current_address = kpt_now->prev_address;
        }
        if(current_address < pending_context->start_addr ||
            current_address < fishstore->hlog.begin_address.load()) {
          // End of chain: hand the context back to the issuing thread.
          context->address = current_address;
          context->thread_io_responses->push(context.get());
        } else if(current_address.scan_page(current_scan_offset_bit) == scan_page) {
          // Key pointer is in this IO block but its header is not: re-issue an
          // IO anchored at the record head, same IO level.
          context->address =
            current_address - record_t::ptr_offset(kpt_now->ptr_offset);
          context->kpt_offset = record_t::ptr_offset(kpt_now->ptr_offset);
          fishstore->AsyncGetFromDisk(
            context->address.scan_page_head(current_scan_offset_bit),
            std::max(
              context->kpt_offset + fishstore->MinIoRequestSize(),
              Address::scan_page_size(current_scan_offset_bit) +
              current_address.scan_offset(current_scan_offset_bit) +
              fishstore->MinIoRequestSize()),
            AsyncScanFromDiskCallback, *context.get());
        } else {
          // Next link is outside this scan page: pick a new IO level from the
          // observed gap and issue the next IO, which is guaranteed to cover
          // at least the full key pointer.
          pending_context->setIOLevel(gap);
          auto new_scan_offset_bit = pending_context->scan_offset_bit();
          context->address = current_address;
          context->kpt_offset = 0;
          fishstore->AsyncGetFromDisk(
            current_address.scan_page_head(new_scan_offset_bit),
            std::max(Address::scan_page_size(new_scan_offset_bit),
                     current_address.scan_offset(new_scan_offset_bit) +
                     fishstore->MinIoRequestSize()),
            AsyncScanFromDiskCallback, *context.get());
        }
      }
    }
  }
}