inline OperationStatus FasterKv::InternalRmw()

in cc/src/core/faster.h [978:1187]
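
Performs a read-modify-write against the hybrid log. The common path is an atomic in-place update when the latest record for the key lies in the mutable region; otherwise the operation falls back to read-copy-update (RCU) by appending a new record, or returns an async/pending status when the record sits in the fuzzy region or on disk, or when a CPR checkpoint version shift is detected. The retrying flag indicates the operation previously went pending and carries its own phase and version in the pending context.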


template <class K, class V, class D>
template <class C>
inline OperationStatus FasterKv<K, V, D>::InternalRmw(C& pending_context, bool retrying) {
  typedef C pending_rmw_context_t;

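  // When retrying an operation that previously went pending, reuse the phase and version captured
  // in the pending context; otherwise use the calling thread's current CPR phase and version.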
  Phase phase = retrying ? pending_context.phase : thread_ctx().phase;
  uint32_t version = retrying ? pending_context.version : thread_ctx().version;

  if(phase != Phase::REST) {
    HeavyEnter();
  }

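  // Look up (or create) the hash bucket entry for this key's hash. expected_entry snapshots the
  // entry we observed, so the final compare-and-swap can detect a concurrently updated chain.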
  KeyHash hash = pending_context.get_key_hash();
  HashBucketEntry expected_entry;
  AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);

  // (Note that address will be Address::kInvalidAddress if the atomic_entry was just created.)
  Address address = expected_entry.address();
  Address begin_address = hlog.begin_address.load();
  Address head_address = hlog.head_address.load();
  Address read_only_address = hlog.read_only_address.load();
  Address safe_read_only_address = hlog.safe_read_only_address.load();
  uint64_t latest_record_version = 0;

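  // If the tail record of this hash chain is in memory, latest_record_version records its
  // checkpoint version (even if its key does not match ours); it is compared against the
  // thread's CPR version below.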
  if(address >= head_address) {
    // Multiple keys may share the same hash. Try to find the most recent record with a matching
    // key that we might be able to update in place.
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    latest_record_version = record->header.checkpoint_version;
    if(!pending_context.is_key_equal(record->key())) {
      address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(), head_address);
    }
  }

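  // Guard over the per-hash CPR checkpoint locks. Whether the old-version or new-version lock is
  // taken (or merely tested) depends on the current phase, handled below.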
  CheckpointLockGuard lock_guard{ checkpoint_locks_, hash };

  // The common case.
  if(phase == Phase::REST && address >= read_only_address) {
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    if(!record->header.tombstone && pending_context.RmwAtomic(record)) {
      // In-place RMW succeeded.
      return OperationStatus::SUCCESS;
    } else {
      // Must retry as RCU.
      goto create_record;
    }
  }

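  // Slower path: either a checkpoint phase is active or the record is not in the mutable region.
  // First reconcile this operation with the CPR protocol for the current phase.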
  // Acquire necessary locks.
  switch(phase) {
  case Phase::PREPARE:
    // Working on old version (v).
    if(!lock_guard.try_lock_old()) {
      // If we're retrying the operation, then we already have an old lock, so we'll always
      // succeed in obtaining a second. Otherwise, another thread has acquired the new lock, so
      // a CPR shift has occurred.
      assert(!retrying);
      pending_context.go_async(phase, version, address, expected_entry);
      return OperationStatus::CPR_SHIFT_DETECTED;
    } else {
      if(latest_record_version > version) {
        // CPR shift detected: we are in the "PREPARE" phase, and a mutable record has a version
        // later than what we've seen.
        assert(!retrying);
        pending_context.go_async(phase, version, address, expected_entry);
        return OperationStatus::CPR_SHIFT_DETECTED;
      }
    }
    break;
  case Phase::IN_PROGRESS:
    // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}.
    if(latest_record_version < version) {
      // Will create new record or update existing record to new version (v+1).
      if(!lock_guard.try_lock_new()) {
        if(!retrying) {
          pending_context.go_async(phase, version, address, expected_entry);
        } else {
          pending_context.continue_async(address, expected_entry);
        }
        return OperationStatus::RETRY_LATER;
      } else {
        // Update to new version (v+1) requires RCU.
        goto create_record;
      }
    }
    break;
  case Phase::WAIT_PENDING:
    // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}.
    if(latest_record_version < version) {
      if(lock_guard.old_locked()) {
        if(!retrying) {
          pending_context.go_async(phase, version, address, expected_entry);
        } else {
          pending_context.continue_async(address, expected_entry);
        }
        return OperationStatus::RETRY_LATER;
      } else {
        // Update to new version (v+1) requires RCU.
        goto create_record;
      }
    }
    break;
  case Phase::WAIT_FLUSH:
    // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}.
    if(latest_record_version < version) {
      goto create_record;
    }
    break;
  default:
    break;
  }

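  // Checkpoint bookkeeping done for this phase: decide how to proceed based on where the record
  // lives relative to the log's region boundaries.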
  if(address >= read_only_address) {
    // Mutable region. Try to update in place.
    if(atomic_entry->load() != expected_entry) {
      // Some other thread may have RCUed the record before we locked it; try again.
      return OperationStatus::RETRY_NOW;
    }
    // We acquired the necessary locks, so we can update the record's bucket atomically.
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    if(!record->header.tombstone && pending_context.RmwAtomic(record)) {
      // In-place RMW succeeded.
      return OperationStatus::SUCCESS;
    } else {
      // Must retry as RCU.
      goto create_record;
    }
  } else if(address >= safe_read_only_address && !reinterpret_cast<record_t*>(hlog.Get(address))->header.tombstone) {
    // Fuzzy region (between the safe-read-only and read-only addresses): other threads may still
    // be updating this record in place, so copying it now could lose those updates. Go pending
    // and retry later to avoid the lost-update anomaly.
    if(!retrying) {
      pending_context.go_async(phase, version, address, expected_entry);
    } else {
      pending_context.continue_async(address, expected_entry);
    }
    return OperationStatus::RETRY_LATER;
  } else if(address >= head_address) {
    goto create_record;
  } else if(address >= begin_address) {
    // Need to obtain old record from disk.
    if(!retrying) {
      pending_context.go_async(phase, version, address, expected_entry);
    } else {
      pending_context.continue_async(address, expected_entry);
    }
    return OperationStatus::RECORD_ON_DISK;
  } else {
    // Create a new record.
    goto create_record;
  }

  // Create a record and attempt RCU.
create_record:
  const record_t* old_record = nullptr;
  if(address >= head_address) {
    old_record = reinterpret_cast<const record_t*>(hlog.Get(address));
    if(old_record->header.tombstone) {
      old_record = nullptr;
    }
  }
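  // If a live (non-tombstone) old record is still in memory, size the new record using
  // value_size(old_record), since the post-RMW value size may depend on the existing value.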
  uint32_t record_size = old_record != nullptr ?
    record_t::size(pending_context.key_size(), pending_context.value_size(old_record)) :
    record_t::size(pending_context.key_size(), pending_context.value_size());

  Address new_address = BlockAllocate(record_size);
  record_t* new_record = reinterpret_cast<record_t*>(hlog.Get(new_address));

  // Allocating a block may have the side effect of advancing the head address.
  head_address = hlog.head_address.load();
  // Allocating a block may have the side effect of advancing the thread context's version and
  // phase.
  if(!retrying) {
    phase = thread_ctx().phase;
    version = thread_ctx().version;
  }

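  // Construct the new record's header: stamp it with the current checkpoint version and chain it
  // to the record currently installed at the bucket entry, then write the key.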
  new(new_record) record_t{
    RecordInfo{
      static_cast<uint16_t>(version), true, false, false,
      expected_entry.address() }
  };
  pending_context.write_deep_key_at(const_cast<key_t*>(&new_record->key()));

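  // Populate the new value: start from the initial value when there is no live old record (or it
  // has fallen below the begin address); copy-and-modify when the old record is still in memory;
  // otherwise go async and fetch the old record from disk.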
  if(old_record == nullptr || address < hlog.begin_address.load()) {
    pending_context.RmwInitial(new_record);
  } else if(address >= head_address) {
    pending_context.RmwCopy(old_record, new_record);
  } else {
    // The block we allocated for the new record caused the head address to advance beyond
    // the old record. Need to obtain the old record from disk.
    new_record->header.invalid = true;
    if(!retrying) {
      pending_context.go_async(phase, version, address, expected_entry);
    } else {
      pending_context.continue_async(address, expected_entry);
    }
    return OperationStatus::RECORD_ON_DISK;
  }

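  // Try to install the new record at the tail of the hash chain. If the compare-and-swap fails,
  // another thread changed the chain first; mark our record invalid and retry the operation.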
  HashBucketEntry updated_entry{ new_address, hash.tag(), false };
  if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) {
    return OperationStatus::SUCCESS;
  } else {
    // CAS failed; try again.
    new_record->header.invalid = true;
    if(!retrying) {
      pending_context.go_async(phase, version, address, expected_entry);
    } else {
      pending_context.continue_async(address, expected_entry);
    }
    return OperationStatus::RETRY_NOW;
  }
}