in include/ylt/util/concurrentqueue.h [2675:2801]
size_t dequeue_bulk(It& itemFirst, size_t max) {
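// Optimistically estimate how many elements are available: the distance
// between the tail and the effective dequeue count (the optimistic dequeue
// counter minus previously recorded overcommit).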
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
auto desiredCount = static_cast<size_t>(
tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
overcommit));
if (details::circular_less_than<size_t>(0, desiredCount)) {
desiredCount = desiredCount < max ? desiredCount : max;
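// Acquire fence: the optimistic claim below must not be reordered before
// the reads of tailIndex and dequeueOvercommit above.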
std::atomic_thread_fence(std::memory_order_acquire);
auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
desiredCount, std::memory_order_relaxed);
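// Re-read the tail to see how many elements were actually available when
// we made our claim; if we claimed too many, refund the difference via
// dequeueOvercommit so the counters stay eventually consistent.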
tail = this->tailIndex.load(std::memory_order_acquire);
auto actualCount =
static_cast<size_t>(tail - (myDequeueCount - overcommit));
if (details::circular_less_than<size_t>(0, actualCount)) {
actualCount = desiredCount < actualCount ? desiredCount : actualCount;
if (actualCount < desiredCount) {
this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
std::memory_order_release);
}
// Get the first index. Note that since there's guaranteed to be at
// least actualCount elements, this will never exceed tail.
auto firstIndex =
this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
// Determine which block the first element is in
auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
auto localBlockIndexHead =
localBlockIndex->front.load(std::memory_order_acquire);
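// Translate firstIndex into a slot of the circular block index: compute how
// many whole blocks it lies ahead of the base index at the index head.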
auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
auto firstBlockBaseIndex =
firstIndex & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1);
auto offset = static_cast<size_t>(
static_cast<typename std::make_signed<index_t>::type>(
firstBlockBaseIndex - headBase) /
static_cast<typename std::make_signed<index_t>::type>(
QUEUE_BLOCK_SIZE));
auto indexIndex =
(localBlockIndexHead + offset) & (localBlockIndex->size - 1);
// Iterate the blocks and dequeue
auto index = firstIndex;
do {
auto firstIndexInBlock = index;
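// Dequeue up to the end of the current block, or up to the last claimed
// element, whichever comes first.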
index_t endIndex =
(index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
static_cast<index_t>(QUEUE_BLOCK_SIZE);
endIndex =
details::circular_less_than<index_t>(
firstIndex + static_cast<index_t>(actualCount), endIndex)
? firstIndex + static_cast<index_t>(actualCount)
: endIndex;
auto block = localBlockIndex->entries[indexIndex].block;
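// Fast path: if move-assigning T into the output iterator cannot throw,
// skip the exception-handling machinery below.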
if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&,
details::deref_noexcept(itemFirst) =
std::move((*(*block)[index])))) {
while (index != endIndex) {
auto& el = *((*block)[index]);
*itemFirst++ = std::move(el);
el.~T();
++index;
}
}
else {
MOODYCAMEL_TRY {
while (index != endIndex) {
auto& el = *((*block)[index]);
*itemFirst = std::move(el);
++itemFirst;
el.~T();
++index;
}
}
MOODYCAMEL_CATCH(...) {
// It's too late to revert the dequeue, but we can make sure
// that all the dequeued objects are properly destroyed and the
// block index (and empty count) are properly updated before we
// propagate the exception
do {
block = localBlockIndex->entries[indexIndex].block;
while (index != endIndex) {
(*block)[index++]->~T();
}
block->ConcurrentQueue::Block::template set_many_empty<
explicit_context>(
firstIndexInBlock,
static_cast<size_t>(endIndex - firstIndexInBlock));
indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
firstIndexInBlock = index;
endIndex =
(index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
static_cast<index_t>(QUEUE_BLOCK_SIZE);
endIndex =
details::circular_less_than<index_t>(
firstIndex + static_cast<index_t>(actualCount),
endIndex)
? firstIndex + static_cast<index_t>(actualCount)
: endIndex;
} while (index != firstIndex + actualCount);
MOODYCAMEL_RETHROW;
}
}
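// Mark this block's slots as consumed so the producer can reuse the block
// once it is completely empty.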
block->ConcurrentQueue::Block::template set_many_empty<
explicit_context>(
firstIndexInBlock,
static_cast<size_t>(endIndex - firstIndexInBlock));
indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
} while (index != firstIndex + actualCount);
return actualCount;
}
else {
// Wasn't anything to dequeue after all; make the effective dequeue
// count eventually consistent
this->dequeueOvercommit.fetch_add(desiredCount,
std::memory_order_release);
}
}
return 0;
}