in src/core/fishstore.h [1210:1376]
// Inserts the record described by `pending_context` into the hybrid log and
// splices one key pointer per KPTUtil into the corresponding hash chain.
// The record is allocated with its invalid bit set, so it stays unreadable
// until every key pointer has been linked; the bit is cleared at the very
// end to publish the record atomically with respect to readers.
//
// Returns:
// - OperationStatus::CPR_SHIFT_DETECTED when, during the PREPARE phase, some
//   affected hash chain already holds a record with a checkpoint version newer
//   than this thread's version (caller must go async, pivot, and retry);
// - OperationStatus::SUCCESS once the record is fully linked and visible.
inline OperationStatus FishStore<D, A>::InternalInsert(C& pending_context) {
typedef C pending_insert_context_t;
// Outside the quiescent REST phase, extra CPR bookkeeping is required on entry.
if(thread_ctx().phase != Phase::REST) {
HeavyEnter();
}
const std::vector<KPTUtil>& kpts = pending_context.kpts();
uint16_t n_general_pts = pending_context.n_general_pts();
// Pass 1: locate (or create) the hash bucket entry for every key pointer,
// and record the highest checkpoint version currently visible at the head of
// any affected hash chain.
// (Note that address will be Address::kInvalidAddress, if the atomic_entry was created.)
Address head_address = hlog.head_address.load();
uint64_t latest_record_version = 0;
std::vector<HashBucketCASHelper> cas_helpers(kpts.size());
for(uint16_t i = 0; i < kpts.size(); ++i) {
HashBucket* bucket;
cas_helpers[i].atomic_entry = FindOrCreateEntry(kpts[i].hash, cas_helpers[i].expected_entry,
bucket);
Address address = cas_helpers[i].expected_entry.address();
// Only records still in memory (at or above the in-memory head) can be inspected.
if(address >= head_address) {
record_t* record =
reinterpret_cast<KeyPointer*>(hlog.Get(address))->get_record();
latest_record_version =
std::max(latest_record_version, record->header.checkpoint_version);
}
}
// There will be no actual checkpoint locking happening since the only writer to a record is
// its ingestion thread, and no other reader is able to read it until the record is successfully
// inserted. The only thing we need to take care of is to pivot and retry any insertions when a newer
// record among all the hash chains appears.
if(thread_ctx().phase == Phase::PREPARE && latest_record_version > thread_ctx().version) {
// CPR shift detected: we are in the "PREPARE" phase, and a record has a version later than
// what we've seen.
pending_context.go_async(thread_ctx().phase, thread_ctx().version, Address::kInvalidAddress);
return OperationStatus::CPR_SHIFT_DETECTED;
}
// Calculate size of the record and allocate its slot in the log. Note that
// invalid bit is set to true at the very beginning. Thus, the record is not
// yet visible.
uint32_t record_size = record_t::size(static_cast<uint32_t>(kpts.size()),
pending_context.payload_size(), pending_context.optional_size());
Address new_address = BlockAllocate(record_size);
record_t* record = reinterpret_cast<record_t*>(hlog.Get(new_address));
new(record) record_t{RecordInfo{
static_cast<uint16_t>(thread_ctx().version), true, false, true,
static_cast<uint32_t>(kpts.size()), pending_context.payload_size(), pending_context.optional_size()}};
#ifdef _TIMER
auto t_memcpy_start = std::chrono::high_resolution_clock::now();
#endif
// Copy in the payload.
memcpy(record->payload(), pending_context.payload(), pending_context.payload_size());
// Append any optional fields immediately after the payload.
if(pending_context.optional_size() > 0) {
char* ptr = record->payload() + pending_context.payload_size();
for(const auto& option : pending_context.options()) {
memcpy(ptr, option.payload, option.size);
ptr += option.size;
// NOTE(review): if option.payload was allocated with new[], this should be
// delete[] (scalar delete on an array is UB) — verify the allocation site.
if (option.need_free) delete option.payload;
}
}
#ifdef _TIMER
auto t_memcpy_end = std::chrono::high_resolution_clock::now();
tot_memcpy_time += std::chrono::duration<double>(t_memcpy_end - t_memcpy_start).count();
#endif
// Construct key pointers and swap them into hash chains. `kpt_address`
// tracks the address for the key pointer we are currently dealing with.
// Key pointers live contiguously right after the RecordInfo header.
uint16_t i = 0;
Address kpt_address = new_address;
kpt_address += sizeof(RecordInfo);
// Swap in all field-based (general PSF) key pointers.
for(i = 0; i < n_general_pts; ++i) {
// Initialize the key pointer with its KPTUtil.
KeyPointer* now = record->get_ptr(i);
HashBucketEntry expected_entry = cas_helpers[i].expected_entry;
new(now)
KeyPointer(i, expected_entry.address().control(), kpts[i].general_psf_id,
kpts[i].value_offset, kpts[i].value_size);
// CAS trick to swap the key pointer into the hash chain without storage
// amplification. Invariant: pointers on the hash chain should always
// point backwards, i.e., from high address to low address.
AtomicHashBucketEntry* atomic_entry = cas_helpers[i].atomic_entry;
HashBucketEntry updated_entry{kpt_address, kpts[i].hash.tag(), false};
// Try to swap the hash bucket first.
bool success;
while(!(success = atomic_entry->compare_exchange_strong(
expected_entry, updated_entry))) {
// Failure handling:
// - If the hash bucket now points to an address lower than the
//   candidate, adopt it as our predecessor and try again.
// - Otherwise, we will never win the hash bucket since winning would
//   break the invariant.
if(expected_entry.address() < kpt_address) {
now->prev_address = expected_entry.address().control();
} else
break;
}
if(!success) {
// We found that we cannot win the hash bucket; splice into the middle
// of the chain instead.
KeyPointer* expected_kpt =
reinterpret_cast<KeyPointer*>(hlog.Get(expected_entry.address()));
KPTCASUtil expected_kpt_cas{expected_kpt->control.load()};
do {
// Find the first key pointer whose address is higher than the candidate
// while its previous address points to somewhere lower than the
// candidate.
while(expected_kpt_cas.prev_address > kpt_address.control()) {
expected_kpt = reinterpret_cast<KeyPointer*>(
hlog.Get(expected_kpt_cas.prev_address));
expected_kpt_cas.control = expected_kpt->control.load();
}
// Try to insert the key pointer right before the key pointer we just
// found. If it fails again, walk further down the chain and retry
// until success.
now->prev_address = expected_kpt_cas.prev_address;
} while(!expected_kpt->alter(kpt_address, expected_kpt_cas));
}
kpt_address += sizeof(KeyPointer);
}
// Swap in all predicate-based (inline PSF) key pointers.
for(; i < kpts.size(); ++i) {
// Initialize the key pointer with its KPTUtil.
KeyPointer* now = record->get_ptr(i);
HashBucketEntry expected_entry = cas_helpers[i].expected_entry;
new(now) KeyPointer(i, expected_entry.address().control(),
kpts[i].inline_psf_id, kpts[i].value);
// Same compare-and-swap trick as above.
AtomicHashBucketEntry* atomic_entry = cas_helpers[i].atomic_entry;
HashBucketEntry updated_entry{kpt_address, kpts[i].hash.tag(), false};
bool success;
while(!(success = atomic_entry->compare_exchange_strong(
expected_entry, updated_entry))) {
// Lower address observed: adopt it as predecessor and retry; otherwise
// give up on the bucket (invariant would be broken).
if(expected_entry.address() < kpt_address) {
now->prev_address = expected_entry.address().control();
} else
break;
}
if(!success) {
// Lost the hash bucket; splice into the chain body, same as above.
KeyPointer* expected_kpt =
reinterpret_cast<KeyPointer*>(hlog.Get(expected_entry.address()));
KPTCASUtil expected_kpt_cas{expected_kpt->control.load()};
do {
while(expected_kpt_cas.prev_address > kpt_address.control()) {
expected_kpt = reinterpret_cast<KeyPointer*>(
hlog.Get(expected_kpt_cas.prev_address));
expected_kpt_cas.control = expected_kpt->control.load();
}
now->prev_address = expected_kpt_cas.prev_address;
} while(!expected_kpt->alter(kpt_address, expected_kpt_cas));
}
kpt_address += sizeof(KeyPointer);
}
// All key pointers are linked; clear the invalid bit to publish the record.
record->header.invalid = false;
return OperationStatus::SUCCESS;
}