void FishStore::AsyncScanFromDiskCallback()

in src/core/fishstore.h [1847:2142]
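
Completion callback for the asynchronous disk reads issued while a scan follows
a hash chain through the on-disk portion of the log. Each invocation walks as
much of the chain as the returned IO block covers, touching every valid record
that passes the scan's check, and then either finishes (the chain ended or
dropped below the scan's start address) or issues the next read. The read size
is adaptive: the gap observed between consecutive key pointers is fed to
`setIOLevel()`, which picks the scan page size (IO level) for the next request.
A hedged sketch of that gap-driven selection follows the listing.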


template <class D, class A>
void FishStore<D, A>::AsyncScanFromDiskCallback(IAsyncContext* ctxt, Status result,
    size_t bytes_transferred) {
  CallbackContext<AsyncIOContext> context{ctxt};
  fishstore_t* fishstore = reinterpret_cast<fishstore_t*>(context->fishstore);
  auto pending_context =
    static_cast<async_pending_scan_context_t*>(context->caller_context);
  // Get the current adaptive prefetch level.
  auto current_scan_offset_bit = pending_context->scan_offset_bit();

  // This IO has completed, so there is one fewer outstanding IO on this thread.
  --fishstore->num_pending_ios;

  // Mark the context as async so the heap-allocated AsyncIOContext is not
  // reclaimed when this CallbackContext goes out of scope; it is either reused
  // for a follow-up IO or handed off through thread_io_responses.
  context.async = true;

  pending_context->result = result;
  if(result == Status::Ok) {
    // Common Utilities:
    // The term "scan page" refers to a page whose size is determined by the
    // current IO level. `io_base` is the base pointer of the IO, i.e., the
    // pointer to the requested address; the requested address is always the
    // head of a scan page (when the IO level is 0, the scan page head is
    // exactly the original requested address). `read_offset` is the offset
    // from `io_base` to the actual requested address of a key pointer or
    // record, so the data we want starts at `io_base + read_offset`.
    // `scan_page` is the current scan page number, used to check whether the
    // next record on the chain is available in the current IO block.
    // `available_bytes` is the number of bytes available starting from the
    // actual requested address, not from `io_base`. `valid_offset` is the
    // number of bytes available before `io_base`.
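    // For illustration only (assuming, as the helper names suggest, that a
    // scan page spans 2^scan_offset_bit bytes of log address space): for an
    // address A,
    //   scan_page(A)      ~= A >> scan_offset_bit
    //   scan_offset(A)    ~= A & (scan_page_size - 1)
    //   scan_page_head(A) ~= A with the low scan_offset_bit bits cleared
    // so `io_base + read_offset` points at the data for the originally
    // requested address inside the returned block.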
    uint8_t* io_base = context->record.GetValidPointer();
    uint32_t read_offset =
      context->address.scan_offset(current_scan_offset_bit);
    uint64_t scan_page = context->address.scan_page(current_scan_offset_bit);
    uint32_t available_bytes = context->record.available_bytes - read_offset;
    uint32_t valid_offset = context->record.valid_offset;

    // The requested address is still above end_addr; walk down the chain
    // until it enters the scan range [start_addr, end_addr].
    if(context->address > pending_context->end_addr) {
      KeyPointer* kpt = reinterpret_cast<KeyPointer*>(io_base + read_offset);
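      // Track the gap between consecutive key pointers on the chain; if
      // another IO has to be issued, the gap drives the adaptive IO level.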
      Address current_address = kpt->prev_address;
      uint64_t gap = (context->address - current_address).control();
      KeyPointer* kpt_now;
      // Walk the portion of the hash chain already retrieved in this IO block.
      // Stop when we hit the end of the chain or an address outside the
      // current IO block.
      while(current_address >= pending_context->start_addr &&
            current_address >= fishstore->hlog.begin_address.load() &&
            current_address.scan_page(current_scan_offset_bit) == scan_page) {
        kpt_now = reinterpret_cast<KeyPointer*>(
                    io_base + current_address.scan_offset(current_scan_offset_bit));

        if(current_address <= pending_context->end_addr) {
          // We have the key pointer but lost the record header; break and
          // issue another IO.
          if(current_address.scan_offset(current_scan_offset_bit) <
              record_t::ptr_offset(kpt_now->ptr_offset))
            break;

          record_t* record = kpt_now->get_record();
          if(!record->header.invalid && pending_context->check(kpt_now)) {
            // Got a hit on the chain.
            pending_context->Touch(record);
          }
        }

        gap = (current_address - kpt_now->prev_address).control();
        current_address = kpt_now->prev_address;
      }

      if(current_address < fishstore->hlog.begin_address.load() ||
          current_address < pending_context->start_addr) {
        // We hit the end of the chain; no further IOs are needed.
        context->address = current_address;
        context->thread_io_responses->push(context.get());
      } else if(current_address.scan_page(current_scan_offset_bit) ==
                scan_page) {
        // The key pointer is in this IO block but we lost the record header.
        // Issue another IO to get the header; fetch the whole scan page as
        // well and hope for locality.
        context->address =
          current_address - record_t::ptr_offset(kpt_now->ptr_offset);
        context->kpt_offset = record_t::ptr_offset(kpt_now->ptr_offset);
        // The IO size is the minimum needed to get the header while
        // maintaining the same IO level.
        fishstore->AsyncGetFromDisk(
          context->address.scan_page_head(current_scan_offset_bit),
          std::max(context->kpt_offset + fishstore->MinIoRequestSize(),
                   Address::scan_page_size(current_scan_offset_bit) +
                   current_address.scan_offset(current_scan_offset_bit) +
                   fishstore->MinIoRequestSize()),
          AsyncScanFromDiskCallback, *context.get());
      } else {
        // Determine the new IO level based on the gap observed.
        pending_context->setIOLevel(gap);
        auto new_scan_offset_bit = pending_context->scan_offset_bit();

        // Issue the next IO with the new scan page size, which guarantees
        // that at least the full key pointer is retrieved.
        context->address = current_address;
        context->kpt_offset = 0;
        fishstore->AsyncGetFromDisk(
          current_address.scan_page_head(new_scan_offset_bit),
          std::max(Address::scan_page_size(new_scan_offset_bit),
                   current_address.scan_offset(new_scan_offset_bit) +
                   fishstore->MinIoRequestSize()),
          AsyncScanFromDiskCallback, *context.get());
      }

    } else if(context->kpt_offset == 0) {
      // The returned IO region starts at a KeyPointer.
      KeyPointer* kpt = reinterpret_cast<KeyPointer*>(io_base + read_offset);
      uint32_t ptr_offset = record_t::ptr_offset(kpt->ptr_offset);

      // Move context->address to the head of the record and compute the new
      // scan page head relative to the record head.
      context->address = context->address - ptr_offset;
#ifdef _SCAN_BENCH
      visited_address.emplace_back(context->address.control());
#endif
      Address scan_page_head =
        context->address.scan_page_head(current_scan_offset_bit);
      if(valid_offset + read_offset < ptr_offset) {
        // We lost the header; reissue the IO to retrieve it. The IO size
        // covers at least the header up through the current key pointer.
        // Keep the next IO at the same IO level as the current one.
        context->kpt_offset = ptr_offset;
        fishstore->AsyncGetFromDisk(
          scan_page_head,
          std::max(ptr_offset + fishstore->MinIoRequestSize(),
                   Address::scan_page_size(current_scan_offset_bit) +
                   read_offset + fishstore->MinIoRequestSize()),
          AsyncScanFromDiskCallback, *context.get());
        context.async = true;
      } else {
        // Now that we have the header, we can infer the record size.
        record_t* record = kpt->get_record();
        uint32_t available_bytes_for_record = ptr_offset + available_bytes;
        if(record->size() > available_bytes_for_record) {
          // We don't have the full record yet. Issue another IO for the full
          // record.
          context->kpt_offset = ptr_offset;
          fishstore->AsyncGetFromDisk(scan_page_head,
                                      read_offset - ptr_offset + record->size(),
                                      AsyncScanFromDiskCallback, *context.get());
          context.async = true;
        } else {
          // Check whether the first record is valid and passes the scan's check.
          if(!record->header.invalid && pending_context->check(kpt)) {
            pending_context->Touch(record);
          }

          // Keep track of the gap between the current record and the previous
          // one in the chain.
          Address current_address = kpt->prev_address;
          uint64_t gap = (context->address - current_address).control();
          KeyPointer* kpt_now;
          // Walk the portion of the hash chain already retrieved in this IO
          // block. Stop when we hit the end of the chain or an address
          // outside the current IO block.
          while(current_address >= pending_context->start_addr &&
                current_address >= fishstore->hlog.begin_address.load() &&
                current_address.scan_page(current_scan_offset_bit) ==
                scan_page) {
            kpt_now = reinterpret_cast<KeyPointer*>(
                        io_base + current_address.scan_offset(current_scan_offset_bit));

            // We have the key pointer but lost the record header; break and
            // issue another IO.
            if(current_address.scan_offset(current_scan_offset_bit) <
                record_t::ptr_offset(kpt_now->ptr_offset))
              break;

            record_t* record = kpt_now->get_record();
            if(!record->header.invalid && pending_context->check(kpt_now)) {
              // Got a hit on the chain.
              pending_context->Touch(record);
            }

            // Update the gap between consecutive key pointers.
            gap = (current_address - kpt_now->prev_address).control();
            current_address = kpt_now->prev_address;
          }

          if(current_address < pending_context->start_addr ||
              current_address < fishstore->hlog.begin_address.load()) {
            // We hit the end of the chain; no further IOs are needed.
            context->address = current_address;
            context->thread_io_responses->push(context.get());
          } else if(current_address.scan_page(current_scan_offset_bit) ==
                    scan_page) {
            // The key pointer is in this IO block but we lost the record
            // header. Issue another IO to get the header; fetch the whole
            // scan page as well and hope for locality.
            context->address =
              current_address - record_t::ptr_offset(kpt_now->ptr_offset);
            context->kpt_offset = record_t::ptr_offset(kpt_now->ptr_offset);
            // The IO size is the minimum needed to get the header while
            // maintaining the same IO level.
            fishstore->AsyncGetFromDisk(
              context->address.scan_page_head(current_scan_offset_bit),
              std::max(
                context->kpt_offset + fishstore->MinIoRequestSize(),
                Address::scan_page_size(current_scan_offset_bit) +
                current_address.scan_offset(current_scan_offset_bit) +
                fishstore->MinIoRequestSize()),
              AsyncScanFromDiskCallback, *context.get());
          } else {
            // Determine the new IO level based on the gap observed.
            pending_context->setIOLevel(gap);
            auto new_scan_offset_bit = pending_context->scan_offset_bit();

            // Issue the next IO with the new scan page size, which guarantees
            // that at least the full key pointer is retrieved.
            context->address = current_address;
            context->kpt_offset = 0;
            fishstore->AsyncGetFromDisk(
              current_address.scan_page_head(new_scan_offset_bit),
              std::max(Address::scan_page_size(new_scan_offset_bit),
                       current_address.scan_offset(new_scan_offset_bit) +
                       fishstore->MinIoRequestSize()),
              AsyncScanFromDiskCallback, *context.get());
          }
        }
      }

    } else {
      // The returned IO region starts at a record header.
      record_t* record = reinterpret_cast<record_t*>(io_base + read_offset);
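      // The previous IO was sized to include the record header through the
      // requesting key pointer, so the key pointer should lie within the
      // bytes returned; the assert below verifies this.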
      assert(available_bytes >= context->kpt_offset + sizeof(KeyPointer));
      if(record->size() > available_bytes) {
        // We don't have the full record yet.
        fishstore->AsyncGetFromDisk(
          context->address.scan_page_head(current_scan_offset_bit),
          read_offset + record->size(), AsyncScanFromDiskCallback,
          *context.get());
        context.async = true;
      } else {
        // Similar IO path to the branches above: touch the first record, then
        // walk the rest of the chain covered by this IO block.
        KeyPointer* kpt = reinterpret_cast<KeyPointer*>(io_base + read_offset +
                          context->kpt_offset);

        if(!record->header.invalid && pending_context->check(kpt)) {
          pending_context->Touch(record);
        }

        Address current_address = kpt->prev_address;
        uint64_t gap = (context->address - current_address).control();
        KeyPointer* kpt_now = nullptr;
        while(current_address >= pending_context->start_addr &&
              current_address >= fishstore->hlog.begin_address.load() &&
              current_address.scan_page(current_scan_offset_bit) == scan_page) {

          kpt_now = reinterpret_cast<KeyPointer*>(io_base + current_address.scan_offset(current_scan_offset_bit));
          if(current_address.scan_offset(current_scan_offset_bit) < record_t::ptr_offset(kpt_now->ptr_offset))
            break;

          record_t* record = kpt_now->get_record();
          if(!record->header.invalid && pending_context->check(kpt_now)) {
            pending_context->Touch(record);
          }
          gap = (current_address - kpt_now->prev_address).control();
          current_address = kpt_now->prev_address;
        }

        if(current_address < pending_context->start_addr ||
            current_address < fishstore->hlog.begin_address.load()) {
          context->address = current_address;
          context->thread_io_responses->push(context.get());
        } else if(current_address.scan_page(current_scan_offset_bit) == scan_page) {
          context->address =
            current_address - record_t::ptr_offset(kpt_now->ptr_offset);
          context->kpt_offset = record_t::ptr_offset(kpt_now->ptr_offset);
          fishstore->AsyncGetFromDisk(
            context->address.scan_page_head(current_scan_offset_bit),
            std::max(
              context->kpt_offset + fishstore->MinIoRequestSize(),
              Address::scan_page_size(current_scan_offset_bit) +
              current_address.scan_offset(current_scan_offset_bit) +
              fishstore->MinIoRequestSize()),
            AsyncScanFromDiskCallback, *context.get());
        } else {
          pending_context->setIOLevel(gap);
          auto new_scan_offset_bit = pending_context->scan_offset_bit();

          context->address = current_address;
          context->kpt_offset = 0;
          fishstore->AsyncGetFromDisk(
            current_address.scan_page_head(new_scan_offset_bit),
            std::max(Address::scan_page_size(new_scan_offset_bit),
                     current_address.scan_offset(new_scan_offset_bit) +
                     fishstore->MinIoRequestSize()),
            AsyncScanFromDiskCallback, *context.get());
        }
      }
    }
  }
}
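
The listing calls `pending_context->setIOLevel(gap)` and `scan_offset_bit()`
without showing their definitions. The sketch below is a minimal illustration
of the idea, not FishStore's actual implementation: larger observed gaps
between consecutive key pointers select larger scan pages, so sparse chains
are fetched with bigger sequential reads while dense chains keep small IOs.
The class name `AdaptiveIoLevel`, the constants `min_bit`/`max_bit`, and the
clamping policy are assumptions.

#include <cstdint>

// Hypothetical sketch only: the names and the clamping policy below are
// illustrative, not FishStore's actual scan-context members.
class AdaptiveIoLevel {
 public:
  // Candidate scan page sizes, from 2^9 = 512 B up to 2^22 = 4 MiB.
  static constexpr uint32_t min_bit = 9;
  static constexpr uint32_t max_bit = 22;

  // Pick the IO level from the gap (in log address space) between two
  // consecutive key pointers: round the gap up to the next power of two and
  // clamp the exponent to [min_bit, max_bit].
  void setIOLevel(uint64_t gap) {
    uint32_t bit = min_bit;
    while (bit < max_bit && (uint64_t{1} << bit) < gap) {
      ++bit;
    }
    scan_offset_bit_ = bit;
  }

  // Number of low address bits covered by one scan page; the callback above
  // derives scan_page(), scan_offset(), and scan_page_head() from it.
  uint32_t scan_offset_bit() const {
    return scan_offset_bit_;
  }

 private:
  uint32_t scan_offset_bit_ = min_bit;
};

Under such a policy, a chain whose key pointers sit far apart is fetched with
large reads so that each IO still covers several hops, while a densely packed
chain keeps small scan pages and avoids reading bytes it does not need.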