size_t dequeue_bulk()

in include/ylt/util/concurrentqueue.h [3314:3445]
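
This is the implicit producer's bulk-dequeue path (note the `implicit_context` tag passed to `set_many_empty`). It estimates how many elements are available, optimistically claims up to `max` of them by bumping `dequeueOptimisticCount`, corrects any over-claim through `dequeueOvercommit`, then moves the claimed elements out block by block, recycling fully drained blocks to the free list. The return value is the number of elements actually dequeued, which may be zero.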


    template <typename It>
    size_t dequeue_bulk(It& itemFirst, size_t max) {
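      // Estimate how many elements are available: total enqueued (tail) minus
      // the net number of slots consumers have already claimed (optimistic
      // count, less the over-claims that were handed back via overcommit).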
      auto tail = this->tailIndex.load(std::memory_order_relaxed);
      auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
      auto desiredCount = static_cast<size_t>(
          tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
                  overcommit));
      if (details::circular_less_than<size_t>(0, desiredCount)) {
        desiredCount = desiredCount < max ? desiredCount : max;
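        // This acquire fence synchronizes with the release increments of
        // dequeueOvercommit, guaranteeing overcommit <= myDequeueCount below.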
        std::atomic_thread_fence(std::memory_order_acquire);

        // Optimistically claim desiredCount slots; competing consumers may
        // collectively claim more than is actually available.
        auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
            desiredCount, std::memory_order_relaxed);

        tail = this->tailIndex.load(std::memory_order_acquire);
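        // Using the freshly re-read tail, compute how many of the claimed
        // slots are actually backed by elements; this can fall short of
        // desiredCount.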
        auto actualCount =
            static_cast<size_t>(tail - (myDequeueCount - overcommit));
        if (details::circular_less_than<size_t>(0, actualCount)) {
          actualCount = desiredCount < actualCount ? desiredCount : actualCount;
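          // If we claimed more than was available, hand the surplus back via
          // dequeueOvercommit (the release pairs with the acquire fence above).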
          if (actualCount < desiredCount) {
            this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
                                              std::memory_order_release);
          }

          // Get the first index. Note that since there's guaranteed to be at
          // least actualCount elements, this will never exceed tail.
          auto firstIndex =
              this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

          // Iterate the blocks and dequeue
          auto index = firstIndex;
          BlockIndexHeader* localBlockIndex;
          auto indexIndex =
              get_block_index_index_for_index(index, localBlockIndex);
          do {
            auto blockStartIndex = index;
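            // Consume up to the end of the current block or the end of the
            // claimed range, whichever comes first.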
            index_t endIndex =
                (index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
                static_cast<index_t>(QUEUE_BLOCK_SIZE);
            endIndex =
                details::circular_less_than<index_t>(
                    firstIndex + static_cast<index_t>(actualCount), endIndex)
                    ? firstIndex + static_cast<index_t>(actualCount)
                    : endIndex;

            auto entry = localBlockIndex->index[indexIndex];
            auto block = entry->value.load(std::memory_order_relaxed);
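            // Fast path: when move-assignment cannot throw, dequeue without
            // any exception-handling machinery.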
            if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&,
                                           details::deref_noexcept(itemFirst) =
                                               std::move((*(*block)[index])))) {
              while (index != endIndex) {
                auto& el = *((*block)[index]);
                *itemFirst++ = std::move(el);
                el.~T();
                ++index;
              }
            }
            else {
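              // Slow path: move-assignment may throw, so the loop is guarded.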
              MOODYCAMEL_TRY {
                while (index != endIndex) {
                  auto& el = *((*block)[index]);
                  *itemFirst = std::move(el);
                  ++itemFirst;
                  el.~T();
                  ++index;
                }
              }
              MOODYCAMEL_CATCH(...) {
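                // The slots were already claimed, so the remaining elements
                // must still be destroyed and drained blocks released before
                // rethrowing.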
                do {
                  entry = localBlockIndex->index[indexIndex];
                  block = entry->value.load(std::memory_order_relaxed);
                  while (index != endIndex) {
                    (*block)[index++]->~T();
                  }

                  if (block->ConcurrentQueue::Block::template set_many_empty<
                          implicit_context>(
                          blockStartIndex,
                          static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                    debug::DebugLock lock(mutex);
#endif
                    entry->value.store(nullptr, std::memory_order_relaxed);
                    this->parent->add_block_to_free_list(block);
                  }
                  indexIndex =
                      (indexIndex + 1) & (localBlockIndex->capacity - 1);

                  blockStartIndex = index;
                  endIndex =
                      (index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
                      static_cast<index_t>(QUEUE_BLOCK_SIZE);
                  endIndex =
                      details::circular_less_than<index_t>(
                          firstIndex + static_cast<index_t>(actualCount),
                          endIndex)
                          ? firstIndex + static_cast<index_t>(actualCount)
                          : endIndex;
                } while (index != firstIndex + actualCount);

                MOODYCAMEL_RETHROW;
              }
            }
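            // Mark the dequeued range as consumed; if the block is now
            // completely empty, unhook it from the index and recycle it.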
            if (block->ConcurrentQueue::Block::template set_many_empty<
                    implicit_context>(
                    blockStartIndex,
                    static_cast<size_t>(endIndex - blockStartIndex))) {
              {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
                debug::DebugLock lock(mutex);
#endif
                // Note that the set_many_empty above did a release, meaning
                // that anybody who acquires the block we're about to free can
                // use it safely since our writes (and reads!) will have
                // happened-before then.
                entry->value.store(nullptr, std::memory_order_relaxed);
              }
              this->parent->add_block_to_free_list(
                  block);  // releases the above store
            }
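            // Advance to the next slot of the circular block index.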
            indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
          } while (index != firstIndex + actualCount);

          return actualCount;
        }
        else {
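          // Nothing was actually available; undo the entire optimistic claim
          // so the counters stay balanced.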
          this->dequeueOvercommit.fetch_add(desiredCount,
                                            std::memory_order_release);
        }
      }

      return 0;
    }
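
For context, here is a minimal sketch of how this path is reached through the public API. It assumes the vendored header preserves upstream moodycamel's `ConcurrentQueue` interface and `moodycamel` namespace; adjust the include path and namespace to match the actual vendored layout.

    #include <cstddef>
    #include <cstdio>

    #include "ylt/util/concurrentqueue.h"

    int main() {
      // Assumption: the vendored queue keeps the upstream moodycamel API.
      moodycamel::ConcurrentQueue<int> q;

      int in[8] = {0, 1, 2, 3, 4, 5, 6, 7};
      q.enqueue_bulk(in, 8);  // enqueued via an implicit producer

      int out[8];
      // try_dequeue_bulk dispatches to a producer's dequeue_bulk(itemFirst,
      // max) and returns how many elements were actually moved out (<= 8).
      size_t n = q.try_dequeue_bulk(out, 8);
      std::printf("dequeued %zu of 8\n", n);
      return 0;
    }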