size_t dequeue_bulk()

in include/ylt/util/concurrentqueue.h [2675:2801]


    // Attempts to dequeue up to `max` elements in a single operation.
    //
    // Each dequeued element is move-assigned through `itemFirst`, which is
    // taken by reference and advanced once per element written, and is then
    // destroyed in place in its block.
    //
    // Returns the number of elements actually dequeued, or 0 when nothing
    // appeared to be available.
    //
    // If a move-assignment (or the iterator increment) throws, the elements
    // that were already claimed but not yet handed out are destroyed, their
    // slots are marked empty, and the exception is rethrown; those elements are
    // not returned to the queue.
    size_t dequeue_bulk(It& itemFirst, size_t max) {
      // Optimistic estimate of how many elements are available: the tail index
      // minus the (overcommit-adjusted) number of dequeues attempted so far.
      // All three loads are relaxed, so this is only a hint that is re-checked
      // below.
      auto tail = this->tailIndex.load(std::memory_order_relaxed);
      auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
      auto desiredCount = static_cast<size_t>(
          tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
                  overcommit));
      // NOTE(review): circular_less_than is defined elsewhere; presumably a
      // wrap-around-aware "0 < desiredCount" -- confirm.
      if (details::circular_less_than<size_t>(0, desiredCount)) {
        // Never ask for more than the caller can accept.
        desiredCount = desiredCount < max ? desiredCount : max;
        std::atomic_thread_fence(std::memory_order_acquire);

        // Reserve desiredCount elements by bumping the optimistic counter;
        // fetch_add returns the counter's value from before our reservation.
        auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
            desiredCount, std::memory_order_relaxed);

        // Re-read the tail (now with acquire ordering) and recompute how many
        // elements are available relative to our reservation's starting point.
        tail = this->tailIndex.load(std::memory_order_acquire);
        auto actualCount =
            static_cast<size_t>(tail - (myDequeueCount - overcommit));
        if (details::circular_less_than<size_t>(0, actualCount)) {
          // Take at most what we reserved. If fewer elements are available
          // than were reserved, record the shortfall in dequeueOvercommit so
          // the surplus reservation is cancelled out in later estimates.
          actualCount = desiredCount < actualCount ? desiredCount : actualCount;
          if (actualCount < desiredCount) {
            this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
                                              std::memory_order_release);
          }

          // Get the first index. Note that since there's guaranteed to be at
          // least actualCount elements, this will never exceed tail.
          auto firstIndex =
              this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);

          // Determine which block the first element is in
          auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
          auto localBlockIndexHead =
              localBlockIndex->front.load(std::memory_order_acquire);

          // headBase is the base element index stored in the block-index entry
          // at `front`. The signed difference between our first block's base
          // and headBase, divided by the block size, gives how many entries
          // away from `front` our first block sits.
          // NOTE(review): the `& ~(QUEUE_BLOCK_SIZE - 1)` and
          // `& (size - 1)` masks assume both values are powers of two --
          // confirm against their definitions.
          auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
          auto firstBlockBaseIndex =
              firstIndex & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1);
          auto offset = static_cast<size_t>(
              static_cast<typename std::make_signed<index_t>::type>(
                  firstBlockBaseIndex - headBase) /
              static_cast<typename std::make_signed<index_t>::type>(
                  QUEUE_BLOCK_SIZE));
          auto indexIndex =
              (localBlockIndexHead + offset) & (localBlockIndex->size - 1);

          // Iterate the blocks and dequeue
          auto index = firstIndex;
          do {
            // Process one block per iteration. endIndex is whichever comes
            // first: the end of the current block, or the end of the range we
            // claimed (firstIndex + actualCount).
            auto firstIndexInBlock = index;
            index_t endIndex =
                (index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
                static_cast<index_t>(QUEUE_BLOCK_SIZE);
            endIndex =
                details::circular_less_than<index_t>(
                    firstIndex + static_cast<index_t>(actualCount), endIndex)
                    ? firstIndex + static_cast<index_t>(actualCount)
                    : endIndex;
            auto block = localBlockIndex->entries[indexIndex].block;
            // NOTE(review): MOODYCAMEL_NOEXCEPT_ASSIGN is a macro defined
            // elsewhere; presumably it reports whether the move-assignment
            // expression is noexcept, selecting the unguarded path -- confirm.
            if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&,
                                           details::deref_noexcept(itemFirst) =
                                               std::move((*(*block)[index])))) {
              // Unguarded path: move each element out, then destroy the
              // moved-from object in its slot.
              while (index != endIndex) {
                auto& el = *((*block)[index]);
                *itemFirst++ = std::move(el);
                el.~T();
                ++index;
              }
            }
            else {
              // Guarded path: same work, but the element at `index` is only
              // destroyed (and `index` advanced) after the assignment and the
              // iterator increment have completed, so on a throw `index` still
              // refers to a live, undestroyed element.
              MOODYCAMEL_TRY {
                while (index != endIndex) {
                  auto& el = *((*block)[index]);
                  *itemFirst = std::move(el);
                  ++itemFirst;
                  el.~T();
                  ++index;
                }
              }
              MOODYCAMEL_CATCH(...) {
                // It's too late to revert the dequeue, but we can make sure
                // that all the dequeued objects are properly destroyed and the
                // block index (and empty count) are properly updated before we
                // propagate the exception
                do {
                  // Destroy whatever is left in this block's share of the
                  // claimed range, starting with the element that was being
                  // transferred when the exception was raised.
                  block = localBlockIndex->entries[indexIndex].block;
                  while (index != endIndex) {
                    (*block)[index++]->~T();
                  }
                  // Mark this block's consumed slots as empty, then step to the
                  // next block-index entry (wrapping around).
                  block->ConcurrentQueue::Block::template set_many_empty<
                      explicit_context>(
                      firstIndexInBlock,
                      static_cast<size_t>(endIndex - firstIndexInBlock));
                  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);

                  // Recompute the per-block bounds for the next block, exactly
                  // as at the top of the main loop.
                  firstIndexInBlock = index;
                  endIndex =
                      (index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
                      static_cast<index_t>(QUEUE_BLOCK_SIZE);
                  endIndex =
                      details::circular_less_than<index_t>(
                          firstIndex + static_cast<index_t>(actualCount),
                          endIndex)
                          ? firstIndex + static_cast<index_t>(actualCount)
                          : endIndex;
                } while (index != firstIndex + actualCount);

                MOODYCAMEL_RETHROW;
              }
            }
            // Mark the slots consumed from this block as empty and advance to
            // the next block-index entry (wrapping around).
            block->ConcurrentQueue::Block::template set_many_empty<
                explicit_context>(
                firstIndexInBlock,
                static_cast<size_t>(endIndex - firstIndexInBlock));
            indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
          } while (index != firstIndex + actualCount);

          return actualCount;
        }
        else {
          // Wasn't anything to dequeue after all; make the effective dequeue
          // count eventually consistent
          this->dequeueOvercommit.fetch_add(desiredCount,
                                            std::memory_order_release);
        }
      }

      // Either the initial estimate or the post-reservation re-check found
      // nothing to dequeue.
      return 0;
    }