// FishStore<D, A>::SplitHashTableBuckets()
// Extracted from src/core/fishstore.h [2807:2904]

// Splits the old hash table into the new (doubled) hash table during index growth.
// Each calling thread claims chunks of buckets and splits every bucket chain in its
// chunk into two chains in the new table (the "0 side" at the same index and the
// "1 side" at old index + old_size). The last thread to finish a chunk frees the old
// table, and the last thread to ack the phase advances the global system state.
void FishStore<D, A>::SplitHashTableBuckets() {
  // This thread won't exit until all hash table buckets have been split.
  // Snapshot the log boundaries once: entries below head_address point to records
  // that are no longer in memory, so their keys cannot be inspected here.
  Address head_address = hlog.head_address.load();
  Address begin_address = hlog.begin_address.load();
  // Claim chunks via grow_.next_chunk++ — presumably an atomic fetch-add so each
  // chunk is processed by exactly one thread (NOTE(review): confirm grow_.next_chunk
  // is std::atomic).
  for(uint64_t chunk = grow_.next_chunk++; chunk < grow_.num_chunks; chunk = grow_.next_chunk++) {
    uint64_t old_size = state_[grow_.old_version].size();
    uint64_t new_size = state_[grow_.new_version].size();
    // Growth always doubles the table; the split below relies on this invariant.
    assert(new_size == old_size * 2);
    // Split this chunk.
    uint64_t upper_bound;
    if(chunk + 1 < grow_.num_chunks) {
      // All chunks but the last chunk contain kGrowHashTableChunkSize elements.
      upper_bound = kGrowHashTableChunkSize;
    } else {
      // Last chunk might contain more or fewer elements.
      upper_bound = old_size - (chunk * kGrowHashTableChunkSize);
    }
    for(uint64_t idx = 0; idx < upper_bound; ++idx) {

      // Split this (chain of) bucket(s).
      HashBucket* old_bucket = &state_[grow_.old_version].bucket(
                                 chunk * kGrowHashTableChunkSize + idx);
      // "0 side": same slot index in the new table.
      HashBucket* new_bucket0 = &state_[grow_.new_version].bucket(
                                  chunk * kGrowHashTableChunkSize + idx);
      // "1 side": slot index offset by old_size (the newly-added upper half).
      HashBucket* new_bucket1 = &state_[grow_.new_version].bucket(
                                  old_size + chunk * kGrowHashTableChunkSize + idx);
      // Next free entry slot in each destination bucket; advanced by AddHashEntry.
      uint32_t new_entry_idx0 = 0;
      uint32_t new_entry_idx1 = 0;
      // Walk the old bucket and all of its overflow buckets.
      while(true) {
        for(uint32_t old_entry_idx = 0; old_entry_idx < HashBucket::kNumEntries; ++old_entry_idx) {
          HashBucketEntry old_entry = old_bucket->entries[old_entry_idx].load();
          if(old_entry.unused()) {
            // Nothing to do.
            continue;
          } else if(old_entry.address() < head_address) {
            // Can't tell which new bucket the entry should go into; put it in both.
            AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, old_entry);
            AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, old_entry);
            continue;
          }

          // Entry points into the in-memory log: read the record's key hash to decide
          // which side of the new table it belongs to.
          const KeyPointer* kpt = reinterpret_cast<const KeyPointer*>(hlog.Get(old_entry.address()));
          KeyHash hash = kpt->get_hash();
          if(hash.idx(new_size) < old_size) {
            // Record's key hashes to the 0 side of the new hash table.
            AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version, old_entry);
            // Walk backwards along the chain for the first record belonging to the
            // other (1) side; records on disk are treated conservatively as a match.
            Address other_address = TraceBackForOtherChainStart(old_size, new_size,
                                    kpt->prev_address, head_address, 0);
            if(other_address >= begin_address) {
              // We found a record that either is on disk or has a key that hashes to the 1 side of
              // the new hash table.
              AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version,
                           HashBucketEntry{ other_address, old_entry.tag(), false });
            }
          } else {
            // Record's key hashes to the 1 side of the new hash table.
            AddHashEntry(new_bucket1, new_entry_idx1, grow_.new_version, old_entry);
            // Mirror case: find the start of the 0-side chain hanging off this entry.
            Address other_address = TraceBackForOtherChainStart(old_size, new_size,
                                    kpt->prev_address, head_address, 1);
            if(other_address >= begin_address) {
              // We found a record that either is on disk or has a key that hashes to the 0 side of
              // the new hash table.
              AddHashEntry(new_bucket0, new_entry_idx0, grow_.new_version,
                           HashBucketEntry{ other_address, old_entry.tag(), false });
            }
          }
        }
        // Go to next bucket in the chain.
        HashBucketOverflowEntry overflow_entry = old_bucket->overflow_entry.load();
        if(overflow_entry.unused()) {
          // No more buckets in the chain.
          break;
        }
        old_bucket = &overflow_buckets_allocator_[grow_.old_version].Get(overflow_entry.address());
      }
    }
    // Done with this chunk. num_pending_chunks is presumably atomic; the thread that
    // decrements it to zero is the last to finish any chunk and may reclaim the old
    // table (NOTE(review): confirm atomicity).
    if(--grow_.num_pending_chunks == 0) {
      // Free the old hash table.
      state_[grow_.old_version].Uninitialize();
      overflow_buckets_allocator_[grow_.old_version].Uninitialize();
      break;
    }
  }
  // Thread has finished growing its part of the hash table.
  thread_ctx().phase = Phase::REST;
  // Thread ack that it has finished growing the hash table.
  if(epoch_.FinishThreadPhase(Phase::GROW_IN_PROGRESS)) {
    // Let other threads know that they can use the new hash table now.
    GlobalMoveToNextState(SystemState{ Action::GrowIndex, Phase::GROW_IN_PROGRESS,
                                       thread_ctx().version });
  } else {
    while(system_state_.load().phase == Phase::GROW_IN_PROGRESS) {
      // Spin until all other threads have finished splitting their chunks.
      std::this_thread::yield();
    }
  }
}