internal/queue/queue.go (109 lines of code) (raw):

package queue import ( "container/ring" ) // Holder provides synchronized access to a *Queue[T]. type Holder[T any] struct { // these channels work in tandem to provide exclusive access to the underlying *Queue[T]. // each channel is created with a buffer size of one. // empty behaves like a mutex when there's one or more messages in the queue. // populated is like a semaphore when the queue is empty. // the *Queue[T] is only ever in one channel. which channel depends on if it contains any items. // the initial state is for empty to contain an empty queue. empty chan *Queue[T] populated chan *Queue[T] } // NewHolder creates a new Holder[T] that contains the provided *Queue[T]. func NewHolder[T any](q *Queue[T]) *Holder[T] { h := &Holder[T]{ empty: make(chan *Queue[T], 1), populated: make(chan *Queue[T], 1), } h.Release(q) return h } // Acquire attempts to acquire the *Queue[T]. If the *Queue[T] has already been acquired the call blocks. // When the *Queue[T] is no longer required, you MUST call Release() to relinquish acquisition. func (h *Holder[T]) Acquire() *Queue[T] { // the queue will be in only one of the channels, it doesn't matter which one var q *Queue[T] select { case q = <-h.empty: // empty queue case q = <-h.populated: // populated queue } return q } // Wait returns a channel that's signaled when the *Queue[T] contains at least one item. // When the *Queue[T] is no longer required, you MUST call Release() to relinquish acquisition. func (h *Holder[T]) Wait() <-chan *Queue[T] { return h.populated } // Release returns the *Queue[T] back to the Holder[T]. // Once the *Queue[T] has been released, it is no longer safe to call its methods. func (h *Holder[T]) Release(q *Queue[T]) { if q.Len() == 0 { h.empty <- q } else { h.populated <- q } } // Len returns the length of the *Queue[T]. func (h *Holder[T]) Len() int { msgLen := 0 select { case q := <-h.empty: h.empty <- q case q := <-h.populated: msgLen = q.Len() h.populated <- q } return msgLen } // Queue[T] is a segmented FIFO queue of Ts. type Queue[T any] struct { head *ring.Ring tail *ring.Ring size int } // New creates a new instance of Queue[T]. // - size is the size of each Queue segment func New[T any](size int) *Queue[T] { r := &ring.Ring{ Value: &segment[T]{ items: make([]*T, size), }, } return &Queue[T]{ head: r, tail: r, } } // Enqueue adds the specified item to the end of the queue. // If the current segment is full, a new segment is created. func (q *Queue[T]) Enqueue(item T) { for { r := q.tail seg := r.Value.(*segment[T]) if seg.tail < len(seg.items) { seg.items[seg.tail] = &item seg.tail++ q.size++ return } // segment is full, can we advance? if next := r.Next(); next != q.head { q.tail = next continue } // no, add a new ring r.Link(&ring.Ring{ Value: &segment[T]{ items: make([]*T, len(seg.items)), }, }) q.tail = r.Next() } } // Dequeue removes and returns the item from the front of the queue. func (q *Queue[T]) Dequeue() *T { r := q.head seg := r.Value.(*segment[T]) if seg.tail == 0 { // queue is empty return nil } // remove first item item := seg.items[seg.head] seg.items[seg.head] = nil seg.head++ q.size-- if seg.head == seg.tail { // segment is now empty, reset indices seg.head, seg.tail = 0, 0 // if we're not at the last ring, advance head to the next one if q.head != q.tail { q.head = r.Next() } } return item } // Len returns the total count of enqueued items. func (q *Queue[T]) Len() int { return q.size } type segment[T any] struct { items []*T head int tail int }