inline OperationStatus FishStore<D, A>::InternalInsert(C& pending_context)

in src/core/fishstore.h [1210:1376]


template <class D, class A>
template <class C>
inline OperationStatus FishStore<D, A>::InternalInsert(C& pending_context) {
  typedef C pending_insert_context_t;

  if(thread_ctx().phase != Phase::REST) {
    HeavyEnter();
  }

  const std::vector<KPTUtil>& kpts = pending_context.kpts();
  uint16_t n_general_pts = pending_context.n_general_pts();

  // (Note that the address will be Address::kInvalidAddress if the atomic_entry was just created.)
  Address head_address = hlog.head_address.load();
  uint64_t latest_record_version = 0;
  std::vector<HashBucketCASHelper> cas_helpers(kpts.size());
  for(uint16_t i = 0; i < kpts.size(); ++i) {
    HashBucket* bucket;
    cas_helpers[i].atomic_entry = FindOrCreateEntry(kpts[i].hash, cas_helpers[i].expected_entry,
                                  bucket);
    Address address = cas_helpers[i].expected_entry.address();
    if(address >= head_address) {
      record_t* record =
        reinterpret_cast<KeyPointer*>(hlog.Get(address))->get_record();
      latest_record_version =
        std::max(latest_record_version, record->header.checkpoint_version);
    }
  }

  // There is no actual checkpoint locking happening here, since the only writer to a record is
  // its ingestion thread, and no other reader can see the record until it has been successfully
  // inserted. The only thing we need to take care of is to pivot and retry the insertion when a
  // newer record appears on any of the hash chains.
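  // Illustrative example: if this thread is at version 3 but a record on one of
  // the touched hash chains already carries checkpoint_version 4, the insert
  // below must not proceed under version 3; the context is parked via
  // go_async() and retried under the new version.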
  if(thread_ctx().phase == Phase::PREPARE && latest_record_version > thread_ctx().version) {
    // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
    // what we've seen.
    pending_context.go_async(thread_ctx().phase, thread_ctx().version, Address::kInvalidAddress);
    return OperationStatus::CPR_SHIFT_DETECTED;
  }

  // Calculate the size of the record and allocate its slot in the log. Note
  // that the invalid bit is set at the very beginning, so the record is not
  // yet visible to readers.
  uint32_t record_size = record_t::size(static_cast<uint32_t>(kpts.size()),
                                        pending_context.payload_size(), pending_context.optional_size());
  Address new_address = BlockAllocate(record_size);
  record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
  new(record) record_t{RecordInfo{
      static_cast<uint16_t>(thread_ctx().version), true, false, true,
      static_cast<uint32_t>(kpts.size()), pending_context.payload_size(), pending_context.optional_size()}};
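  // Resulting on-log layout (sketch, matching the offsets used below):
  //   [RecordInfo header][KeyPointer x kpts.size()][payload][optional fields]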

#ifdef _TIMER
  auto t_memcpy_start = std::chrono::high_resolution_clock::now();
#endif

  // Copy in the payload.
  memcpy(record->payload(), pending_context.payload(), pending_context.payload_size());
  if(pending_context.optional_size() > 0) {
    char* ptr = record->payload() + pending_context.payload_size();
    for(const auto& option : pending_context.options()) {
      memcpy(ptr, option.payload, option.size);
      ptr += option.size;
      if (option.need_free) delete option.payload;
    }
  }

#ifdef _TIMER
  auto t_memcpy_end = std::chrono::high_resolution_clock::now();
  tot_memcpy_time += std::chrono::duration<double>(t_memcpy_end - t_memcpy_start).count();
#endif

  // Construct key pointers and swap them into hash chains. `kpt_address`
  // tracks the address for the key pointer we are currently dealing with.
  uint16_t i = 0;
  Address kpt_address = new_address;
  kpt_address += sizeof(RecordInfo);
  // Swap in all field-based key pointers.
  for(i = 0; i < n_general_pts; ++i) {
    // Initialize the key pointer with its KPTUtil.
    KeyPointer* now = record->get_ptr(i);
    HashBucketEntry expected_entry = cas_helpers[i].expected_entry;
    new(now)
    KeyPointer(i, expected_entry.address().control(), kpts[i].general_psf_id,
               kpts[i].value_offset, kpts[i].value_size);

    // CAS Trick to swap in key pointer into the hash chain without storage
    // amplification. Invariant: pointers on the hash chain should always
    // point backwards, i.e., from high address to low address.
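    // Illustrative example: if the chain is bucket -> 0x500 -> 0x300 and the
    // candidate key pointer sits at 0x450, the bucket CAS below loses to
    // 0x500; the fallback then splices the candidate in between, yielding
    // bucket -> 0x500 -> 0x450 -> 0x300.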
    AtomicHashBucketEntry* atomic_entry = cas_helpers[i].atomic_entry;
    HashBucketEntry updated_entry{kpt_address, kpts[i].hash.tag(), false};

    // Try to swap the hash bucket first.
    bool success;
    while(!(success = atomic_entry->compare_exchange_strong(
                        expected_entry, updated_entry))) {
      // Failure handling:
      // - If the hash bucket now points to an address lower than the
      //   candidate's, try again.
      // - Otherwise, we can never win the hash bucket, since winning it would
      //   break the invariant.
      if(expected_entry.address() < kpt_address) {
        now->prev_address = expected_entry.address().control();
      } else
        break;
    }

    if(!success) {
      // We cannot win the hash bucket entry, so try to splice into the hash
      // chain further down.
      KeyPointer* expected_kpt =
        reinterpret_cast<KeyPointer*>(hlog.Get(expected_entry.address()));
      KPTCASUtil expected_kpt_cas{expected_kpt->control.load()};
      do {
        // Find the first key pointer whose address is higher than the
        // candidate's while its previous address points to somewhere lower
        // than the candidate's.
        while(expected_kpt_cas.prev_address > kpt_address.control()) {
          expected_kpt = reinterpret_cast<KeyPointer*>(
                           hlog.Get(expected_kpt_cas.prev_address));
          expected_kpt_cas.control = expected_kpt->control.load();
        }
        // Try to insert the key pointer right before the key pointer we just
        // found. If the CAS fails again, repeat this until success.
        now->prev_address = expected_kpt_cas.prev_address;
      } while(!expected_kpt->alter(kpt_address, expected_kpt_cas));
    }

    kpt_address += sizeof(KeyPointer);
  }
  // Swap in all predicate-based key pointers.
  for(; i < kpts.size(); ++i) {
    // Initialize the key pointer with its KPTUtil.
    KeyPointer* now = record->get_ptr(i);
    HashBucketEntry expected_entry = cas_helpers[i].expected_entry;
    new(now) KeyPointer(i, expected_entry.address().control(),
                        kpts[i].inline_psf_id, kpts[i].value);

    // Same compare and swap trick as above.
    AtomicHashBucketEntry* atomic_entry = cas_helpers[i].atomic_entry;
    HashBucketEntry updated_entry{kpt_address, kpts[i].hash.tag(), false};
    bool success;
    while(!(success = atomic_entry->compare_exchange_strong(
                        expected_entry, updated_entry))) {
      if(expected_entry.address() < kpt_address) {
        now->prev_address = expected_entry.address().control();
      } else
        break;
    }

    if(!success) {
      KeyPointer* expected_kpt =
        reinterpret_cast<KeyPointer*>(hlog.Get(expected_entry.address()));
      KPTCASUtil expected_kpt_cas{expected_kpt->control.load()};
      do {
        while(expected_kpt_cas.prev_address > kpt_address.control()) {
          expected_kpt = reinterpret_cast<KeyPointer*>(
                           hlog.Get(expected_kpt_cas.prev_address));
          expected_kpt_cas.control = expected_kpt->control.load();
        }
        now->prev_address = expected_kpt_cas.prev_address;
      } while(!expected_kpt->alter(kpt_address, expected_kpt_cas));
    }

    kpt_address += sizeof(KeyPointer);
  }

  record->header.invalid = false;
  return OperationStatus::SUCCESS;
}
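
To see the splice technique in isolation, here is a minimal, self-contained
sketch of the same high-to-low chain insertion on a toy log. Every name in it
(log_slots, splice, and so on) is illustrative only, not FishStore API.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <vector>

// Toy model of the backwards-pointing hash chain: the vector index is the
// "address", so a higher index means a newer allocation; index 0 is null.
struct Node {
  std::atomic<uint64_t> prev{0};
};

std::vector<Node> log_slots(1024);
std::atomic<uint64_t> head{0};

void splice(uint64_t addr) {
  Node& now = log_slots[addr];
  uint64_t expected = head.load();
  // Fast path: win the bucket head while it still points below the candidate.
  while(expected < addr) {
    now.prev.store(expected, std::memory_order_relaxed);
    if(head.compare_exchange_strong(expected, addr)) return;
  }
  // Slow path: walk down to the first node whose prev points below the
  // candidate, then splice in between; on contention, resume walking.
  Node* cur = &log_slots[expected];
  uint64_t cur_prev = cur->prev.load();
  do {
    while(cur_prev > addr) {
      cur = &log_slots[cur_prev];
      cur_prev = cur->prev.load();
    }
    now.prev.store(cur_prev, std::memory_order_relaxed);
  } while(!cur->prev.compare_exchange_strong(cur_prev, addr));
}

int main() {
  splice(300);  // chain: head -> 300
  splice(500);  // chain: head -> 500 -> 300
  splice(450);  // loses the head CAS to 500, splices in: 500 -> 450 -> 300
  for(uint64_t a = head.load(); a != 0; a = log_slots[a].prev.load()) {
    std::printf("%llu ", static_cast<unsigned long long>(a));
  }
  std::printf("\n");  // prints: 500 450 300
}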