bool FishStore::CleanHashTableBuckets()

in src/core/fishstore.h [2732:2772]


bool FishStore<D, A>::CleanHashTableBuckets() {
  uint64_t chunk = gc_.next_chunk++;
  if(chunk >= gc_.num_chunks) {
    // No chunk left to clean.
    return false;
  }
  uint8_t version = resize_info_.version;
  Address begin_address = hlog.begin_address.load();
  uint64_t upper_bound;
  if(chunk + 1 < grow_.num_chunks) {
    // All chunks but the last chunk contain kGrowHashTableChunkSize elements.
    upper_bound = kGrowHashTableChunkSize;
  } else {
    // Last chunk might contain more or fewer elements.
    upper_bound = state_[version].size() - (chunk * kGcHashTableChunkSize);
  }
  for(uint64_t idx = 0; idx < upper_bound; ++idx) {
    HashBucket* bucket = &state_[version].bucket(chunk * kGcHashTableChunkSize + idx);
    while(true) {
      for(uint32_t entry_idx = 0; entry_idx < HashBucket::kNumEntries; ++entry_idx) {
        AtomicHashBucketEntry& atomic_entry = bucket->entries[entry_idx];
        HashBucketEntry expected_entry = atomic_entry.load();
        if(!expected_entry.unused() && expected_entry.address() != Address::kInvalidAddress &&
            expected_entry.address() < begin_address) {
          // The record that this entry points to was truncated; try to delete the entry.
          atomic_entry.compare_exchange_strong(expected_entry, HashBucketEntry::kInvalidEntry);
          // If deletion failed, then some other thread must have added a new record to the entry.
        }
      }
      // Go to next bucket in the chain.
      HashBucketOverflowEntry overflow_entry = bucket->overflow_entry.load();
      if(overflow_entry.unused()) {
        // No more buckets in the chain.
        break;
      }
      bucket = &overflow_buckets_allocator_[version].Get(overflow_entry.address());
    }
  }
  // Done with this chunk--did some work.
  return true;
}