class LocklessQueue

in Source/Task/LocklessQueue.h [63:839]
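
A lock-free FIFO queue template.  Nodes are allocated in aligned blocks from a
heap that may be shared between queues; each node is identified by a 64-bit
address whose low 48 bits locate the node (a 32-bit index and a 16-bit block ID)
and whose top 16 bits form an ABA counter.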


template <typename TData>
class alignas(8) LocklessQueue
{
public:
    
    static void* operator new(_In_ size_t sz)
    {
        void* ptr = aligned_malloc(sz, 8);
        if (ptr == nullptr)
        {
            throw std::bad_alloc();
        }
        return ptr;
    }
    
    static void* operator new(_In_ size_t sz, _In_ const std::nothrow_t&) noexcept
    {
        return aligned_malloc(sz, 8);
    }
    
    static void operator delete(_In_ void* ptr)
    {
        aligned_free(ptr);
    }

    //
    // Creates a new lockless queue.  The blockSize parameter indicates how many
    // node elements are contained in a block.  Additional blocks are allocated
    // as needed, but for best performance choose a block size that does not
    // require additional allocation for your workload. Passing zero uses the
    // minimum size.
    //
    LocklessQueue(_In_ uint32_t blockSize = 0x400) noexcept :
        m_localHeap(*this),
        m_heap(m_localHeap),
        m_activeList(*this),
        m_blockCache(nullptr)
    {
        m_localHeap.init(blockSize);
        Initialize();
    }
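
    // Example (illustrative, not part of the header): sizing the queue so a
    // single block covers the expected peak load.  WorkItem is a hypothetical
    // payload type.
    //
    //      LocklessQueue<WorkItem> queue(0x800);   // one 2048-node block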

    //
    // Creates a new lockless queue.  The shared parameter is another
    // lockless queue whose heap supplies the nodes. Queues that
    // share the same node heap can share addresses. When a lockless queue
    // that is sharing another queue's nodes is destroyed, any outstanding
    // nodes in use are given back to the shared queue's heap.  Destroying
    // the host queue before destroying the queues sharing its heap results
    // in undefined behavior.
    //
    LocklessQueue(_In_ LocklessQueue& shared) noexcept :
        m_localHeap(*this),
        m_heap(shared.m_heap),
        m_activeList(*this),
        m_blockCache(nullptr)
    {
        Initialize();
    }
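
    // Example (illustrative, not part of the header): two queues sharing one
    // node heap so items can move between them without reallocating nodes.
    // Per the note above, destroy 'pending' before 'host'.  WorkItem is a
    // hypothetical payload type.
    //
    //      LocklessQueue<WorkItem> host;
    //      LocklessQueue<WorkItem> pending(host);  // shares host's heap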

    ~LocklessQueue() noexcept
    {
        if (&m_heap != &m_localHeap)
        {
            // This queue is sharing a heap, so
            // we need to put all outstanding nodes
            // back
            TData data;
            while(pop_front(data)) {}

            // And the dummy node too.
            Address dummy = m_activeList.head();
            Node* node = to_node(dummy);
            m_heap.free(node, dummy);
        }
    }

    //
    // Returns true if the queue is empty. Note that 
    // since there are no locks this is a snapshot in time.
    //
    bool empty() noexcept
    {
        return m_activeList.empty();
    }

    //
    // Reserves a link node in the queue, allocating it if
    // necessary.  The node must be used in a later push_back
    // or free_node call or it will be leaked.
    //
    bool reserve_node(_Out_ uint64_t& address) noexcept
    {
        Address a;
        Node* n = m_heap.alloc(a);

        if (n != nullptr)
        {
            address = a;
            return true;
        }
        else
        {
            address = 0;
            return false;
        }
    }

    //
    // Frees a previously reserved node.  Freeing a node
    // that has been pushed results in undefined behavior. It
    // will be hard to debug.
    //
    void free_node(_In_ uint64_t address) noexcept
    {
        Address a;
        a = address;
        Node* n = to_node(a);
        m_heap.free(n, a);
    }

    //
    // Pushes the given data onto the back
    // of the queue.  A copy of TData is made
    // (which may throw if TData's copy constructor
    // can throw).  If the push fails, this
    // returns false.
    //
    bool push_back(_In_ const TData& data)
    {
        TData copy = data;
        return move_back(std::move(copy));
    }

    //
    // Pushes the given data onto the back
    // of the queue.
    //
    bool push_back(_In_ TData&& data) noexcept
    {
        return move_back(std::move(data));
    }

    //
    // Pushes the given data onto the back of the queue,
    // using a reserved node pointer.  This never fails as
    // the node pointer was preallocated. A copy of TData is
    // made (which may throw if TData's copy constructor
    // can throw).
    //
    void push_back(_In_ const TData& data, _In_ uint64_t address)
    {
        TData copy = data;
        move_back(std::move(copy), address);
    }

    //
    // Pushes the given data onto the back of the queue,
    // using a reserved node pointer.  This never fails as
    // the node pointer was preallocated.
    //
    void push_back(_In_ TData&& data, _In_ uint64_t address) noexcept
    {
        move_back(std::move(data), address);
    }
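
    // Example (illustrative, not part of the header): reserving a node up
    // front so a later push on a critical path cannot fail.  'queue' and
    // 'item' are hypothetical.
    //
    //      uint64_t address;
    //      if (queue.reserve_node(address))
    //      {
    //          // ... later, on the path that must not fail:
    //          queue.push_back(std::move(item), address);
    //          // or, if the item is abandoned instead:
    //          //     queue.free_node(address);
    //      }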

    //
    // Pops TData off the head of the queue, returning
    // true if successful or false if the queue is
    // empty.
    //
    bool pop_front(_Out_ TData& data) noexcept
    {
        Address address;
        Node* node = m_activeList.pop(address);
        
        if (node != nullptr)
        {
            data = std::move(node->data);
            node->data = TData {};
            m_heap.free(node, address);
            return true;
        }
        
        return false;
    }

    //
    // Pops TData off the front of the queue, returning
    // true if successful, and returns a node address
    // instead of placing the address back on the free
    // list. This address can later be used in a push_back
    // call or a free_node call.  Failing to call either of these
    // results in a leaked node. Returns false if the queue
    // is empty.
    //
    bool pop_front(_Out_ TData& data, _Out_ uint64_t& address) noexcept
    {
        Address a;
        Node* node = m_activeList.pop(a);
        
        if (node != nullptr)
        {
            data = std::move(node->data);
            node->data = TData {};
            address = a;
            return true;
        }
        
        return false;
    }

    //
    // Removes items from the queue that satisfy the given callback.  The callback
    // is of the form:
    //
    //      bool callback(TData& data, uint64_t address);
    //
    // If the callback returns true, it is taking ownership of the data and the
    // address. If it returns false, the node is placed back on the queue.
    //
    // This is a lock-free call: if there are interleaving calls to push_back
    // while this action is in progress, the final node order is not guaranteed
    // (nodes this API processes may be interleaved with newly pushed nodes).
    //
    template <typename TCallback>
    void remove_if(TCallback callback)
    {
        LocklessQueue<TData> retain(*this);
        TData entry;
        uint64_t address;

        while (pop_front(entry, address))
        {
            if (!callback(entry, address))
            {
                retain.move_back(std::move(entry), address);
            }
        }

        while (retain.pop_front(entry, address))
        {
            move_back(std::move(entry), address);
        }
    }
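
    // Example (illustrative, not part of the header): draining canceled work
    // items.  A callback that returns true owns the node, so it must either
    // free the address or reuse it in a push_back.  WorkItem and its canceled
    // member are hypothetical.
    //
    //      queue.remove_if([&](WorkItem& item, uint64_t address)
    //      {
    //          if (!item.canceled)
    //          {
    //              return false;               // keep it queued
    //          }
    //          queue.free_node(address);       // take ownership of the node
    //          return true;
    //      });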

private:

    /*
     *
     * Structure Definitions
     *
     */

    // Address - This represents the address of a node.
    // Nodes live in a contiguous memory block, and there are multiple
    // blocks. Address represents the position of the
    // node and must be 64 bits.

    struct Address
    {
        uint64_t index : 32;
        uint64_t block : 16;
        uint64_t aba   : 16;
        
        inline bool operator == (_In_ const Address& other) const
        {
            uint64_t v = *this;
            uint64_t ov = other;
            return v == ov;
        }

        inline bool operator != (_In_ const Address& other) const
        {
            uint64_t v = *this;
            uint64_t ov = other;
            return v != ov;
        }

        inline operator uint64_t () const
        {
            uint64_t v;

            // Note: this looks horribly inefficient.  General consensus
            // is that this is the best way to do type punning in a
            // C++-compliant way, and disassembly shows it amounts to:
            //
            //      mov	rax, QWORD PTR [rdx]
            //
            // so no actual call to memcpy is emitted in retail builds.
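            //
            // (In C++20, std::bit_cast<uint64_t>(*this) expresses the same
            // operation directly.)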

            memcpy(&v, this, sizeof(v));
            return v;
        }

        inline Address& operator = (_In_ uint64_t v)
        {
            memcpy(this, &v, sizeof(v));
            return *this;
        }
    };

    static_assert(sizeof(Address) == sizeof(uint64_t), "LocklessQueue Address field must be 64 bits exactly");

#if (defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) || __cplusplus >= 201703L
    static_assert(std::atomic<Address>::is_always_lock_free, "LocklessQueue requires atomic<Address> to be lock free");
#endif

    // Node - each entry in a list is backed by a node,
    // which contains a single 64 bit value representing
    // the next node of a singly linked list and
    // a payload of type TData. Nodes must be properly aligned
    // in memory so std::atomic works consistently.  We do this
    // by using an aligned allocator.
    struct Node
    {
        std::atomic<Address> next;
        TData data;
    };

    // Block - Represents a contiguous block of nodes. Blocks are
    // only ever created, never destroyed, until the heap itself is
    // torn down.  Ideally only one block would be created for
    // a particular use case of this queue, but if the queue runs
    // out of space in a block it will create additional blocks.
    // Blocks are linked together as a singly linked list.
    // Blocks must be properly aligned in memory so std::atomic
    // works consistently.  We do this by using an aligned allocator.
    struct Block
    {
        std::atomic<Block*> next;
        Node* nodes;
        uint32_t id;
        uint32_t padding;
    };

    // List - this is a fully lock free linked list with push and pop
    // operations.  Nodes are provided externally.
    class List
    {
    public:

        List(_In_ LocklessQueue& owner) :
            m_owner(owner)
        {}

        void init(_In_ Address dummy, _In_ Address end) noexcept
        {
            m_head = dummy;
            m_tail = dummy;
            m_end = end;
        }

        inline Address end() { return m_end; }
        inline Address head() { return m_head.load(); }

        // Returns true if the queue is empty. Note that 
        // since there are no locks this is a snapshot in time.
        bool empty() noexcept
        {
            Address head = m_head.load();
            Address tail = m_tail.load();
            Address next = m_owner.to_node(head)->next;
            
            if (head == m_head.load() &&
                head == tail &&
                next == m_end)
            {
                return true;
            }
            
            return false;
        }

        // Push a new node onto the tail of the list.  The address
        // is the address of the node.  The node's next pointer is
        // initialized and the address's ABA counter is incremented.
        inline void push(_In_ Node* node, _In_ Address address) noexcept
        {
            node->next = m_end;
            address.aba++;
            push_range(address, address);
        }

        // Push a range of nodes onto the tail of the list.  The tailAddress
        // is the last node to push and the beginAddress is the first.  The
        // nodes all need to be pre-configured to follow each other.
        void push_range(_In_ Address beginAddress, _In_ Address tailAddress) noexcept
        {
            while(true)
            {
                Address tail = m_tail.load();
                Node* tailNode = m_owner.to_node(tail);
                Address tailNext = tailNode->next.load();
                
                if (tail == m_tail.load())
                {
                    if (tailNext == m_end)
                    {
                        // The next of the tail points to an invalid node, so
                        // this really is the tail.  Fix up the next pointer to
                        // point to our new node.  If this succeeds we try to
                        // adjust the tail, which isn't guaranteed to succeed.
                        // That's OK, we can fix it up later.

                        if (tailNode->next.compare_exchange_strong(tailNext, beginAddress))
                        {
                            m_tail.compare_exchange_strong(tail, tailAddress);
                            break;
                        }
                    }
                    else
                    {
                        // What we thought was the tail is really not pointing to the
                        // end, so advance down the list.

                        m_tail.compare_exchange_strong(tail, tailNext);
                    }
                }
            }
        }

        // Pop a node from the head of the list.  There is always a dummy
        // node at the head of the list, so part of this process shifts
        // data from head->next out to a temporary variable and then
        // puts the data back in head once it is detached. Returns nullptr
        // if the list is empty.
        Node* pop(_Out_ Address& address) noexcept
        {
            while (true)
            {
                Address head = m_head.load();
                Address tail = m_tail.load();
                Node* headNode = m_owner.to_node(head);
                Address next = headNode->next.load();

                if (head == m_head.load())
                {
                    if (head == tail)
                    {
                        if (next == m_end)
                        {
                            // List is empty
                            address = m_end;
                            return nullptr;
                        }

                        // List is not empty, but is out of
                        // sync. Advance the tail.

                        m_tail.compare_exchange_strong(tail, next);
                    }
                    else
                    {
                        // This is possibly the node we want.  We are going to
                        // shift the head node out of the list, but the list
                        // actually uses a dummy head node, so we must copy the
                        // data out of the next node, save it off, and then
                        // put it into the head node once we safely detach it.

                        TData data = m_owner.to_node(next)->data;

                        if (m_head.compare_exchange_strong(head, next))
                        {
                            headNode->data = std::move(data);
                            address = head;
                            return headNode;
                        }
                    }
                }
            }
        }

    private:

        LocklessQueue& m_owner;
        std::atomic<Address> m_head;
        std::atomic<Address> m_tail;
        Address m_end;
    };

    // Heap - the cache of available nodes. This heap may be shared
    // with other LocklessQueue instances in cases where you need to move
    // items between lists and want to share the nodes.  If you don't
    // also share the heap, sharing nodes will corrupt the queue.
    class Heap
    {
    public:
        Heap(_In_ LocklessQueue& owner) :
            m_freeList(owner)
        {}

        ~Heap()
        {
            Block* block = m_blockList;
            while(block != nullptr)
            {
                Block* d = block;
                block = block->next;

                for (uint32_t idx = 0; idx < m_blockSize; idx++)
                {
                    d->nodes[idx].~Node();
                }

                aligned_free(d);
            }
        }

        void init(_In_ uint32_t blockSize) noexcept
        {
            if (blockSize < 0x40)
            {
                blockSize = 0x40;
            }

            m_blockSize = blockSize;

            // If the first allocation fails, quarter the block size
            // and retry until we reach the minimum.
            while(!allocate_block() && m_blockSize > 0x40)
            {
                m_blockSize = m_blockSize >> 2;
            }
        }

        inline Address end() { return m_freeList.end(); }

        // Returns the node at the given address, caching the block it
        // was in.
        Node* to_node(_Inout_ std::atomic<Block*>& blockCache, _In_ const Address& address)
        {
            Block* block = blockCache.load();

            if (block == nullptr || block->id != address.block)
            {
                for(block = m_blockList; block != nullptr; block = block->next)
                {
                    if (block->id == address.block)
                    {
                        blockCache = block;
                        break;
                    }
                }
            }

            return &block->nodes[address.index];
        }

        // Returns a node back to the heap
        void free(_In_ Node* node, _In_ Address address) noexcept
        {
            m_freeList.push(node, address);
        }

        // Pops a node off the heap, allocating a new
        // block if needed.
        Node* alloc(_Out_ Address& address) noexcept
        {
            Node* node;

            do
            {
                node = m_freeList.pop(address);

                // It's possible that the free list pop fails right after
                // allocating a block if there is heavy free
                // list use (e.g., another thread drains the free list
                // after we allocate).  So we must loop and possibly
                // allocate again.  We break out if allocation fails.

                if (node == nullptr && !allocate_block())
                {
                    break;
                }
            } while(node == nullptr);

            return node;
        }

    private:

        std::atomic<uint32_t> m_blockCount = { 0 };
        uint32_t m_blockSize = 0;
        Block* m_blockList = nullptr;
        List m_freeList;

        // Allocates a new block of m_blockSize and
        // puts all of its nodes on the free list. Returns
        // true if the allocation was successful.
        bool allocate_block() noexcept
        {
            uint32_t blockId = m_blockCount.fetch_add(1) + 1;

            // Block ID stored in Address is 16 bits, so that's our
            // max.
            if (blockId > 0xFFFF)
            {
                return false;
            }

            size_t size = sizeof(Node) * m_blockSize + sizeof(Block);
            void* mem = aligned_malloc(size, 8);

            if (mem == nullptr)
            {
                return false;
            }
            
            Block* block = new (mem) Block;

            block->id = blockId;        
            block->next = nullptr;
            block->nodes = new (block + 1) Node[m_blockSize];

            // Connect all the nodes in the new block. Element zero is
            // the "tail" of this block.
            
            Address prev = { 0 };
            for (uint32_t index = 0; index < m_blockSize; index++)
            {
                block->nodes[index].next = prev;
                prev.block = static_cast<uint16_t>(block->id);
                prev.index = index;
                prev.aba = 0;
            }
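
            // For example, with m_blockSize == 4 the chain built above is:
            //
            //      nodes[3] -> nodes[2] -> nodes[1] -> nodes[0] -> {0}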

            // Now connect this block to the tail. Because we never delete
            // blocks we can safely traverse the linked list of blocks with
            // no locks or fanciness. The startIndex will be zero except when
            // we are first initializing m_blockList.  Then we steal the first
            // node to act as the free list's dummy node.

            uint32_t startIndex = 0;

            if (m_blockList == nullptr)
            {
                // Initial construction.  We need to store the block list
                // and then initialize the free list.

                Address end = { 0 };
                Address a = { 0 };
                a.block = static_cast<uint16_t>(block->id);

                block->nodes[0].next = end;
                block->nodes[1].next = end;
                startIndex = 1;
                m_blockList = block;
                m_freeList.init(a, end);
            }
            else
            {
                Block* tail = m_blockList;
                Block* next = tail->next.load();

                while(true)
                {
                    while(next != nullptr)
                    {
                        tail = next;
                        next = tail->next.load();
                    }

                    Block* empty = nullptr;

                    if(tail->next.compare_exchange_strong(empty, block))
                    {
                        break;
                    }

                    next = tail->next.load();
                }
            }

            // Now push the block's chain of nodes onto the free list, from
            // the chain head (highest index) down to the chain tail
            // (startIndex).

            Address rangeBegin { 0 };
            Address rangeEnd { 0 };

            rangeBegin.block = rangeEnd.block = static_cast<uint16_t>(block->id);
            rangeBegin.index = m_blockSize - 1;
            rangeEnd.index = startIndex;
            m_freeList.push_range(rangeBegin, rangeEnd);

            return true;
        }
    };

    /*
     *
     * Members
     *
     */
    
    Heap m_localHeap;
    Heap& m_heap;
    List m_activeList;
    std::atomic<Block*> m_blockCache;
    
    /*
     *
     * Private APIs
     * 
     */

    // Worker for pushing nodes to the back
    // of the queue.  This always moves data.
    bool move_back(_In_ TData&& data) noexcept
    {
        Address address;
        Node* node = m_heap.alloc(address);

        if (node != nullptr)
        {
            node->data = std::move(data);
            m_activeList.push(node, address);
            return true;
        }

        return false;
    }

    // Worker for pushing nodes to the back
    // of the queue.  This always moves data.
    void move_back(_In_ TData&& data, _In_ uint64_t address) noexcept
    {
        Address a;
        a = address;
        Node* n = to_node(a);
        n->data = std::move(data);
        m_activeList.push(n, a);
    }

    // Called during construction after the heap
    // is set up.
    void Initialize()
    {
        Address end = m_heap.end();
        end.index++;

        Address dummy;
        Node* n = m_heap.alloc(dummy);

        if (n != nullptr)
        {
            n->next = end;
        }
        else
        {
            dummy = end;
        }

        m_activeList.init(dummy, end);
    }

    Node* to_node(_In_ const Address& address)
    {
        return m_heap.to_node(m_blockCache, address);
    }
    
    static inline void* aligned_malloc(_In_ size_t size, _In_ size_t align)
    {
        void *result;
        size_t bytes = (size + align - 1) & ~(align - 1);
#ifdef _MSC_VER
        result = _aligned_malloc(bytes, align);
#else
        if(posix_memalign(&result, align, bytes)) result = nullptr;
#endif
        return result;
    }
    
    static inline void aligned_free(_In_ void *ptr)
    {
#ifdef _MSC_VER
        _aligned_free(ptr);
#else
        free(ptr);
#endif
    }
};
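
A minimal usage sketch (illustrative; WorkItem and the surrounding scaffolding
are assumptions, not part of the header):

#include "LocklessQueue.h"   // assumed include path
#include <cstdint>
#include <utility>

struct WorkItem
{
    uint32_t id;
};

void Example()
{
    LocklessQueue<WorkItem> queue;          // default 0x400-node blocks

    // Producer side: push fails only if node allocation fails.
    WorkItem item{ 42 };
    if (!queue.push_back(std::move(item)))
    {
        // Out of memory, or out of 16-bit block IDs.
    }

    // Consumer side: pop returns false once the queue is empty.
    WorkItem out;
    while (queue.pop_front(out))
    {
        // process 'out'
    }
}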