void FishStore::AsyncGetFromDiskCallback()

in src/core/fishstore.h [1743:1843]


/// Completion callback for a disk read issued through AsyncGetFromDisk()
/// while exploring a hash chain.
///
/// Depending on how much of the record the finished I/O covers, the callback
/// does one of the following:
///   - reissues a larger read (record header or record tail is missing);
///   - pushes the context onto `thread_io_responses` (the key pointer passed
///     `pending_context->check()`, or the chain left the valid log region);
///   - issues a read for the previous key pointer in the chain.
///
/// @param ctxt   The AsyncIOContext of the finished I/O.
/// @param result Status of the finished I/O; always stored into the pending
///               context's `result`.
/// @param bytes_transferred Not used by this callback.
void FishStore<D, A>::AsyncGetFromDiskCallback(IAsyncContext* ctxt, Status result,
    size_t bytes_transferred) {
  CallbackContext<AsyncIOContext> context{ctxt};
  fishstore_t* fishstore = reinterpret_cast<fishstore_t*>(context->fishstore);
  auto pending_context =
    static_cast<async_pending_read_context_t*>(context->caller_context);

  // This I/O is finished.
  --fishstore->num_pending_ios;

  // Set once here; it holds for every path below, including the ones that
  // reissue an I/O with this same context.
  // NOTE(review): presumably this stops the CallbackContext wrapper from
  // releasing the context when this function returns - confirm against
  // CallbackContext.
  context.async = true;

  // Some important numbers we need to refer to in the IO callback.
  // `context->record` is the sector aligned memory fetched by an IO.
  // `context->record.valid_offset` is the offset to the requested IO address,
  // i.e., how many bytes before the requested address we can use.
  // `context->record.available_bytes` is the number of bytes available
  // starting from the requested address. `context->kpt_offset` indicates how
  // many bytes lie between the requested address and the requested key pointer.
  pending_context->result = result;
  if(result != Status::Ok) {
    // NOTE(review): on failure the context is neither reissued nor pushed to
    // `thread_io_responses` here; only `pending_context->result` records the
    // status. Verify that the issuing side handles this case.
    return;
  }

  // Steps to the previous entry of the hash chain: either issues a read for
  // it, or finishes the exploration once the chain leaves the valid region.
  auto follow_chain = [&context, fishstore](KeyPointer* kpt) {
    context->address = kpt->prev_address;
    if(context->address >= fishstore->hlog.begin_address.load()) {
      // Does not match, traverse the chain. Reset kpt_offset back to 0,
      // since the context address is now pointing to a KeyPointer again.
      context->kpt_offset = 0;
      fishstore->AsyncGetFromDisk(context->address,
                                  fishstore->MinIoRequestSize(),
                                  AsyncGetFromDiskCallback, *context.get());
    } else {
      // Hitting invalid region. Finish hash chain exploration.
      context->thread_io_responses->push(context.get());
    }
  };

  if(context->kpt_offset == 0) {
    // The returned IO region starts at a KeyPointer.
    KeyPointer* kpt =
      reinterpret_cast<KeyPointer*>(context->record.GetValidPointer());
    uint32_t ptr_offset = record_t::ptr_offset(kpt->ptr_offset);

    // Move the context address to the head of the record.
    context->address = context->address - ptr_offset;
    if(context->record.valid_offset < ptr_offset) {
      // The buffer does not reach back to the record header; do the IO again,
      // this time starting at the head of the record.
      context->kpt_offset = ptr_offset;
      fishstore->AsyncGetFromDisk(context->address,
                                  fishstore->MinIoRequestSize() + ptr_offset,
                                  AsyncGetFromDiskCallback, *context.get());
      return;
    }

    // Now we have the header, so we can infer the record size.
    record_t* record = kpt->get_record();
    // Bytes usable counting from the record head: the `ptr_offset` bytes in
    // front of the key pointer plus everything from the key pointer onwards.
    uint32_t available_bytes_for_record =
      ptr_offset + context->record.available_bytes;
    if(record->size() > available_bytes_for_record) {
      // We don't have the full record yet.
      context->kpt_offset = ptr_offset;
      fishstore->AsyncGetFromDisk(context->address, record->size(),
                                  AsyncGetFromDiskCallback, *context.get());
    } else if(!record->header.invalid && pending_context->check(kpt)) {
      // Got a hit.
      context->thread_io_responses->push(context.get());
    } else {
      follow_chain(kpt);
    }
  } else {
    // The returned IO region starts at a record header.
    record_t* record =
      reinterpret_cast<record_t*>(context->record.GetValidPointer());
    uint32_t available_bytes = context->record.available_bytes;
    assert(available_bytes >= context->kpt_offset + sizeof(KeyPointer));
    if(record->size() > available_bytes) {
      // We don't have the full record yet.
      fishstore->AsyncGetFromDisk(context->address, record->size(),
                                  AsyncGetFromDiskCallback, *context.get());
      return;
    }

    // Find the requested key pointer inside the record.
    KeyPointer* kpt = reinterpret_cast<KeyPointer*>(
                        context->record.GetValidPointer() + context->kpt_offset);
    if(!record->header.invalid && pending_context->check(kpt)) {
      // Got a hit.
      context->thread_io_responses->push(context.get());
    } else {
      follow_chain(kpt);
    }
  }
}