class alignas(64) BucketTable

in folly/concurrency/detail/ConcurrentHashMap-detail.h [242:777]


class alignas(64) BucketTable {
 public:
  // Slightly higher than 1.0: even if hashing to shards isn't perfectly
  // balanced, reserve(size) will still work without rehashing.
  static constexpr float kDefaultLoadFactor = 1.05;
  typedef std::pair<const KeyType, ValueType> value_type;

  using Node =
      concurrenthashmap::bucket::NodeT<KeyType, ValueType, Allocator, Atom>;
  using InsertType = concurrenthashmap::InsertType;
  class Iterator;

  BucketTable(
      size_t initial_buckets,
      float load_factor,
      size_t max_size,
      hazptr_obj_cohort<Atom>* cohort)
      : load_factor_(load_factor), max_size_(max_size) {
    DCHECK(cohort);
    initial_buckets = folly::nextPowTwo(initial_buckets);
    DCHECK(
        max_size_ == 0 ||
        (isPowTwo(max_size_) &&
         (folly::popcount(max_size_ - 1) + ShardBits <= 32)));
    auto buckets = Buckets::create(initial_buckets, cohort);
    buckets_.store(buckets, std::memory_order_release);
    load_factor_nodes_ = to_integral(initial_buckets * load_factor_);
    bucket_count_.store(initial_buckets, std::memory_order_relaxed);
  }

  ~BucketTable() {
    auto buckets = buckets_.load(std::memory_order_relaxed);
    // To catch use-after-destruction bugs in user code.
    buckets_.store(nullptr, std::memory_order_release);
    // We can delete and not retire() here, since users must have
    // their own synchronization around destruction.
    auto count = bucket_count_.load(std::memory_order_relaxed);
    buckets->unlink_and_reclaim_nodes(count);
    buckets->destroy(count);
  }

  size_t size() { return size_.load(std::memory_order_acquire); }

  void clearSize() { size_.store(0, std::memory_order_release); }

  void incSize() {
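    // size_ is only modified under the mutex m_, so a non-atomic
    // read-modify-write via a load/store pair is safe; the release store
    // publishes the new count to unsynchronized readers of size().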
    auto sz = size_.load(std::memory_order_relaxed);
    size_.store(sz + 1, std::memory_order_release);
  }

  void decSize() {
    auto sz = size_.load(std::memory_order_relaxed);
    DCHECK_GT(sz, 0);
    size_.store(sz - 1, std::memory_order_release);
  }

  bool empty() { return size() == 0; }

  template <typename MatchFunc, typename K, typename... Args>
  bool insert(
      Iterator& it,
      const K& k,
      InsertType type,
      MatchFunc match,
      hazptr_obj_cohort<Atom>* cohort,
      Args&&... args) {
    return doInsert(
        it, k, type, match, nullptr, cohort, std::forward<Args>(args)...);
  }

  template <typename MatchFunc, typename K, typename... Args>
  bool insert(
      Iterator& it,
      const K& k,
      InsertType type,
      MatchFunc match,
      Node* cur,
      hazptr_obj_cohort<Atom>* cohort) {
    return doInsert(it, k, type, match, cur, cohort, cur);
  }

  // Must hold lock.
  void rehash(size_t bucket_count, hazptr_obj_cohort<Atom>* cohort) {
    auto oldcount = bucket_count_.load(std::memory_order_relaxed);
    // bucket_count must be a power of 2
    DCHECK_EQ(bucket_count & (bucket_count - 1), 0);
    if (bucket_count <= oldcount) {
      return; // Rehash only if expanding.
    }
    auto buckets = buckets_.load(std::memory_order_relaxed);
    DCHECK(buckets); // Use-after-destruction by user.
    auto newbuckets = Buckets::create(bucket_count, cohort);
    load_factor_nodes_ = to_integral(bucket_count * load_factor_);
    for (size_t i = 0; i < oldcount; i++) {
      auto bucket = &buckets->buckets_[i]();
      auto node = bucket->load(std::memory_order_relaxed);
      if (!node) {
        continue;
      }
      auto h = HashFn()(node->getItem().first);
      auto idx = getIdx(bucket_count, h);
      // Reuse as long a tail of the chain as possible.  Since the nodes
      // carry no previous pointers, the longest suffix whose nodes all map
      // to the same new bucket is identical in the old and new tables and
      // can be shared rather than cloned.
      auto lastrun = node;
      auto lastidx = idx;
      auto last = node->next_.load(std::memory_order_relaxed);
      for (; last != nullptr;
           last = last->next_.load(std::memory_order_relaxed)) {
        auto k = getIdx(bucket_count, HashFn()(last->getItem().first));
        if (k != lastidx) {
          lastidx = k;
          lastrun = last;
        }
      }
      // Set longest last run in new bucket, incrementing the refcount.
      lastrun->acquire_link(); // defined in hazptr_obj_base_linked
      newbuckets->buckets_[lastidx]().store(lastrun, std::memory_order_relaxed);
      // Clone the remaining nodes; concurrent readers may still traverse
      // them through the old table, so they cannot simply be relinked.
      for (; node != lastrun;
           node = node->next_.load(std::memory_order_relaxed)) {
        auto newnode = (Node*)Allocator().allocate(sizeof(Node));
        new (newnode) Node(cohort, node);
        auto k = getIdx(bucket_count, HashFn()(node->getItem().first));
        auto prevhead = &newbuckets->buckets_[k]();
        newnode->next_.store(prevhead->load(std::memory_order_relaxed));
        prevhead->store(newnode, std::memory_order_relaxed);
      }
    }

    auto oldbuckets = buckets_.load(std::memory_order_relaxed);
    DCHECK(oldbuckets); // Use-after-destruction by user.
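    // Seqlock write protocol: the first increment makes the counter odd,
    // signalling an in-progress update; the second makes it even again.
    // Readers in getBucketsAndCount() retry until they observe a stable,
    // even sequence number.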
    seqlock_.fetch_add(1, std::memory_order_release);
    bucket_count_.store(bucket_count, std::memory_order_release);
    buckets_.store(newbuckets, std::memory_order_release);
    seqlock_.fetch_add(1, std::memory_order_release);
    oldbuckets->retire(concurrenthashmap::HazptrTableDeleter(oldcount));
  }

  template <typename K>
  bool find(Iterator& res, const K& k) {
    auto& hazcurr = res.hazptrs_[1];
    auto& haznext = res.hazptrs_[2];
    auto h = HashFn()(k);
    size_t bcount;
    Buckets* buckets;
    getBucketsAndCount(bcount, buckets, res.hazptrs_[0]);

    auto idx = getIdx(bcount, h);
    auto prev = &buckets->buckets_[idx]();
    auto node = hazcurr.protect(*prev);
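    // Hand-over-hand traversal: haznext protects the next node before
    // hazcurr gives up the current one (via swap), so no node in the chain
    // can be reclaimed while we are reading it.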
    while (node) {
      if (KeyEqual()(k, node->getItem().first)) {
        res.setNode(node, buckets, bcount, idx);
        return true;
      }
      node = haznext.protect(node->next_);
      hazcurr.swap(haznext);
    }
    return false;
  }

  template <typename K, typename MatchFunc>
  std::size_t erase(const K& key, Iterator* iter, MatchFunc match) {
    Node* node{nullptr};
    auto h = HashFn()(key);
    {
      std::lock_guard<Mutex> g(m_);

      size_t bcount = bucket_count_.load(std::memory_order_relaxed);
      auto buckets = buckets_.load(std::memory_order_relaxed);
      DCHECK(buckets); // Use-after-destruction by user.
      auto idx = getIdx(bcount, h);
      auto head = &buckets->buckets_[idx]();
      node = head->load(std::memory_order_relaxed);
      Node* prev = nullptr;
      while (node) {
        if (KeyEqual()(key, node->getItem().first)) {
          if (!match(node->getItem().second)) {
            return 0;
          }
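          // The unlinked node keeps its next_ pointer until it is
          // reclaimed, so `next` is momentarily reachable from two
          // predecessors; take an extra link count to account for that.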
          auto next = node->next_.load(std::memory_order_relaxed);
          if (next) {
            next->acquire_link(); // defined in hazptr_obj_base_linked
          }
          if (prev) {
            prev->next_.store(next, std::memory_order_release);
          } else {
            // Must be head of list.
            head->store(next, std::memory_order_release);
          }

          if (iter) {
            iter->hazptrs_[0].reset_protection(buckets);
            iter->setNode(
                node->next_.load(std::memory_order_acquire),
                buckets,
                bcount,
                idx);
            iter->next();
          }
          decSize();
          break;
        }
        prev = node;
        node = node->next_.load(std::memory_order_relaxed);
      }
    }
    // Delete the node while not under the lock.
    if (node) {
      node->release();
      return 1;
    }

    return 0;
  }

  void clear(hazptr_obj_cohort<Atom>* cohort) {
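    // Swap in a fresh, empty bucket array under the lock, then retire the
    // old array (and, transitively, its nodes) outside the lock via hazptr.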
    size_t bcount;
    Buckets* buckets;
    {
      std::lock_guard<Mutex> g(m_);
      bcount = bucket_count_.load(std::memory_order_relaxed);
      auto newbuckets = Buckets::create(bcount, cohort);
      buckets = buckets_.load(std::memory_order_relaxed);
      buckets_.store(newbuckets, std::memory_order_release);
      clearSize();
    }
    DCHECK(buckets); // Use-after-destruction by user.
    buckets->retire(concurrenthashmap::HazptrTableDeleter(bcount));
  }

  void max_load_factor(float factor) {
    std::lock_guard<Mutex> g(m_);
    load_factor_ = factor;
    load_factor_nodes_ = to_integral(
        bucket_count_.load(std::memory_order_relaxed) * load_factor_);
  }

  Iterator cbegin() {
    Iterator res;
    size_t bcount;
    Buckets* buckets;
    getBucketsAndCount(bcount, buckets, res.hazptrs_[0]);
    res.setNode(nullptr, buckets, bcount, 0);
    res.next();
    return res;
  }

  Iterator cend() { return Iterator(nullptr); }

 private:
  // Could be optimized to avoid an extra pointer dereference by
  // allocating buckets_ at the same time.
  class Buckets : public hazptr_obj_base<
                      Buckets,
                      Atom,
                      concurrenthashmap::HazptrTableDeleter> {
    using BucketRoot = hazptr_root<Node, Atom>;

    Buckets() {}
    ~Buckets() {}

   public:
    static Buckets* create(size_t count, hazptr_obj_cohort<Atom>* cohort) {
      auto buf =
          Allocator().allocate(sizeof(Buckets) + sizeof(BucketRoot) * count);
      auto buckets = new (buf) Buckets();
      DCHECK(cohort);
      buckets->set_cohort_tag(cohort); // defined in hazptr_obj
      for (size_t i = 0; i < count; i++) {
        new (&buckets->buckets_[i]) BucketRoot;
      }
      return buckets;
    }

    void destroy(size_t count) {
      for (size_t i = 0; i < count; i++) {
        buckets_[i].~BucketRoot();
      }
      this->~Buckets();
      Allocator().deallocate(
          (uint8_t*)this, sizeof(BucketRoot) * count + sizeof(*this));
    }

    void unlink_and_reclaim_nodes(size_t count) {
      for (size_t i = 0; i < count; i++) {
        auto node = buckets_[i]().load(std::memory_order_relaxed);
        if (node) {
          buckets_[i]().store(nullptr, std::memory_order_relaxed);
          while (node) {
            auto next = node->next_.load(std::memory_order_relaxed);
            if (next) {
              node->next_.store(nullptr, std::memory_order_relaxed);
            }
            node->unlink_and_reclaim_unchecked();
            node = next;
          }
        }
      }
    }

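    // Variable-length tail: create() allocates storage for `count`
    // BucketRoot entries immediately after this object. A zero-length
    // array is a compiler extension (GCC/Clang), not standard C++.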
    BucketRoot buckets_[0];
  };

 public:
  class Iterator {
   public:
    FOLLY_ALWAYS_INLINE Iterator()
        : hazptrs_(make_hazard_pointer_array<3, Atom>()) {}
    FOLLY_ALWAYS_INLINE explicit Iterator(std::nullptr_t) : hazptrs_() {}
    FOLLY_ALWAYS_INLINE ~Iterator() {}

    void setNode(
        Node* node, Buckets* buckets, size_t bucket_count, uint64_t idx) {
      node_ = node;
      buckets_ = buckets;
      idx_ = idx;
      bucket_count_ = bucket_count;
    }

    const value_type& operator*() const {
      DCHECK(node_);
      return node_->getItem();
    }

    const value_type* operator->() const {
      DCHECK(node_);
      return &(node_->getItem());
    }

    const Iterator& operator++() {
      DCHECK(node_);
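      // Same hand-over-hand scheme as find(): hazptrs_[2] protects the
      // next node before hazptrs_[1] releases the current one.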
      node_ = hazptrs_[2].protect(node_->next_);
      hazptrs_[1].swap(hazptrs_[2]);
      if (!node_) {
        ++idx_;
        next();
      }
      return *this;
    }

    void next() {
      while (!node_) {
        if (idx_ >= bucket_count_) {
          break;
        }
        DCHECK(buckets_);
        node_ = hazptrs_[1].protect(buckets_->buckets_[idx_]());
        if (node_) {
          break;
        }
        ++idx_;
      }
    }

    bool operator==(const Iterator& o) const { return node_ == o.node_; }

    bool operator!=(const Iterator& o) const { return !(*this == o); }

    Iterator& operator=(const Iterator& o) = delete;

    Iterator& operator=(Iterator&& o) noexcept {
      if (this != &o) {
        hazptrs_ = std::move(o.hazptrs_);
        node_ = std::exchange(o.node_, nullptr);
        buckets_ = std::exchange(o.buckets_, nullptr);
        bucket_count_ = std::exchange(o.bucket_count_, 0);
        idx_ = std::exchange(o.idx_, 0);
      }
      return *this;
    }

    Iterator(const Iterator& o) = delete;

    Iterator(Iterator&& o) noexcept
        : hazptrs_(std::move(o.hazptrs_)),
          node_(std::exchange(o.node_, nullptr)),
          buckets_(std::exchange(o.buckets_, nullptr)),
          bucket_count_(std::exchange(o.bucket_count_, 0)),
          idx_(std::exchange(o.idx_, 0)) {}

    // These are accessed directly from the functions above
    hazptr_array<3, Atom> hazptrs_;

   private:
    Node* node_{nullptr};
    Buckets* buckets_{nullptr};
    size_t bucket_count_{0};
    uint64_t idx_{0};
  };

 private:
  // Shard selection has already consumed the low ShardBits of the hash;
  // shift them off so the bucket index uses fresh bits.
  uint64_t getIdx(size_t bucket_count, size_t hash) {
    return (hash >> ShardBits) & (bucket_count - 1);
  }
  void getBucketsAndCount(
      size_t& bcount, Buckets*& buckets, hazptr_holder<Atom>& hazptr) {
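    // Seqlock read protocol: if the sequence number is odd, a rehash is in
    // progress; if it changed between the two loads, a rehash completed in
    // between. Either way bcount and buckets may be inconsistent, so retry.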
    while (true) {
      auto seqlock = seqlock_.load(std::memory_order_acquire);
      bcount = bucket_count_.load(std::memory_order_acquire);
      buckets = hazptr.protect(buckets_);
      auto seqlock2 = seqlock_.load(std::memory_order_acquire);
      if (!(seqlock & 1) && (seqlock == seqlock2)) {
        break;
      }
    }
    DCHECK(buckets) << "Use-after-destruction by user.";
  }

  template <typename MatchFunc, typename K, typename... Args>
  bool doInsert(
      Iterator& it,
      const K& k,
      InsertType type,
      MatchFunc match,
      Node* cur,
      hazptr_obj_cohort<Atom>* cohort,
      Args&&... args) {
    auto h = HashFn()(k);
    std::unique_lock<Mutex> g(m_);

    size_t bcount = bucket_count_.load(std::memory_order_relaxed);
    auto buckets = buckets_.load(std::memory_order_relaxed);
    // DOES_NOT_EXIST inserts always add a node on success, so grow eagerly
    // before scanning the chain; ANY rehashes later, only once the key is
    // known to be absent.
    if (size() >= load_factor_nodes_ && type == InsertType::DOES_NOT_EXIST) {
      if (max_size_ && size() << 1 > max_size_) {
        // Would exceed max size.
        throw_exception<std::bad_alloc>();
      }
      rehash(bcount << 1, cohort);
      buckets = buckets_.load(std::memory_order_relaxed);
      bcount = bucket_count_.load(std::memory_order_relaxed);
    }

    DCHECK(buckets) << "Use-after-destruction by user.";
    auto idx = getIdx(bcount, h);
    auto head = &buckets->buckets_[idx]();
    auto node = head->load(std::memory_order_relaxed);
    auto headnode = node;
    auto prev = head;
    auto& hazbuckets = it.hazptrs_[0];
    auto& haznode = it.hazptrs_[1];
    hazbuckets.reset_protection(buckets);
    while (node) {
      // Is the key found?
      if (KeyEqual()(k, node->getItem().first)) {
        it.setNode(node, buckets, bcount, idx);
        haznode.reset_protection(node);
        if (type == InsertType::MATCH) {
          if (!match(node->getItem().second)) {
            return false;
          }
        }
        if (type == InsertType::DOES_NOT_EXIST) {
          return false;
        } else {
          if (!cur) {
            cur = (Node*)Allocator().allocate(sizeof(Node));
            new (cur) Node(cohort, std::forward<Args>(args)...);
          }
          auto next = node->next_.load(std::memory_order_relaxed);
          cur->next_.store(next, std::memory_order_relaxed);
          if (next) {
            next->acquire_link(); // defined in hazptr_obj_base_linked
          }
          prev->store(cur, std::memory_order_release);
          it.setNode(cur, buckets, bcount, idx);
          haznode.reset_protection(cur);
          g.unlock();
          // Release not under lock.
          node->release();
          return true;
        }
      }

      prev = &node->next_;
      node = node->next_.load(std::memory_order_relaxed);
    }
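    // Key was not found. MUST_EXIST and MATCH inserts fail here; only
    // DOES_NOT_EXIST and ANY may create a new node below.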
    if (type != InsertType::DOES_NOT_EXIST && type != InsertType::ANY) {
      haznode.reset_protection();
      hazbuckets.reset_protection();
      return false;
    }
    // Node not found, check for rehash on ANY
    if (size() >= load_factor_nodes_ && type == InsertType::ANY) {
      if (max_size_ && size() << 1 > max_size_) {
        // Would exceed max size.
        throw_exception<std::bad_alloc>();
      }
      rehash(bcount << 1, cohort);

      // Reload correct bucket.
      buckets = buckets_.load(std::memory_order_relaxed);
      DCHECK(buckets); // Use-after-destruction by user.
      bcount <<= 1;
      hazbuckets.reset_protection(buckets);
      idx = getIdx(bcount, h);
      head = &buckets->buckets_[idx]();
      headnode = head->load(std::memory_order_relaxed);
    }

    // We found a slot to put the node.
    incSize();
    if (!cur) {
      // InsertType::ANY
      // OR DOES_NOT_EXIST, but only in the try_emplace case
      DCHECK(type == InsertType::ANY || type == InsertType::DOES_NOT_EXIST);
      cur = (Node*)Allocator().allocate(sizeof(Node));
      new (cur) Node(cohort, std::forward<Args>(args)...);
    }
    cur->next_.store(headnode, std::memory_order_relaxed);
    head->store(cur, std::memory_order_release);
    it.setNode(cur, buckets, bcount, idx);
    haznode.reset_protection(cur);
    return true;
  }

  Mutex m_;
  float load_factor_;
  size_t load_factor_nodes_;
  Atom<size_t> size_{0};
  size_t const max_size_;

  // Fields needed for read-only access, on separate cacheline.
  alignas(64) Atom<Buckets*> buckets_{nullptr};
  std::atomic<uint64_t> seqlock_{0};
  Atom<size_t> bucket_count_;
};
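
This table is the per-shard core of the map; applications reach it through
the public folly::ConcurrentHashMap wrapper, which first hashes keys to
shards (hence the ShardBits shift in getIdx). A minimal usage sketch of that
public API follows; the key/value types and initial size are illustrative,
not prescribed by this header:

#include <folly/concurrency/ConcurrentHashMap.h>

#include <cassert>
#include <string>

int main() {
  // Each shard is backed by a table like BucketTable above.
  folly::ConcurrentHashMap<int, std::string> map(/*initialSize=*/8);

  map.insert(1, "one");            // InsertType::DOES_NOT_EXIST path
  map.insert_or_assign(1, "uno");  // InsertType::ANY path

  // Readers take no lock: find() protects the bucket array and chain
  // nodes with hazard pointers, retrying via the seqlock if a rehash
  // races with the lookup.
  auto it = map.find(1);
  assert(it != map.cend());
  assert(it->second == "uno");

  map.erase(1);                    // writers serialize on the shard mutex
  return 0;
}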