void FishStore::AsyncFullScanFromDiskCallback()

in src/core/fishstore.h [2145:2205]


void FishStore<D, A>::AsyncFullScanFromDiskCallback(IAsyncContext* ctxt,
    Status result,
    size_t bytes_transferred) {
  // I/O completion callback for one page of a full scan. It walks the records
  // in the page that was just read, reports every matching record through the
  // pending scan context, and then either queues the I/O context on
  // `thread_io_responses` or issues a read for the preceding page, with this
  // same function as the callback. `bytes_transferred` is not used.
  CallbackContext<AsyncIOContext> context{ctxt};
  auto* store = reinterpret_cast<fishstore_t*>(context->fishstore);
  auto* scan_ctx =
    static_cast<async_pending_full_scan_context_t*>(context->caller_context);

  // This read has completed.
  --store->num_pending_ios;
  // Presumably keeps the I/O context alive past this scope -- confirm against
  // CallbackContext.
  context.async = true;

  scan_ctx->result = result;
  if(result != Status::Ok) {
    // NOTE(review): on failure the context is not pushed to
    // `thread_io_responses` here; confirm the error is surfaced elsewhere.
    return;
  }

  // Always have the full page.
  assert(context->record.available_bytes >= Address::kMaxOffset + 1);

  const auto page_base = context->record.GetValidPointer();
  const auto scan_page = context->address.page();

  // Default to the whole page; clip the bounds on the first / last page of
  // the requested range.
  uintptr_t scan_begin = reinterpret_cast<uintptr_t>(page_base);
  if(scan_page == scan_ctx->start_addr.page()) {
    scan_begin = reinterpret_cast<uintptr_t>(
                   page_base + scan_ctx->start_addr.offset());
  }
  uintptr_t scan_last =
    reinterpret_cast<uintptr_t>(page_base + Address::kMaxOffset);
  if(scan_page == scan_ctx->end_addr.page()) {
    scan_last = reinterpret_cast<uintptr_t>(
                  page_base + scan_ctx->end_addr.offset());
  }

  // Visit every record whose first byte lies in [scan_begin, scan_last].
  uintptr_t cursor = scan_begin;
  while(cursor <= scan_last) {
    auto* rec = reinterpret_cast<record_t*>(cursor);
    if(rec->header.IsNull()) {
      // Null header: step over a single RecordInfo and keep looking.
      cursor += sizeof(RecordInfo);
    } else {
      const bool hit = !rec->header.invalid &&
                       scan_ctx->check(rec->payload(), rec->payload_size());
      if(hit) {
        scan_ctx->Touch(rec);
      }
      cursor += rec->size();
    }
  }

  const bool reached_first_page =
    scan_page == store->hlog.begin_address.load().page() ||
    scan_page == scan_ctx->start_addr.page();
  if(!reached_first_page) {
    // More pages to cover: read the one immediately before this one.
    context->address = Address{scan_page - 1, 0};
    store->AsyncGetFromDisk(context->address, Address::kMaxOffset + 1,
                            AsyncFullScanFromDiskCallback, *context.get());
    context.async = true;
    return;
  }

  // Begin page of the log or of the scan range: the full scan is finished.
  context->thread_io_responses->push(context.get());
}