inline OperationStatus FasterKv::InternalDelete()

in cc/src/core/faster.h [1201:1319]


inline OperationStatus FasterKv<K, V, D>::InternalDelete(C& pending_context) {
  // Deletes the record for the key carried by `pending_context`.
  //
  // Outline:
  //  1. Outside Phase::REST, call HeavyEnter() before the hash lookup.
  //  2. Find the hash bucket entry for the key's hash. If FindEntry() yields no entry, return
  //     NOT_FOUND without writing anything.
  //  3. If the chain head is in memory (>= head_address), record its checkpoint version and, if
  //     its key differs, walk back along the chain with TraceBackForKeyMatchCtxt().
  //  4. Depending on the thread's checkpoint phase, acquire the per-hash checkpoint lock, defer
  //     the request via go_async(), or jump straight to appending a new record.
  //  5. If the matched record is in the mutable region (>= read_only_address), set its tombstone
  //     bit in place; when it is the chain head and its predecessor lies below begin_address,
  //     also try to clear the hash bucket entry.
  //  6. Otherwise append a fresh tombstone record and install it with a CAS on the bucket entry.
  //
  // Returns:
  //  - SUCCESS            : tombstone set in place, or new tombstone record installed.
  //  - NOT_FOUND          : no hash bucket entry exists for the key's hash.
  //  - CPR_SHIFT_DETECTED : PREPARE phase, and either the old-version lock was unavailable or
  //                         the chain head is newer than this thread's version (deferred).
  //  - RETRY_LATER        : IN_PROGRESS / WAIT_PENDING and the required lock state was not
  //                         available (deferred).
  //  - RETRY_NOW          : the CAS installing the new tombstone record lost a race; the new
  //                         record is marked invalid and the caller should retry.
  //
  // NOTE(review): whenever the matching record is not in the mutable region (including when it
  // is only on disk, below head_address, or not present at all), a new tombstone record is
  // appended rather than returning NOT_FOUND — presumably intentional; confirm the contract.
  if(thread_ctx().phase != Phase::REST) {
    HeavyEnter();
  }

  KeyHash hash = pending_context.get_key_hash();
  HashBucketEntry expected_entry;
  AtomicHashBucketEntry* atomic_entry = const_cast<AtomicHashBucketEntry*>(FindEntry(hash, expected_entry));
  if(!atomic_entry) {
    // No hash bucket entry for this hash, so there is nothing to delete.
    return OperationStatus::NOT_FOUND;
  }

  // Snapshot the log boundaries once; every region check below uses these values.
  Address address = expected_entry.address();
  Address head_address = hlog.head_address.load();
  Address read_only_address = hlog.read_only_address.load();
  Address begin_address = hlog.begin_address.load();
  // Checkpoint version of the chain head (not of the matched record); stays 0 when the chain
  // head is not in memory.
  uint64_t latest_record_version = 0;

  if(address >= head_address) {
    const record_t* record = reinterpret_cast<const record_t*>(hlog.Get(address));
    latest_record_version = record->header.checkpoint_version;
    if(!pending_context.is_key_equal(record->key())) {
      // The chain head holds a different key; search older records in the chain.
      // NOTE(review): presumably returns an address below head_address when no in-memory match
      // exists — confirm against TraceBackForKeyMatchCtxt().
      address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
    }
  }

  // Per-hash checkpoint lock guard; the switch below decides which lock (if any) is taken.
  CheckpointLockGuard lock_guard{ checkpoint_locks_, hash };

  // No fast path: every request, including Phase::REST ones (handled by `default`), goes
  // through the phase switch below.

  // Acquire necessary locks.
  switch (thread_ctx().phase) {
  case Phase::PREPARE:
    // Working on old version (v).
    if(!lock_guard.try_lock_old()) {
      pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
      return OperationStatus::CPR_SHIFT_DETECTED;
    } else if(latest_record_version > thread_ctx().version) {
      // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
      // what we've seen.
      pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
      return OperationStatus::CPR_SHIFT_DETECTED;
    }
    break;
  case Phase::IN_PROGRESS:
    // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}.
    if(latest_record_version < thread_ctx().version) {
      // Will create new record or update existing record to new version (v+1).
      if(!lock_guard.try_lock_new()) {
        pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
        return OperationStatus::RETRY_LATER;
      } else {
        // Update to new version (v+1) requires RCU.
        goto create_record;
      }
    }
    break;
  case Phase::WAIT_PENDING:
    // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}.
    if(latest_record_version < thread_ctx().version) {
      // While the old-version lock for this hash is held, defer instead of writing v+1.
      if(lock_guard.old_locked()) {
        pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
        return OperationStatus::RETRY_LATER;
      } else {
        // Update to new version (v+1) requires RCU.
        goto create_record;
      }
    }
    break;
  case Phase::WAIT_FLUSH:
    // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}.
    if(latest_record_version < thread_ctx().version) {
      goto create_record;
    }
    break;
  default:
    break;
  }

  // Mutable Region: Update the record in-place
  if(address >= read_only_address) {
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    // If the record is the head of the hash chain, try to update the hash chain and completely
    // elide record only if the previous address points to invalid address
    if(expected_entry.address() == address) {
      Address previous_address = record->header.previous_address();
      if (previous_address < begin_address) {
        // Best effort: the result of the CAS is ignored, and the tombstone is set either way.
        atomic_entry->compare_exchange_strong(expected_entry, HashBucketEntry::kInvalidEntry);
      }
    }
    record->header.tombstone = true;
    return OperationStatus::SUCCESS;
  }

create_record:
  // Append a new tombstone record (key only) chained to the current bucket head
  // (expected_entry.address()), then publish it with a CAS on the bucket entry.
  uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
  Address new_address = BlockAllocate(record_size);
  record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
  // RecordInfo: checkpoint version, three flags, previous address. The flags are presumably
  // (final_bit, tombstone, invalid) — TODO confirm against RecordInfo's constructor.
  new(record) record_t{
    RecordInfo{
      static_cast<uint16_t>(thread_ctx().version), true, true, false,
      expected_entry.address() },
  };
  pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));

  HashBucketEntry updated_entry{ new_address, hash.tag(), false };

  if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) {
    // Installed the new record in the hash table.
    return OperationStatus::SUCCESS;
  } else {
    // Lost the race for the bucket entry: mark the appended record invalid, then retry.
    record->header.invalid = true;
    return OperationStatus::RETRY_NOW;
  }
}