bool enqueue_bulk(It itemFirst, size_t count)

in core/common/queue/concurrentqueue.h [2626:2774]


    template<AllocationMode allocMode, typename It>
    bool enqueue_bulk(It itemFirst, size_t count)
    {
      // First, we need to make sure we have enough room to enqueue all of the elements;
      // this means pre-allocating blocks and putting them in the block index (but only if
      // all the allocations succeeded).

      // Note that the tailBlock we start off with may not be owned by us any more;
      // this happens if it was filled up exactly to the top (setting tailIndex to
      // the first index of the next block which is not yet allocated), then dequeued
      // completely (putting it on the free list) before we enqueue again.

      index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
      auto startBlock = this->tailBlock;
      Block* firstAllocatedBlock = nullptr;
      auto endBlock = this->tailBlock;

      // Figure out how many blocks we'll need to allocate, and do so
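      // blockBaseDiff is the distance between the block base of the last element we will write
      // and the block base of the slot just before startTailIndex, i.e. BLOCK_SIZE times the
      // number of blocks that still need to be allocated. For example, with BLOCK_SIZE == 32,
      // startTailIndex == 30 and count == 5 touch indices 30..34, so blockBaseDiff == 32 and
      // exactly one new block is allocated.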
      size_t blockBaseDiff = ((startTailIndex + count - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1)) - ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
      index_t currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
      if (blockBaseDiff > 0) {
#ifdef MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
        debug::DebugLock lock(mutex);
#endif
        do {
          blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
          currentTailIndex += static_cast<index_t>(BLOCK_SIZE);

          // Find out where we'll be inserting this block in the block index
          BlockIndexEntry* idxEntry = nullptr;  // initialization here unnecessary but compiler can't always tell
          Block* newBlock;
          bool indexInserted = false;
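          // The subqueue counts as full if the candidate block's index range would wrap around
          // onto the consumer's head index, or if a finite MAX_SUBQUEUE_SIZE is configured and
          // another block would push the queue past that bound.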
          auto head = this->headIndex.load(std::memory_order_relaxed);
          assert(!details::circular_less_than<index_t>(currentTailIndex, head));
          bool full = !details::circular_less_than<index_t>(head, currentTailIndex + BLOCK_SIZE) || (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value && (MAX_SUBQUEUE_SIZE == 0 || MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));

          if (full || !(indexInserted = insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) || (newBlock = this->parent->ConcurrentQueue::template requisition_block<allocMode>()) == nullptr) {
            // Index allocation or block allocation failed; revert any other allocations
            // and index insertions done so far for this operation
            if (indexInserted) {
              rewind_block_index_tail();
              idxEntry->value.store(nullptr, std::memory_order_relaxed);
            }
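            // Walk the chain of blocks allocated earlier in this call: clear each one's index
            // entry, rewind the index tail for it, then hand the whole chain back to the
            // parent's free list before giving up.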
            currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
            for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
              currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
              idxEntry = get_block_index_entry_for_index(currentTailIndex);
              idxEntry->value.store(nullptr, std::memory_order_relaxed);
              rewind_block_index_tail();
            }
            this->parent->add_blocks_to_free_list(firstAllocatedBlock);
            this->tailBlock = startBlock;

            return false;
          }

#ifdef MCDBGQ_TRACKMEM
          newBlock->owner = this;
#endif
          newBlock->ConcurrentQueue::Block::template reset_empty<implicit_context>();
          newBlock->next = nullptr;

          // Insert the new block into the index
          idxEntry->value.store(newBlock, std::memory_order_relaxed);

          // Store the chain of blocks so that we can undo if later allocations fail,
          // and so that we can find the blocks when we do the actual enqueueing
          if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr) {
            assert(this->tailBlock != nullptr);
            this->tailBlock->next = newBlock;
          }
          this->tailBlock = newBlock;
          endBlock = newBlock;
          firstAllocatedBlock = firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
        } while (blockBaseDiff > 0);
      }

      // Enqueue, one block at a time
      index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
      currentTailIndex = startTailIndex;
      this->tailBlock = startBlock;
      assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 || firstAllocatedBlock != nullptr || count == 0);
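      // If startTailIndex sits on a block boundary, the previous tail block (if any) is already
      // full, so writing begins in the first block allocated above instead.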
      if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 && firstAllocatedBlock != nullptr) {
        this->tailBlock = firstAllocatedBlock;
      }
      while (true) {
        index_t stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
        if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
          stopIndex = newTailIndex;
        }
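        // Fast path: if T can be constructed from the iterator's value without throwing, fill
        // the block with no exception bookkeeping; otherwise construct under try/catch so a
        // throwing constructor can roll back the partially completed bulk enqueue.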
        MOODYCAMEL_CONSTEXPR_IF (MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))) {
          while (currentTailIndex != stopIndex) {
            new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
          }
        }
        else {
          MOODYCAMEL_TRY {
            while (currentTailIndex != stopIndex) {
              new ((*this->tailBlock)[currentTailIndex]) T(details::nomove_if<!MOODYCAMEL_NOEXCEPT_CTOR(T, decltype(*itemFirst), new (static_cast<T*>(nullptr)) T(details::deref_noexcept(itemFirst)))>::eval(*itemFirst));
              ++currentTailIndex;
              ++itemFirst;
            }
          }
          MOODYCAMEL_CATCH (...) {
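            // An element constructor threw part-way through: destroy whatever was already
            // constructed (unless T is trivially destructible), undo the block index insertions,
            // return the freshly allocated blocks to the free list (mirroring the
            // allocation-failure path above), and rethrow.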
            auto constructedStopIndex = currentTailIndex;
            auto lastBlockEnqueued = this->tailBlock;

            if (!details::is_trivially_destructible<T>::value) {
              auto block = startBlock;
              if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
                block = firstAllocatedBlock;
              }
              currentTailIndex = startTailIndex;
              while (true) {
                stopIndex = (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) + static_cast<index_t>(BLOCK_SIZE);
                if (details::circular_less_than<index_t>(constructedStopIndex, stopIndex)) {
                  stopIndex = constructedStopIndex;
                }
                while (currentTailIndex != stopIndex) {
                  (*block)[currentTailIndex++]->~T();
                }
                if (block == lastBlockEnqueued) {
                  break;
                }
                block = block->next;
              }
            }

            currentTailIndex = (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
            for (auto block = firstAllocatedBlock; block != nullptr; block = block->next) {
              currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
              auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
              idxEntry->value.store(nullptr, std::memory_order_relaxed);
              rewind_block_index_tail();
            }
            this->parent->add_blocks_to_free_list(firstAllocatedBlock);
            this->tailBlock = startBlock;
            MOODYCAMEL_RETHROW;
          }
        }

        if (this->tailBlock == endBlock) {
          assert(currentTailIndex == newTailIndex);
          break;
        }
        this->tailBlock = this->tailBlock->next;
      }
      this->tailIndex.store(newTailIndex, std::memory_order_release);
      return true;
    }
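
This per-producer method is normally reached through the queue's public bulk API, which looks up (or creates) the calling thread's implicit producer and forwards to it. A minimal usage sketch, assuming the vendored header matches the upstream moodycamel::ConcurrentQueue interface; the include path follows the location noted above:

    #include "core/common/queue/concurrentqueue.h"

    #include <cstddef>
    #include <vector>

    int main()
    {
      moodycamel::ConcurrentQueue<int> q;

      // One bulk call reserves the whole range of tail indices (and any needed blocks)
      // up front, rather than paying that cost once per element.
      std::vector<int> items = {1, 2, 3, 4, 5};
      bool ok = q.enqueue_bulk(items.begin(), items.size());

      // Bulk dequeue into a caller-provided buffer; returns the number of elements taken.
      std::vector<int> out(items.size());
      std::size_t n = q.try_dequeue_bulk(out.begin(), out.size());

      return (ok && n == items.size()) ? 0 : 1;
    }

The try_enqueue_bulk variant takes the same arguments but refuses to allocate new blocks, which corresponds to instantiating this method with the non-allocating allocMode.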