inline OperationStatus FishStore::InternalRead()

in src/core/fishstore.h [1136:1201]


// Tries to satisfy a read for `pending_context` from the in-memory part of the log.
//
// The hash chain for pending_context.get_hash() is located through FindEntry(). If the chain head
// lies at or above hlog.head_address, the head record is tested with pending_context.check() and,
// when it is invalid or does not match, TraceBackForMatch() is asked for another address.
//
// Returns:
//   NOT_FOUND          - no hash entry exists, or the resolved address is below
//                        hlog.begin_address (with _NULL_DISK defined: anywhere below
//                        hlog.head_address).
//   SUCCESS            - the resolved address is at or above hlog.head_address; its record has
//                        been handed to pending_context.Get().
//   RECORD_ON_DISK     - (without _NULL_DISK) the resolved address is in
//                        [begin_address, head_address); go_async() has been called on the context.
//   CPR_SHIFT_DETECTED - the thread is in Phase::PREPARE and the chain-head record carries a
//                        checkpoint version greater than the thread's version; go_async() has been
//                        called on the context.
inline OperationStatus FishStore<D, A>::InternalRead(C& pending_context) const {
  KeyHash hash = pending_context.get_hash();

  // Outside the REST phase, HeavyEnter() runs before the index is consulted. The const_cast is
  // required because this method is const while HeavyEnter() is called on a non-const store.
  // NOTE(review): presumably HeavyEnter() advances this thread through the checkpoint state
  // machine - confirm against its definition.
  if(thread_ctx().phase != Phase::REST) {
    const_cast<fishstore_t*>(this)->HeavyEnter();
  }

  const AtomicHashBucketEntry* atomic_entry = FindEntry(hash);
  if(!atomic_entry) {
    // No hash chain for this key: nothing to read.
    return OperationStatus::NOT_FOUND;
  }

  // Load the chain head and the log boundaries once, so that every comparison below is made
  // against the same values.
  HashBucketEntry entry = atomic_entry->load();
  Address address = entry.address();
  Address begin_address = hlog.begin_address.load();
  Address head_address = hlog.head_address.load();
  uint64_t latest_record_version = 0;

  if(address >= head_address) {
    // The chain head is in memory. Its checkpoint version is recorded unconditionally (even if
    // the record turns out to be invalid or a non-match); it feeds the CPR check below.
    const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    const record_t* record = kpt->get_record();
    latest_record_version = record->header.checkpoint_version;
    if(record->header.invalid || !pending_context.check(kpt)) {
      // Head record is unusable: continue from the previous key pointer in the chain.
      // NOTE(review): TraceBackForMatch() is expected to return either the address of a matching
      // in-memory record or an address below head_address - confirm against its definition.
      address = TraceBackForMatch(pending_context, kpt->prev_address, head_address);
    }
  }

  // Reading old version (v): only the PREPARE phase needs the version check.
  if(thread_ctx().phase == Phase::PREPARE && latest_record_version > thread_ctx().version) {
    // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
    // what we've seen.
    pending_context.go_async(thread_ctx().phase, thread_ctx().version, address);
    return OperationStatus::CPR_SHIFT_DETECTED;
  }

  if(address >= head_address) {
    // Resolved record is in memory: hand it to the context.
    const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(address));
    const record_t* record = kpt->get_record();
    pending_context.Get(record);
    return OperationStatus::SUCCESS;
  } else if(address >= begin_address) {
#ifdef _NULL_DISK
    // Built with _NULL_DISK: anything that is not in memory is reported as absent.
    return OperationStatus::NOT_FOUND;
#else
    // Record not available in-memory; remember where to continue and let the caller go async.
    pending_context.go_async(thread_ctx().phase, thread_ctx().version, address);
    return OperationStatus::RECORD_ON_DISK;
#endif
  } else {
    // Address is below the start of the log: no record found.
    return OperationStatus::NOT_FOUND;
  }
}