in core/common/queue/concurrentqueue.h [2626:2774]
bool enqueue_bulk(It itemFirst, size_t count)
{
// First, we need to make sure we have enough room to enqueue all of the elements;
// this means pre-allocating blocks and putting them in the block index (but only if
// all the allocations succeeded).
// Note that the tailBlock we start off with may not be owned by us any more;
// this happens if it was filled up exactly to the top (setting tailIndex to
// the first index of the next block which is not yet allocated), then dequeued
// completely (putting it on the free list) before we enqueue again.
index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
auto startBlock = this->tailBlock;
Block* firstAllocatedBlock = nullptr;
auto endBlock = this->tailBlock;
// Figure out how many blocks we'll need to allocate, and do so
size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
if (blockBaseDiff > 0) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
debug::DebugLock lock(mutex);
#endif
do {
blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
// Find out where we'll be inserting this block in the block index
BlockIndexEntry* idxEntry = nullptr; // initialization here unnecessary but compiler can't always tell
Block* newBlock;
bool indexInserted = false;
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(currentTailIndex, head));
bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
// Index allocation or block allocation failed; revert any other allocations
// and index insertions done so far for this operation
if (indexInserted) {
rewind_block_index_tail();
idxEntry->value.store(nullptr, std::memory_order_relaxed);
}
currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
idxEntry = get_block_index_entry_for_index(currentTailIndex);
idxEntry->value.store(nullptr, std::memory_order_relaxed);
rewind_block_index_tail();
}
this->parent->add_blocks_to_free_list(firstAllocatedBlock);
this->tailBlock = startBlock;
return false;
}
#ifdef MCDBGQ_TRACKMEM
newBlock->owner = this;
#endif
newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
newBlock->next = nullptr;
// Insert the new block into the index
idxEntry->value.store(newBlock, std::memory_order_relaxed);
// Store the chain of blocks so that we can undo if later allocations fail,
// and so that we can find the blocks when we do the actual enqueueing
if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
assert(this->tailBlock != nullptr);
this->tailBlock->next = newBlock;
}
this->tailBlock = newBlock;
endBlock = newBlock;
firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
} while (blockBaseDiff > 0);
}
// Enqueue, one block at a time
index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
currentTailIndex = startTailIndex;
this->tailBlock = startBlock;
assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
this->tailBlock = firstAllocatedBlock;
}
while (true) {
index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
stopIndex = newTailIndex;
}
MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
}
}
else {
MOODYCAMEL_TRY {
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
++currentTailIndex;
++itemFirst;
}
}
MOODYCAMEL_CATCH (...) {
auto constructedStopIndex = currentTailIndex;
auto lastBlockEnqueued = this->tailBlock;
if (!details::is_trivially_destructible<T>::value) {
auto block = startBlock;
if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
block = firstAllocatedBlock;
}
currentTailIndex = startTailIndex;
while (true) {
stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
stopIndex = constructedStopIndex;
}
while (currentTailIndex != stopIndex) {
(*block)[currentTailIndex++]->~T();
}
if (block == lastBlockEnqueued) {
break;
}
block = block->next;
}
}
currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
idxEntry->value.store(nullptr, std::memory_order_relaxed);
rewind_block_index_tail();
}
this->parent->add_blocks_to_free_list(firstAllocatedBlock);
this->tailBlock = startBlock;
MOODYCAMEL_RETHROW;
}
}
if (this->tailBlock == endBlock) {
assert(currentTailIndex == newTailIndex);
break;
}
this->tailBlock = this->tailBlock->next;
}
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}