in src/core/fishstore.h [2807:2904]
void FishStore<D, A>::SplitHashTableBuckets() {
  // Per-thread worker for growing the hash index. The thread keeps claiming chunks of the old
  // table (via grow_.next_chunk) and redistributes every entry of each claimed chunk into the
  // new table, which is asserted to be exactly twice as large. Old bucket i feeds new bucket i
  // (the "0 side") and new bucket old_size + i (the "1 side"). Once no chunk is left to claim,
  // the thread moves to REST and either advances the global state or waits for the phase to end.
  Address head = hlog.head_address.load();
  Address begin = hlog.begin_address.load();

  uint64_t chunk = grow_.next_chunk++;
  while(chunk < grow_.num_chunks) {
    // Sizes are read only after a chunk has been claimed.
    auto& old_table = state_[grow_.old_version];
    auto& new_table = state_[grow_.new_version];
    uint64_t old_size = old_table.size();
    uint64_t new_size = new_table.size();
    assert(new_size == old_size * 2);

    // Index of the first old bucket belonging to this chunk.
    const uint64_t chunk_begin = chunk * kGrowHashTableChunkSize;
    // Every chunk except the last holds kGrowHashTableChunkSize buckets; the last one takes
    // whatever remains, which may be more or fewer.
    uint64_t chunk_len = kGrowHashTableChunkSize;
    if(chunk + 1 >= grow_.num_chunks) {
      chunk_len = old_size - chunk_begin;
    }

    for(uint64_t bucket_idx = chunk_begin; bucket_idx < chunk_begin + chunk_len; ++bucket_idx) {
      // Destination chain tails and their next free slot, indexed by side (0 or 1). They are
      // handed to AddHashEntry as lvalues; presumably it advances them as chains fill up --
      // confirm against AddHashEntry's signature.
      HashBucket* dst_bucket[2] = {
        &new_table.bucket(bucket_idx),
        &new_table.bucket(old_size + bucket_idx)
      };
      uint32_t dst_slot[2] = { 0, 0 };

      // Walk the old bucket and then each overflow bucket chained behind it.
      for(HashBucket* src = &old_table.bucket(bucket_idx); src != nullptr;) {
        for(uint32_t slot = 0; slot < HashBucket::kNumEntries; ++slot) {
          HashBucketEntry entry = src->entries[slot].load();
          if(entry.unused()) {
            // Empty slot; skip it.
            continue;
          }
          if(entry.address() < head) {
            // The record lies below the head address, so we can't tell which side it belongs
            // to; publish the entry on both sides.
            AddHashEntry(dst_bucket[0], dst_slot[0], grow_.new_version, entry);
            AddHashEntry(dst_bucket[1], dst_slot[1], grow_.new_version, entry);
            continue;
          }

          const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(entry.address()));
          KeyHash hash = kpt->get_hash();
          // Side of the new table that this record's hash maps to.
          const uint8_t side = (hash.idx(new_size) < old_size) ? 0 : 1;
          const uint8_t other_side = static_cast<uint8_t>(1 - side);

          AddHashEntry(dst_bucket[side], dst_slot[side], grow_.new_version, entry);
          // Follow the chain backwards looking for where the opposite side's chain starts.
          Address other_start = TraceBackForOtherChainStart(old_size, new_size,
                                kpt->prev_address, head, side);
          if(other_start >= begin) {
            // Found a record that is either on disk or hashes to the opposite side; it heads
            // the opposite side's chain, under the same tag.
            AddHashEntry(dst_bucket[other_side], dst_slot[other_side], grow_.new_version,
                         HashBucketEntry{ other_start, entry.tag(), false });
          }
        }

        // Advance to the next overflow bucket, or stop when the chain ends.
        HashBucketOverflowEntry next = src->overflow_entry.load();
        if(next.unused()) {
          src = nullptr;
        } else {
          src = &overflow_buckets_allocator_[grow_.old_version].Get(next.address());
        }
      }
    }

    // This chunk is complete. Whoever finishes the last pending chunk frees the old table.
    if(--grow_.num_pending_chunks == 0) {
      state_[grow_.old_version].Uninitialize();
      overflow_buckets_allocator_[grow_.old_version].Uninitialize();
      break;
    }
    chunk = grow_.next_chunk++;
  }

  // This thread's share of the split is done.
  thread_ctx().phase = Phase::REST;

  // Acknowledge completion of the GROW_IN_PROGRESS phase for this thread.
  if(epoch_.FinishThreadPhase(Phase::GROW_IN_PROGRESS)) {
    // FinishThreadPhase returned true: move the global state forward so other threads can
    // start using the new hash table.
    GlobalMoveToNextState(SystemState{ Action::GrowIndex, Phase::GROW_IN_PROGRESS,
                                       thread_ctx().version });
    return;
  }

  // Otherwise wait, yielding the CPU, until the system leaves GROW_IN_PROGRESS.
  while(system_state_.load().phase == Phase::GROW_IN_PROGRESS) {
    std::this_thread::yield();
  }
}