in src/core/fishstore.h [2732:2772]
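// Claims one chunk of the hash table and scans its buckets (following each
// overflow chain), invalidating any entry whose address falls below the log's
// begin address, i.e. an entry that points into the truncated region.
// Returns false if no chunks remain to clean; true if a chunk was processed.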
bool FishStore<D, A>::CleanHashTableBuckets() {
  uint64_t chunk = gc_.next_chunk++;
  if(chunk >= gc_.num_chunks) {
    // No chunk left to clean.
    return false;
  }
  uint8_t version = resize_info_.version;
  Address begin_address = hlog.begin_address.load();
  uint64_t upper_bound;
  if(chunk + 1 < gc_.num_chunks) {
    // All chunks but the last chunk contain kGcHashTableChunkSize elements.
    upper_bound = kGcHashTableChunkSize;
  } else {
    // Last chunk might contain more or fewer elements.
    upper_bound = state_[version].size() - (chunk * kGcHashTableChunkSize);
  }
  for(uint64_t idx = 0; idx < upper_bound; ++idx) {
    HashBucket* bucket = &state_[version].bucket(chunk * kGcHashTableChunkSize + idx);
    while(true) {
      for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
        AtomicHashBucketEntry& atomic_entry = bucket->entries[entry_idx];
        HashBucketEntry expected_entry = atomic_entry.load();
        if(!expected_entry.unused() && expected_entry.address() != Address::kInvalidAddress &&
            expected_entry.address() < begin_address) {
          // The record that this entry points to was truncated; try to delete the entry.
          atomic_entry.compare_exchange_strong(expected_entry, HashBucketEntry::kInvalidEntry);
          // If deletion failed, then some other thread must have added a new record to the entry.
        }
      }
      // Go to the next bucket in the chain.
      HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load();
      if(overflow_entry.unused()) {
        // No more buckets in the chain.
        break;
      }
      bucket = &overflow_buckets_allocator_[version].Get(overflow_entry.address());
    }
  }
  // Done with this chunk--did some work.
  return true;
}
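
The interesting pattern here is the cooperative, chunked scan: each worker claims a chunk with an atomic increment, scans it once, and invalidates stale entries with a CAS whose failure is deliberately ignored (a failed CAS means another thread just installed a live entry there). Below is a minimal, self-contained sketch of that pattern using simplified stand-in types; SimpleBucket, CleanChunk, kChunkSize, and kInvalidEntry are illustrative assumptions for this sketch, not FishStore's real structures or constants.

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

static constexpr uint64_t kChunkSize = 1024;           // illustrative chunk size
static constexpr uint64_t kInvalidEntry = UINT64_MAX;  // stand-in for an invalid entry

struct SimpleBucket {
  std::atomic<uint64_t> entry{ 0 };  // stand-in for a hash-bucket entry's log address
};

// Cooperative cleanup: claim one chunk, scan it, and return false once all
// chunks have been claimed (mirroring the return contract above).
bool CleanChunk(std::vector<SimpleBucket>& table, std::atomic<uint64_t>& next_chunk,
                uint64_t num_chunks, uint64_t begin_address) {
  uint64_t chunk = next_chunk.fetch_add(1);
  if(chunk >= num_chunks) {
    return false;  // no chunk left to clean
  }
  uint64_t start = chunk * kChunkSize;
  uint64_t end = std::min<uint64_t>(start + kChunkSize, table.size());
  for(uint64_t idx = start; idx < end; ++idx) {
    uint64_t expected = table[idx].entry.load();
    if(expected != kInvalidEntry && expected < begin_address) {
      // Entry points into the truncated region; try to drop it. If the CAS
      // fails, another thread has already replaced it with a live entry.
      table[idx].entry.compare_exchange_strong(expected, kInvalidEntry);
    }
  }
  return true;  // did some work on this chunk
}

int main() {
  std::vector<SimpleBucket> table(8 * kChunkSize);
  for(uint64_t i = 0; i < table.size(); ++i) {
    table[i].entry.store(i);  // pretend each entry holds a log address
  }
  std::atomic<uint64_t> next_chunk{ 0 };
  uint64_t num_chunks = (table.size() + kChunkSize - 1) / kChunkSize;
  uint64_t begin_address = 4096;  // everything below this has been truncated

  std::vector<std::thread> workers;
  for(int t = 0; t < 4; ++t) {
    workers.emplace_back([&] {
      // Each worker keeps claiming chunks until none remain.
      while(CleanChunk(table, next_chunk, num_chunks, begin_address)) {
      }
    });
  }
  for(auto& w : workers) { w.join(); }

  uint64_t invalidated = 0;
  for(auto& b : table) {
    if(b.entry.load() == kInvalidEntry) { ++invalidated; }
  }
  std::cout << "Invalidated " << invalidated << " stale entries\n";  // expect 4096
  return 0;
}

Because the chunk counter is the only shared coordination point, workers never contend on the same bucket range, and a thread that arrives after all chunks are claimed simply gets false back and moves on.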