pq/delegate.go (69 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pq import "github.com/elastic/go-txfile" // Delegate is used by the persistent queue to query common parameters and // start transactions when required. type Delegate interface { // PageSize reports the page size to be used by the backing file. PageSize() int // Root returns the queues root on file. Root() (txfile.PageID, uintptr) Offset(id txfile.PageID, offset uintptr) uintptr SplitOffset(uintptr) (txfile.PageID, uintptr) // BeginWrite must create a read-write transaction for use by the writer. // The transaction will be used to allocate pages and flush the current write // buffer. BeginWrite() (*txfile.Tx, error) // BeginRead must return a readonly transaction. BeginRead() (*txfile.Tx, error) // BeginCleanup must return a read-write transaction for the ACK handling to // remove events. No new contents will be written, but pages will be freed // and the queue root page being updated. BeginCleanup() (*txfile.Tx, error) } // standaloneDelegate wraps a txfile.File into a standalone queue only file. // The delegate sets the files root to the queue header. type standaloneDelegate struct { file *txfile.File root txfile.PageID } // NewStandaloneDelegate creates a standaonle Delegate from an txfile.File // instance. This function will allocate and initialize the queue root page. func NewStandaloneDelegate(f *txfile.File) (Delegate, error) { tx, err := f.Begin() if err != nil { return nil, err } defer tx.Close() root := tx.Root() if root == 0 { var err error root, err = initQueueRoot(tx) if err != nil { return nil, err } } return &standaloneDelegate{file: f, root: root}, nil } func initQueueRoot(tx *txfile.Tx) (txfile.PageID, error) { page, err := tx.Alloc() if err != nil { return 0, err } buf := MakeRoot() if err := page.SetBytes(buf[:]); err != nil { return 0, err } tx.SetRoot(page.ID()) return page.ID(), tx.Commit() } // PageSize returns the files page size. func (d *standaloneDelegate) PageSize() int { return d.file.PageSize() } // Root finds the queue root page and offset. func (d *standaloneDelegate) Root() (txfile.PageID, uintptr) { return d.root, 0 } func (d *standaloneDelegate) Offset(id txfile.PageID, offset uintptr) uintptr { return d.file.Offset(id, offset) } func (d *standaloneDelegate) SplitOffset(offset uintptr) (txfile.PageID, uintptr) { return d.file.SplitOffset(offset) } // BeginWrite creates a new transaction for flushing the write buffers to disk. func (d *standaloneDelegate) BeginWrite() (*txfile.Tx, error) { return d.file.BeginWith(txfile.TxOptions{ WALLimit: 3, }) } // BeginRead returns a readonly transaction. func (d *standaloneDelegate) BeginRead() (*txfile.Tx, error) { return d.file.BeginReadonly() } // BeginCleanup creates a new write transaction configured for cleaning up used // events/pages only. func (d *standaloneDelegate) BeginCleanup() (*txfile.Tx, error) { return d.file.BeginWith(txfile.TxOptions{ EnableOverflowArea: true, WALLimit: 3, }) }