in cc/src/core/faster.h [840:974]
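// Upsert: blind update of the value for a key. If the latest record for the key is in the
// mutable region of the hybrid log, try to update it in place; otherwise (or if the in-place
// attempt fails), append a new record at the tail and publish it with a compare-and-swap on
// the hash-bucket entry (read-copy-update, RCU).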
template <class K, class V, class D>
template <class C>
inline OperationStatus FasterKv<K, V, D>::InternalUpsert(C& pending_context) {
  typedef C pending_upsert_context_t;
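
  // Not in the REST phase: some global action (e.g. a checkpoint, hash-index growth, or
  // garbage collection) is in flight, and HeavyEnter() performs the extra per-operation work
  // that the current phase requires.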
  if(thread_ctx().phase != Phase::REST) {
    HeavyEnter();
  }

  KeyHash hash = pending_context.get_key_hash();
  HashBucketEntry expected_entry;
  AtomicHashBucketEntry* atomic_entry = FindOrCreateEntry(hash, expected_entry);
  // (Note that address will be Address::kInvalidAddress if the atomic_entry was just created.)
  Address address = expected_entry.address();
  Address head_address = hlog.head_address.load();
  Address read_only_address = hlog.read_only_address.load();
  uint64_t latest_record_version = 0;
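
  // Hybrid-log layout: addresses >= read_only_address are in the mutable region (in-place
  // updates allowed); [head_address, read_only_address) is in memory but read-only; anything
  // below head_address is on disk.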
  if(address >= head_address) {
    // Multiple keys may share the same hash. Try to find the most recent record with a matching
    // key that we might be able to update in place.
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    // Read the checkpoint version off the newest record in the chain, whether or not its key
    // matches: the CPR phase logic below needs it.
    latest_record_version = record->header.checkpoint_version;
    if(!pending_context.is_key_equal(record->key())) {
      address = TraceBackForKeyMatchCtxt(pending_context, record->header.previous_address(),
                                         head_address);
    }
  }
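
  // Guard for this hash's checkpoint (CPR) locks; try_lock_old()/try_lock_new() below pin the
  // operation to the old or new checkpoint version, and any lock taken is released when the
  // guard goes out of scope.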
  CheckpointLockGuard lock_guard{ checkpoint_locks_, hash };

  // The common case: no checkpoint in progress, and the latest record for this key is in the
  // mutable region, so we can attempt an in-place update.
  if(thread_ctx().phase == Phase::REST && address >= read_only_address) {
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    if(!record->header.tombstone && pending_context.PutAtomic(record)) {
      return OperationStatus::SUCCESS;
    } else {
      // Record is a tombstone, or the concurrent in-place write failed; must retry as RCU.
      goto create_record;
    }
  }

  // Acquire necessary locks.
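  // CPR invariant: all threads are within one phase of each other, so each case only has to
  // consider its neighboring phases. Comparing latest_record_version against this thread's
  // version decides whether the update belongs to the old version (v) or the new one (v+1).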
  switch(thread_ctx().phase) {
  case Phase::PREPARE:
    // Working on old version (v).
    if(!lock_guard.try_lock_old()) {
      // A thread has already started on the new version for this hash; go async.
      pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
      return OperationStatus::CPR_SHIFT_DETECTED;
    } else if(latest_record_version > thread_ctx().version) {
      // CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
      // what we've seen.
      pending_context.go_async(thread_ctx().phase, thread_ctx().version, address, expected_entry);
      return OperationStatus::CPR_SHIFT_DETECTED;
    }
    break;
  case Phase::IN_PROGRESS:
    // All other threads are in phase {PREPARE,IN_PROGRESS,WAIT_PENDING}.
    if(latest_record_version < thread_ctx().version) {
      // Will create new record or update existing record to new version (v+1).
      if(!lock_guard.try_lock_new()) {
        pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
                                 expected_entry);
        return OperationStatus::RETRY_LATER;
      } else {
        // Update to new version (v+1) requires RCU.
        goto create_record;
      }
    }
    break;
  case Phase::WAIT_PENDING:
    // All other threads are in phase {IN_PROGRESS,WAIT_PENDING,WAIT_FLUSH}.
    if(latest_record_version < thread_ctx().version) {
      if(lock_guard.old_locked()) {
        // Some thread is still working on the old version; wait for it to finish.
        pending_context.go_async(thread_ctx().phase, thread_ctx().version, address,
                                 expected_entry);
        return OperationStatus::RETRY_LATER;
      } else {
        // Update to new version (v+1) requires RCU.
        goto create_record;
      }
    }
    break;
  case Phase::WAIT_FLUSH:
    // All other threads are in phase {WAIT_PENDING,WAIT_FLUSH,PERSISTENCE_CALLBACK}.
    if(latest_record_version < thread_ctx().version) {
      goto create_record;
    }
    break;
  default:
    break;
  }

  if(address >= read_only_address) {
    // Mutable region; try to update in place.
    if(atomic_entry->load() != expected_entry) {
      // Some other thread may have RCUed the record before we locked it; try again.
      return OperationStatus::RETRY_NOW;
    }
    // We acquired the necessary checkpoint locks, so we can try to update the record in place.
    record_t* record = reinterpret_cast<record_t*>(hlog.Get(address));
    if(!record->header.tombstone && pending_context.PutAtomic(record)) {
      // Updated the record in place, atomically.
      return OperationStatus::SUCCESS;
    } else {
      // Must retry as RCU.
      goto create_record;
    }
  }
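
  // No in-place update was possible: either there is no live record for this key in memory, or
  // the latest record is in the read-only (or on-disk) region. Upsert is a blind write, so the
  // old value is not needed; just append a new record and swing the hash chain to it.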
  // Create a record and attempt RCU.
create_record:
  uint32_t record_size = record_t::size(pending_context.key_size(), pending_context.value_size());
  Address new_address = BlockAllocate(record_size);
  record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
  // Stamp the new record with this thread's CPR version; its header links back to the previous
  // record in this hash chain.
  new(record) record_t{
    RecordInfo{
      static_cast<uint16_t>(thread_ctx().version), true, false, false,
      expected_entry.address() }
  };
  // Copy the key and value into the newly allocated record.
  pending_context.write_deep_key_at(const_cast<key_t*>(&record->key()));
  pending_context.Put(record);

  // Publish: atomically swing the hash-bucket entry from the value we observed to the new
  // record's address.
  HashBucketEntry updated_entry{ new_address, hash.tag(), false };
  if(atomic_entry->compare_exchange_strong(expected_entry, updated_entry)) {
    // Installed the new record in the hash table.
    return OperationStatus::SUCCESS;
  } else {
    // CAS failed: another thread changed this hash chain after we read expected_entry. Mark our
    // record invalid (it stays in the log but will be ignored) and retry from the top.
    record->header.invalid = true;
    return InternalUpsert(pending_context);
  }
}
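
// For reference, a minimal sketch (not part of the FASTER sources; names and signatures
// inferred from the calls above) of the interface an upsert context must provide for
// InternalUpsert to compile. The real pending-context types are defined elsewhere in
// cc/src/core:
//
//   class UpsertContext {
//    public:
//     KeyHash get_key_hash() const;              // hash used to locate the hash bucket
//     bool is_key_equal(const key_t& key) const; // key comparison for chain traversal
//     uint32_t key_size() const;                 // sizes fed to record_t::size()
//     uint32_t value_size() const;
//     void write_deep_key_at(key_t* dst) const;  // copy the key into a new record
//     void Put(record_t* record);                // write the value of a new record (RCU path)
//     bool PutAtomic(record_t* record);          // in-place update; false => fall back to RCU
//     void go_async(Phase phase, uint32_t version, Address address, HashBucketEntry entry);
//   };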