in include/ylt/util/concurrentqueue.h [3314:3445]
// Attempts to dequeue up to `max` elements from this producer's sub-queue in
// one operation.
//
// Each dequeued element is move-assigned through `itemFirst`, which is
// advanced once per element (it is taken by reference, so the caller sees the
// advanced iterator). The source element is destroyed in place after the move.
//
// Returns the number of elements actually dequeued, or 0 if none could be
// claimed.
//
// If a move-assignment throws (only possible on the non-noexcept path below),
// the remaining claimed-but-not-yet-moved elements are destroyed, their slots
// are marked empty, and the exception is rethrown; those elements are not
// delivered to the caller.
size_t dequeue_bulk(It& itemFirst, size_t max) {
// Cheap, relaxed estimate of how many elements might be available:
// tail - (optimistic dequeue count - overcommit).
auto tail = this->tailIndex.load(std::memory_order_relaxed);
auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
auto desiredCount = static_cast<size_t>(
tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
overcommit));
// NOTE(review): circular_less_than(0, x) is presumably a wrap-around-aware
// "x > 0" test; confirm against its definition in details::.
if (details::circular_less_than<size_t>(0, desiredCount)) {
// Never ask for more than the caller requested.
desiredCount = desiredCount < max ? desiredCount : max;
std::atomic_thread_fence(std::memory_order_acquire);
// Optimistically reserve desiredCount elements. myDequeueCount is the value
// of the counter before our reservation was added.
auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
desiredCount, std::memory_order_relaxed);
// Re-read tail (acquire) and recompute, relative to our reservation, how many
// elements are really there for us. Note that the `overcommit` value read at
// the top of the function is reused here, not reloaded.
tail = this->tailIndex.load(std::memory_order_acquire);
auto actualCount =
static_cast<size_t>(tail - (myDequeueCount - overcommit));
if (details::circular_less_than<size_t>(0, actualCount)) {
// Take the smaller of what we reserved and what is available.
actualCount = desiredCount < actualCount ? desiredCount : actualCount;
if (actualCount < desiredCount) {
// We reserved more than we can take; record the surplus in
// dequeueOvercommit so it offsets the optimistic count.
this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
std::memory_order_release);
}
// Get the first index. Note that since there's guaranteed to be at
// least actualCount elements, this will never exceed tail.
auto firstIndex =
this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
// Iterate the blocks and dequeue
// The claimed range is [firstIndex, firstIndex + actualCount).
auto index = firstIndex;
// NOTE(review): localBlockIndex is uninitialized here and is presumably
// filled in (by reference) by get_block_index_index_for_index, which returns
// the slot in that block index corresponding to `index`; confirm.
BlockIndexHeader* localBlockIndex;
auto indexIndex =
get_block_index_index_for_index(index, localBlockIndex);
// Each iteration of this loop drains the part of the claimed range that lies
// within a single block.
do {
auto blockStartIndex = index;
// endIndex = min(start of the next block, end of the claimed range). The mask
// arithmetic assumes QUEUE_BLOCK_SIZE is a power of two.
index_t endIndex =
(index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
static_cast<index_t>(QUEUE_BLOCK_SIZE);
endIndex =
details::circular_less_than<index_t>(
firstIndex + static_cast<index_t>(actualCount), endIndex)
? firstIndex + static_cast<index_t>(actualCount)
: endIndex;
auto entry = localBlockIndex->index[indexIndex];
auto block = entry->value.load(std::memory_order_relaxed);
// NOTE(review): MOODYCAMEL_NOEXCEPT_ASSIGN presumably evaluates whether the
// given move-assignment expression is noexcept, selecting the path without
// exception handling when it is; confirm against the macro definition.
if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T&&,
details::deref_noexcept(itemFirst) =
std::move((*(*block)[index])))) {
while (index != endIndex) {
// Move the element out to the caller, then destroy the moved-from source.
auto& el = *((*block)[index]);
*itemFirst++ = std::move(el);
el.~T();
++index;
}
}
else {
MOODYCAMEL_TRY {
while (index != endIndex) {
auto& el = *((*block)[index]);
// Assignment and increment are separate statements here, so if the
// assignment throws, neither itemFirst nor index has advanced and the
// cleanup below starts at the element whose move failed.
*itemFirst = std::move(el);
++itemFirst;
el.~T();
++index;
}
}
MOODYCAMEL_CATCH(...) {
// A move-assignment threw. The indices were already claimed from headIndex,
// so walk the rest of the claimed range block by block, destroying each
// remaining element (starting with the one at `index`) without moving it,
// and mark the slots empty before rethrowing.
do {
entry = localBlockIndex->index[indexIndex];
block = entry->value.load(std::memory_order_relaxed);
while (index != endIndex) {
(*block)[index++]->~T();
}
// NOTE(review): a true result from set_many_empty presumably means the whole
// block is now empty and can be recycled; confirm against Block.
if (block->ConcurrentQueue::Block::template set_many_empty<
implicit_context>(
blockStartIndex,
static_cast<size_t>(endIndex - blockStartIndex))) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
#endif
// Clear the block-index entry and hand the block to the parent's free list.
entry->value.store(nullptr, std::memory_order_relaxed);
this->parent->add_block_to_free_list(block);
}
// Advance to the next block-index slot (wrapping; the mask assumes capacity
// is a power of two) and recompute this block's sub-range of the claim.
indexIndex =
(indexIndex + 1) & (localBlockIndex->capacity - 1);
blockStartIndex = index;
endIndex =
(index & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
static_cast<index_t>(QUEUE_BLOCK_SIZE);
endIndex =
details::circular_less_than<index_t>(
firstIndex + static_cast<index_t>(actualCount),
endIndex)
? firstIndex + static_cast<index_t>(actualCount)
: endIndex;
} while (index != firstIndex + actualCount);
MOODYCAMEL_RETHROW;
}
}
// Normal path: mark the slots [blockStartIndex, endIndex) of this block as
// empty; if that reports true, release the block.
if (block->ConcurrentQueue::Block::template set_many_empty<
implicit_context>(
blockStartIndex,
static_cast<size_t>(endIndex - blockStartIndex))) {
{
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
#endif
// Note that the set_many_empty above did a release, meaning
// that anybody who acquires the block we're about to free can
// use it safely since our writes (and reads!) will have
// happened-before then.
entry->value.store(nullptr, std::memory_order_relaxed);
}
this->parent->add_block_to_free_list(
block); // releases the above store
}
// Move on to the next block-index slot, wrapping around via the capacity
// mask.
indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
} while (index != firstIndex + actualCount);
return actualCount;
}
else {
// Nothing was actually available once our reservation was accounted for:
// record the entire reservation as overcommit so the counters balance.
this->dequeueOvercommit.fetch_add(desiredCount,
std::memory_order_release);
}
}
// Nothing could be dequeued.
return 0;
}