pq/pq.go (153 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pq import ( "fmt" "unsafe" "github.com/elastic/go-txfile" ) // Queue implements the on-disk queue data structure. The queue requires a // Delegate, so to start transactions at any time. The Queue provides a reader // and writer. While it is safe to use the Reader and Writer concurrently, the // Reader and Writer themselves are not thread-safe. type Queue struct { accessor access id queueID version uint32 hdrOffset uintptr // TODO: add support for multiple named readers with separate ACK handling. pagePool *pagePool reader *Reader writer *Writer acker *acker settings Settings } type queueID int type position struct { page txfile.PageID off int id uint64 } // Settings configures a queue when being instantiated with `New`. type Settings struct { // Queue write buffer size. If a single event is bigger then the // write-buffer, the write-buffer will grow. In this case will the write // buffer be flushed and reset to its original size. WriteBuffer uint // Optional Flushed callback. Will be used to notify n events being // successfully committed. Flushed func(n uint) // Optional ACK callback. Will be use to notify number of events being successfully // ACKed and pages being freed. ACKed func(event, pages uint) Observer Observer } // MakeRoot prepares the queue header (empty queue). // When creating a queue with `New`, the queue header must be available. // Still, a delegate is allowed to create the queue header lazily. func MakeRoot() [SzRoot]byte { var buf [SzRoot]byte qu := castQueueRootPage(buf[:]) qu.version.Set(queueVersion) return buf } // New creates a new Queue. The delegate is required to access the file and // start transactions. An error is returned if the delegate is nil, the queue // header is invalid, some settings are invalid, or if some IO error occurred. func New(delegate Delegate, settings Settings) (*Queue, error) { const op = "pq/new" if delegate == nil { return nil, errOp(op).of(InvalidParam).report("delegate must not be nil") } accessor, errKind := makeAccess(delegate) if errKind != NoError { return nil, errOp(op).of(errKind) } pageSize := delegate.PageSize() q := &Queue{ accessor: accessor, settings: settings, pagePool: newPagePool(pageSize), } // use pointer address as ID for correlating error messages q.id = queueID(uintptr(unsafe.Pointer(q))) accessor.quID = q.id rootBuf, err := q.accessor.ReadRoot() if err != nil { return nil, wrapErr(op, err).of(InitFailed). report("failed to read queue header") } root := castQueueRootPage(rootBuf[:]) if root.version.Get() != queueVersion { cause := &Error{ kind: InitFailed, msg: fmt.Sprintf("queue version %v", root.version.Get()), } return nil, wrapErr(op, cause).of(InitFailed) } tracef("open queue: %p (pageSize: %v)\n", q, pageSize) traceQueueHeader(root) q.version = root.version.Get() q.hdrOffset = q.accessor.RootFileOffset() q.onInit() return q, nil } func (q *Queue) onInit() { o := q.settings.Observer if o == nil { return } avail, _ := q.Active() o.OnQueueInit(q.hdrOffset, q.version, avail) } // Close will try to flush the current write buffer, // but after closing the queue, no more reads or writes can be executed func (q *Queue) Close() error { tracef("close queue %p\n", q) defer tracef("queue %p closed\n", q) if q.reader != nil { q.reader.close() q.reader = nil } if q.acker != nil { q.acker.close() q.acker = nil } var err error if q.writer != nil { err = q.writer.close() q.writer = nil } return err } // Pending returns the total number of enqueued, but unacked events. func (q *Queue) Pending() (int, error) { tx, err := q.accessor.BeginRead() if err != nil { return -1, err } defer tx.Close() hdr, err := q.accessor.RootHdr(tx) if err != nil { return -1, err } head := q.accessor.ParsePosition(&hdr.read) if head.page == 0 { head = q.accessor.ParsePosition(&hdr.head) } tail := q.accessor.ParsePosition(&hdr.tail) return int(tail.id - head.id), nil } // Writer returns the queue writer for inserting new events into the queue. // A queue has only one single writer instance, which is returned by GetWriter. // The writer is is not thread safe. func (q *Queue) Writer() (*Writer, error) { const op = "pq/get-writer" if q.writer != nil { return q.writer, nil } rootBuf, err := q.accessor.ReadRoot() if err != nil { return nil, q.accessor.errWrap(op, err) } root := castQueueRootPage(rootBuf[:]) tail := q.accessor.ParsePosition(&root.tail) writeBuffer := q.settings.WriteBuffer flushed := q.settings.Flushed writer, err := newWriter(&q.accessor, q.hdrOffset, q.settings.Observer, q.pagePool, writeBuffer, tail, flushed) if err != nil { return nil, q.accessor.errWrap(op, err) } q.writer = writer return q.writer, nil } // Reader returns the queue reader for reading a new events from the queue. // A queue has only one single reader instance. // The reader is not thread safe. func (q *Queue) Reader() *Reader { if q.reader == nil { q.reader = newReader(q.settings.Observer, &q.accessor) } return q.reader } // ACK signals the queue, the most n events at the front of the queue have been // processed. // The queue will try to remove these asynchronously. func (q *Queue) ACK(n uint) error { return q.getAcker().handle(n) } // Active returns the number of active, not yet ACKed events. func (q *Queue) Active() (uint, error) { return q.getAcker().Active() } func (q *Queue) getAcker() *acker { if q.acker == nil { q.acker = newAcker(&q.accessor, q.hdrOffset, q.settings.Observer, q.settings.ACKed) } return q.acker }