in src/core/fishstore.h [1743:1843]
void FishStore<D, A>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result,
    size_t bytes_transferred) {
  // Completion callback for a disk read issued while a pending read walks a
  // hash chain of KeyPointers.
  //
  //   ctxt               the AsyncIOContext of the read that just completed.
  //   result             completion status; always copied into the pending
  //                      context's `result`.
  //   bytes_transferred  not used by this callback.
  //
  // On Status::Ok the callback does exactly one of the following:
  //   * re-issues the read from the head of the record, because either the
  //     record header or the tail of the record is not in the fetched buffer;
  //   * pushes the context onto `thread_io_responses`, because the record is
  //     not marked invalid and `pending_context->check()` accepted the key
  //     pointer, or because the chain dropped below `hlog.begin_address`;
  //   * issues a read for the previous entry of the chain (`prev_address`).
  CallbackContext<AsyncIOContext> context{ctxt};
  fishstore_t* fishstore = reinterpret_cast<fishstore_t*>(context->fishstore);
  auto pending_context =
    static_cast<async_pending_read_context_t*>(context->caller_context);

  // This I/O is no longer outstanding.
  --fishstore->num_pending_ios;
  // NOTE(review): presumably tells the CallbackContext wrapper not to release
  // the context when this function returns, since the context is either
  // re-submitted or queued below -- confirm against CallbackContext.
  context.async = true;

  // Continues the lookup at the previous entry of the hash chain, or hands the
  // context back once the chain has left the valid region of the log.
  auto follow_chain = [&context, fishstore](KeyPointer* entry) {
    context->address = entry->prev_address;
    if(context->address >= fishstore->hlog.begin_address.load()) {
      // The context address points at a KeyPointer again, so the offset
      // between the requested address and the key pointer is 0.
      context->kpt_offset = 0;
      fishstore->AsyncGetFromDisk(context->address,
                                  fishstore->MinIoRequestSize(),
                                  AsyncGetFromDiskCallback, *context.get());
    } else {
      // Hitting invalid region. Finish hash chain exploration.
      context->thread_io_responses->push(context.get());
    }
  };

  // Some important numbers we need to refer to in the IO callback.
  // `context->record` is the sector aligned memory fetched by an IO.
  // `context->record.valid_offset` is the offset to the requested IO address,
  // i.e., how many bytes before the requested address we can use.
  // `context->record.available_bytes` is the number of bytes available
  // starting from the requested address. `context->kpt_offset` indicates how
  // many bytes lie between the requested address and the requested key pointer.
  pending_context->result = result;
  if(result != Status::Ok) {
    // NOTE(review): on a failed read this callback neither re-issues the IO
    // nor pushes the context onto `thread_io_responses` -- confirm that failed
    // reads are completed and released somewhere else.
    return;
  }

  if(context->kpt_offset == 0) {
    // The returned IO region starts at a KeyPointer.
    KeyPointer* kpt =
      reinterpret_cast<KeyPointer*>(context->record.GetValidPointer());
    uint32_t ptr_offset = record_t::ptr_offset(kpt->ptr_offset);
    // Move the context address to the head of the record.
    context->address = context->address - ptr_offset;
    if(context->record.valid_offset < ptr_offset) {
      // The buffer does not reach back to the record header. Read again from
      // the head of the record so that the header is included.
      context->kpt_offset = ptr_offset;
      fishstore->AsyncGetFromDisk(context->address,
                                  fishstore->MinIoRequestSize() + ptr_offset,
                                  AsyncGetFromDiskCallback, *context.get());
      context.async = true;
    } else {
      // The header is in the buffer, so the record size can be inferred.
      record_t* record = kpt->get_record();
      uint32_t available_bytes_for_record =
        ptr_offset + context->record.available_bytes;
      if(record->size() > available_bytes_for_record) {
        // We don't have the full record yet; fetch all of it from its head.
        context->kpt_offset = ptr_offset;
        fishstore->AsyncGetFromDisk(context->address, record->size(),
                                    AsyncGetFromDiskCallback, *context.get());
        context.async = true;
      } else if(!record->header.invalid && pending_context->check(kpt)) {
        // Got a hit.
        context->thread_io_responses->push(context.get());
      } else {
        // Does not match, traverse the chain.
        follow_chain(kpt);
      }
    }
  } else {
    // The returned IO region starts at a record header.
    record_t* record =
      reinterpret_cast<record_t*>(context->record.GetValidPointer());
    uint32_t available_bytes = context->record.available_bytes;
    assert(available_bytes >= context->kpt_offset + sizeof(KeyPointer));
    if(record->size() > available_bytes) {
      // We don't have the full record yet.
      fishstore->AsyncGetFromDisk(context->address, record->size(),
                                  AsyncGetFromDiskCallback, *context.get());
      context.async = true;
    } else {
      // Find the requested key pointer inside the record.
      KeyPointer* kpt = reinterpret_cast<KeyPointer*>(
                          context->record.GetValidPointer() + context->kpt_offset);
      if(!record->header.invalid && pending_context->check(kpt)) {
        // Got a hit.
        context->thread_io_responses->push(context.get());
      } else {
        // Does not match, traverse the chain.
        follow_chain(kpt);
      }
    }
  }
}