inline OperationStatus FishStore::InternalScan()

in src/core/fishstore.h [1380:1465]


// Performs the in-memory part of a scan over the hash chain selected by
// `pending_context.get_hash()`, restricted to chain addresses within
// [start_addr, end_addr].
//
// Steps:
//   1. Walk the chain from its head entry, skipping key pointers whose address
//      is greater than `end_addr`.
//   2. While the chain address stays at or above both `start_addr` and the
//      head-address snapshot, hand each selected record to
//      `pending_context.Touch()`. Records are selected by
//      `pending_context.check()` for the first candidate and by
//      `TraceBackForMatch()` for the rest.
//   3. Either finalize the context, or hand the rest of the chain over to the
//      asynchronous path.
//
// Returns:
//   - SUCCESS            : no hash entry exists, or the walk went below
//                          `start_addr` / the log's begin address (or
//                          _NULL_DISK is defined). `Finalize()` has been called
//                          on the context.
//   - CPR_SHIFT_DETECTED : the thread is in the PREPARE phase and the first
//                          in-range record carries a checkpoint version newer
//                          than the thread's version.
//   - SCAN_IN_PROGRESS   : the remaining chain lies below the head-address
//                          snapshot; `go_async()` has been called so the scan
//                          can continue from `address`.
inline OperationStatus FishStore<D, A>::InternalScan(C& pending_context,
    uint64_t start_addr, uint64_t end_addr) const {
  assert(start_addr < end_addr);

  KeyHash hash = pending_context.get_hash();

  if(thread_ctx().phase != Phase::REST) {
    const_cast<fishstore_t*>(this)->HeavyEnter();
  }

  const AtomicHashBucketEntry* atomic_entry = FindEntry(hash);
  if(!atomic_entry) {
    // Nothing found.
    // There is no NOT_FOUND status for Scan.
    // If nothing is found, still call finalize.
    pending_context.Finalize();
    return OperationStatus::SUCCESS;
  }

  HashBucketEntry entry = atomic_entry->load();
  Address address = entry.address();
  // Snapshot of the head address; every in-memory check below uses this value.
  Address head_address = hlog.head_address.load();
  uint64_t latest_record_version = 0;

  // Skip the part of the chain that is newer than the scan range. The address
  // is only translated to a pointer after it is known to be >= head_address.
  while(address > end_addr && address >= head_address) {
    const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    address = kpt->prev_address;
  }

  // We only concern about one chain right now.
  if(address >= head_address) {
    const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    record_t* record = kpt->get_record();
    // The version is taken from the first in-range record, even if that record
    // turns out to be invalid or not to match the context.
    latest_record_version = record->header.checkpoint_version;
    if(record->header.invalid || !pending_context.check(kpt)) {
      address =
        TraceBackForMatch(pending_context, kpt->prev_address, head_address);
    }
  }

  switch(thread_ctx().phase) {
  case Phase::PREPARE:
    // Reading old version (v).
    if(latest_record_version > thread_ctx().version) {
      // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
      // what we've seen.
      // NOTE(review): unlike the go_async() call at the end of this function,
      // this one does not forward start_addr/end_addr -- confirm the context
      // still knows the scan range when the operation is retried.
      pending_context.go_async(thread_ctx().phase, thread_ctx().version, address);
      return OperationStatus::CPR_SHIFT_DETECTED;
    }
    break;
  default:
    break;
  }

  // Explore the hash chain until hit head_address. Thus, all in-memory records
  // on the chain will be touched.
  while(address >= start_addr && address >= head_address) {
#ifdef _SCAN_BENCH
    visited_address.emplace_back(address.control());
#endif
    const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    record_t* record = kpt->get_record();
    pending_context.Touch(record);
    address =
      TraceBackForMatch(pending_context, kpt->prev_address, head_address);
  }

  // The begin address is read here (not snapshotted earlier) so the decision
  // uses the value current at the end of the in-memory walk.
  if(address < start_addr || address < hlog.begin_address.load()) {
    // Hitting the end of hash chain, scan is complete.
    pending_context.Finalize();
    return OperationStatus::SUCCESS;
  } else {
#ifdef _NULL_DISK
    pending_context.Finalize();
    return OperationStatus::SUCCESS;
#else
    // Kick async to explore the disk part of the hash chain.
    pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
                             start_addr, end_addr);
    return OperationStatus::SCAN_IN_PROGRESS;
#endif
  }
}