in include/ylt/util/concurrentqueue.h [2428:2672]
template <AllocationMode allocMode, typename It>
bool MOODYCAMEL_NO_TSAN enqueue_bulk(It itemFirst, size_t count) {
// First, we need to make sure we have enough room to enqueue all of the
// elements; this means pre-allocating blocks and putting them in the
// block index (but only if all the allocations succeeded).
index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
auto startBlock = this->tailBlock;
auto originalBlockIndexFront = pr_blockIndexFront;
auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
Block* firstAllocatedBlock = nullptr;
// Figure out how many blocks we'll need to allocate, and do so
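// blockBaseDiff measures how far the range [startTailIndex,
// startTailIndex + count) extends past the current block: both endpoints
// are rounded down to a block-base boundary and subtracted, so the result
// is (number of extra blocks needed) * QUEUE_BLOCK_SIZE. For example, with
// QUEUE_BLOCK_SIZE == 32 (an illustrative value), startTailIndex == 30 and
// count == 5: (34 & ~31) - (29 & ~31) == 32 - 0, i.e. one extra block.
// currentTailIndex starts at the base of the last occupied block and is
// advanced one block at a time as blocks are claimed below.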
size_t blockBaseDiff =
((startTailIndex + count - 1) &
~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) -
((startTailIndex - 1) & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1));
index_t currentTailIndex =
(startTailIndex - 1) & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1);
if (blockBaseDiff > 0) {
// Allocate as many blocks as possible from ahead
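// "Ahead" means spare blocks already spliced into this producer's circular
// list: blocks that were filled earlier and have since been fully consumed.
// Reusing them avoids touching the block pool. The comparison against
// firstAllocatedBlock stops us from looping all the way around the ring
// and re-claiming a block we already took during this same call.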
while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
this->tailBlock->next != firstAllocatedBlock &&
this->tailBlock->next->ConcurrentQueue::Block::template is_empty<
explicit_context>()) {
blockBaseDiff -= static_cast<index_t>(QUEUE_BLOCK_SIZE);
currentTailIndex += static_cast<index_t>(QUEUE_BLOCK_SIZE);
this->tailBlock = this->tailBlock->next;
firstAllocatedBlock = firstAllocatedBlock == nullptr
? this->tailBlock
: firstAllocatedBlock;
auto& entry = blockIndex.load(std::memory_order_relaxed)
->entries[pr_blockIndexFront];
entry.base = currentTailIndex;
entry.block = this->tailBlock;
pr_blockIndexFront =
(pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
}
// Now allocate as many blocks as necessary from the block pool
while (blockBaseDiff > 0) {
blockBaseDiff -= static_cast<index_t>(QUEUE_BLOCK_SIZE);
currentTailIndex += static_cast<index_t>(QUEUE_BLOCK_SIZE);
auto head = this->headIndex.load(std::memory_order_relaxed);
assert(!details::circular_less_than<index_t>(currentTailIndex, head));
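// The queue counts as full if consumers have not yet vacated the slot one
// whole block past currentTailIndex (circular comparison against head), or
// if claiming another block would push the subqueue past the configured
// MAX_SUBQUEUE_SIZE cap.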
bool full =
!details::circular_less_than<index_t>(
head, currentTailIndex + QUEUE_BLOCK_SIZE) ||
(MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
(MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - QUEUE_BLOCK_SIZE <
currentTailIndex - head));
if (pr_blockIndexRaw == nullptr ||
pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
MOODYCAMEL_CONSTEXPR_IF(allocMode == CannotAlloc) {
// Failed to allocate, undo changes (but keep injected blocks)
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock =
startBlock == nullptr ? firstAllocatedBlock : startBlock;
return false;
}
else if (full || !new_block_index(originalBlockIndexSlotsUsed)) {
// Failed to allocate, undo changes (but keep injected blocks)
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock =
startBlock == nullptr ? firstAllocatedBlock : startBlock;
return false;
}
// pr_blockIndexFront is updated inside new_block_index, so we need
// to update our fallback value too (since we keep the new index
// even if we later fail)
originalBlockIndexFront = originalBlockIndexSlotsUsed;
}
// Insert a new block in the circular linked list
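// requisition_block obtains a block from the parent queue -- from its
// initial block pool or global free list, or by allocating a fresh one
// when allocMode permits.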
auto newBlock =
this->parent
->ConcurrentQueue::template requisition_block<allocMode>();
if (newBlock == nullptr) {
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock =
startBlock == nullptr ? firstAllocatedBlock : startBlock;
return false;
}
#ifdef MCDBGQ_TRACKMEM
newBlock->owner = this;
#endif
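// Mark every slot in the fresh block as empty so it appears fully
// consumed; reset_empty() below re-arms the flags once all allocations
// have succeeded and the block is about to be filled.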
newBlock->ConcurrentQueue::Block::template set_all_empty<
explicit_context>();
if (this->tailBlock == nullptr) {
newBlock->next = newBlock;
}
else {
newBlock->next = this->tailBlock->next;
this->tailBlock->next = newBlock;
}
this->tailBlock = newBlock;
firstAllocatedBlock = firstAllocatedBlock == nullptr
? this->tailBlock
: firstAllocatedBlock;
++pr_blockIndexSlotsUsed;
auto& entry = blockIndex.load(std::memory_order_relaxed)
->entries[pr_blockIndexFront];
entry.base = currentTailIndex;
entry.block = this->tailBlock;
pr_blockIndexFront =
(pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
}
// Excellent, all allocations succeeded. Reset each block's emptiness
// flags before we fill the blocks up, and publish the new block index
// front (immediately only if construction can't throw)
auto block = firstAllocatedBlock;
while (true) {
block->ConcurrentQueue::Block::template reset_empty<
explicit_context>();
if (block == this->tailBlock) {
break;
}
block = block->next;
}
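// When the element constructor is noexcept, nothing below can throw, so
// the new block index front can be published right away; for throwing
// constructors, publication is deferred until every element has been
// constructed (see the matching constexpr-if near the end of this
// function).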
MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
T, decltype(*itemFirst),
new (static_cast<T*>(nullptr))
T(details::deref_noexcept(itemFirst)))) {
blockIndex.load(std::memory_order_relaxed)
->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
std::memory_order_release);
}
}
// Enqueue, one block at a time
index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
currentTailIndex = startTailIndex;
auto endBlock = this->tailBlock;
this->tailBlock = startBlock;
assert((startTailIndex & static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) !=
0 ||
firstAllocatedBlock != nullptr || count == 0);
if ((startTailIndex & static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) == 0 &&
firstAllocatedBlock != nullptr) {
this->tailBlock = firstAllocatedBlock;
}
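// Rewind tailBlock to the block that receives the first new element:
// startBlock if that element lands mid-block, otherwise the first block
// allocated above.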
while (true) {
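// Fill to the end of the current block, or to the final tail index,
// whichever comes first.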
index_t stopIndex =
(currentTailIndex & ~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
static_cast<index_t>(QUEUE_BLOCK_SIZE);
if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
stopIndex = newTailIndex;
}
MOODYCAMEL_CONSTEXPR_IF(MOODYCAMEL_NOEXCEPT_CTOR(
T, decltype(*itemFirst),
new (static_cast<T*>(nullptr))
T(details::deref_noexcept(itemFirst)))) {
while (currentTailIndex != stopIndex) {
new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
}
}
else {
MOODYCAMEL_TRY {
while (currentTailIndex != stopIndex) {
// Must use the copy constructor even if a move constructor is
// available, because we may have to revert if an exception is
// thrown. Sorry about the horrible templated next line, but it
// was the only way to disable moving *at compile time*: a type
// may define only a (noexcept) move constructor, so calls to the
// copy constructor would not compile, even inside an if branch
// that is never executed.
new ((*this->tailBlock)[currentTailIndex]) T(
details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(
T, decltype(*itemFirst),
new (static_cast<T*>(nullptr)) T(details::deref_noexcept(
itemFirst)))>::eval(*itemFirst));
++currentTailIndex;
++itemFirst;
}
}
MOODYCAMEL_CATCH(...) {
// Oh dear, an exception's been thrown -- destroy the elements that
// were enqueued so far and revert the entire bulk operation (we'll
// keep any allocated blocks in our linked list for later, though).
auto constructedStopIndex = currentTailIndex;
auto lastBlockEnqueued = this->tailBlock;
pr_blockIndexFront = originalBlockIndexFront;
pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
this->tailBlock =
startBlock == nullptr ? firstAllocatedBlock : startBlock;
if (!details::is_trivially_destructible<T>::value) {
auto block = startBlock;
if ((startTailIndex &
static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) == 0) {
block = firstAllocatedBlock;
}
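// Re-walk the same blocks, destroying every element constructed in
// [startTailIndex, constructedStopIndex) before rethrowing.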
currentTailIndex = startTailIndex;
while (true) {
stopIndex = (currentTailIndex &
~static_cast<index_t>(QUEUE_BLOCK_SIZE - 1)) +
static_cast<index_t>(QUEUE_BLOCK_SIZE);
if (details::circular_less_than<index_t>(constructedStopIndex,
stopIndex)) {
stopIndex = constructedStopIndex;
}
while (currentTailIndex != stopIndex) {
(*block)[currentTailIndex++]->~T();
}
if (block == lastBlockEnqueued) {
break;
}
block = block->next;
}
}
MOODYCAMEL_RETHROW;
}
}
if (this->tailBlock == endBlock) {
assert(currentTailIndex == newTailIndex);
break;
}
this->tailBlock = this->tailBlock->next;
}
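// Deferred publication for throwing constructors: all elements are now
// constructed, so the newly indexed blocks can safely become visible.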
MOODYCAMEL_CONSTEXPR_IF(!MOODYCAMEL_NOEXCEPT_CTOR(
T, decltype(*itemFirst),
new (static_cast<T*>(nullptr))
T(details::deref_noexcept(itemFirst)))) {
if (firstAllocatedBlock != nullptr)
blockIndex.load(std::memory_order_relaxed)
->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
std::memory_order_release);
}
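// Release-store the new tail index; this is what makes the enqueued
// elements visible to consumers.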
this->tailIndex.store(newTailIndex, std::memory_order_release);
return true;
}