pq/reader.go (266 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pq
import (
"time"
"github.com/elastic/go-txfile"
"github.com/elastic/go-txfile/internal/invariant"
)
// Reader is used to iterate events stored in the queue.
//
// Reads happen inside a read transaction opened with Begin and closed with
// Done. Read, Next and Available return an InactiveTx error when no
// transaction is active, and ReaderClosed once the reader has been closed.
type Reader struct {
	accessor *access

	// Current read position and progress within the current event.
	state readState

	// Set by newReader, cleared by close. A closed reader rejects all operations.
	active bool

	// Active read transaction: set by Begin, reset to nil by Done.
	tx *txfile.Tx

	// Passed to the observer's OnQueueRead in Done.
	// NOTE(review): not assigned within this file section — confirm where it is set.
	hdrOff uintptr

	// Optional observer notified with the transaction stats in Done.
	observer Observer

	// Time Begin was called; used by Done to compute stats.Duration.
	txStart time.Time

	// Stats of the current read transaction; zeroed by Begin.
	stats ReadStats
}
// readState tracks the reader's position in the queue and its progress
// within the currently selected event.
type readState struct {
	// id of the current event; advanced once the event has been fully read
	// or skipped.
	id uint64

	endID uint64 // id of next, yet unwritten event.

	// number of total bytes in current event; a negative value means no
	// event is active.
	totEventBytes int

	// number of unread bytes in current event; -1 when no event has been
	// selected yet or the current event has been consumed completely.
	eventBytes int

	// position of the reader within the queue's page list.
	cursor cursor
}
// newReader creates an open Reader with no event selected. Its cursor uses
// the accessor's page size, and the reader has no active read transaction
// until Begin is called.
func newReader(observer Observer, accessor *access) *Reader {
	initial := readState{
		// -1 marks "no active event" for both counters.
		eventBytes:    -1,
		totEventBytes: -1,
		cursor:        cursor{pageSize: accessor.PageSize()},
	}

	return &Reader{
		accessor: accessor,
		observer: observer,
		active:   true,
		state:    initial,
	}
}
// close marks the reader as closed. Afterwards Begin, Read, Next and
// Available all fail with ReaderClosed.
func (r *Reader) close() {
	r.active = false
}
// Available returns the number of unread events that can be read.
//
// The queue state is refreshed from the root header within the active read
// transaction. If the queue has no pages yet, 0 is returned. Begin must
// have been called before; otherwise an InactiveTx error is returned.
func (r *Reader) Available() (uint, error) {
	const op = "pq/reader-available"

	if kind := r.canRead(); kind != NoError {
		return 0, r.errOf(op, kind)
	}

	if err := r.updateQueueState(r.tx); err != nil {
		return 0, r.errWrap(op, err)
	}

	if r.state.cursor.Nil() {
		return 0, nil // queue is empty, nothing to read
	}
	return uint(r.state.endID - r.state.id), nil
}
// Begin starts a new read transaction that is shared by subsequent calls to
// Read, Next and Available. The caller must call Done to close the file
// transaction.
//
// Begin fails with ReaderClosed on a closed reader, and with
// UnexpectedActiveTx if a transaction is already active. On success, the
// stats collected during the previous transaction are discarded.
func (r *Reader) Begin() error {
	const op = "pq/reader-begin"

	if r.isClosed() {
		return r.errOf(op, ReaderClosed)
	}
	if r.isTxActive() {
		return r.errOf(op, UnexpectedActiveTx)
	}

	tx, err := r.beginTx()
	if err != nil {
		return r.errWrap(op, err)
	}

	r.tx, r.txStart = tx, time.Now()
	r.stats = ReadStats{} // start fresh for this transaction
	return nil
}
// Done closes the active read transaction. It is a no-op if no transaction
// is active.
//
// Before the transaction is dropped, several bookkeeping steps happen. An
// event that was read to completion is folded into the stats, and the
// transaction duration is recorded. If an observer is set, it receives the
// stats via OnQueueRead.
func (r *Reader) Done() {
	tx := r.tx
	if tx == nil {
		return
	}
	tx.Close()

	// eventBytes < 0 with a known event size means the event was consumed
	// completely by Read.
	completed := r.state.eventBytes < 0 && r.state.totEventBytes > 0
	if completed {
		r.adoptEventStats()
	}

	r.stats.Duration = time.Since(r.txStart)
	if r.observer != nil {
		r.observer.OnQueueRead(r.hdrOff, r.stats)
	}

	r.tx = nil
}
// Read copies contents of the current event into b and returns the number
// of bytes copied.
//
// It returns 0 without reading once the end of the current event has been
// reached, or if no event has been selected yet. Use Next to skip to or
// continue with the next event.
//
// Begin must be called before Read. Without an active read transaction an
// InactiveTx error is returned, and ReaderClosed is returned on a closed
// reader; in both cases the count is -1. If reading fails part-way, the
// number of bytes already copied into b is returned with the error.
func (r *Reader) Read(b []byte) (int, error) {
	const op = "pq/read-event"

	if kind := r.canRead(); kind != NoError {
		return -1, r.errOf(op, kind)
	}

	if r.state.eventBytes <= 0 {
		return 0, nil // no event selected or event fully consumed
	}

	rest, err := r.readInto(b)
	n := len(b) - len(rest)
	if err != nil {
		return n, r.errWrap(op, err)
	}
	return n, nil
}
// readInto copies at most min(len(to), remaining event bytes) bytes of the
// current event into to. It returns the unfilled tail of to.
//
// When the event is consumed completely, eventBytes is set to -1 and the
// event id is advanced. If no further event header fits into the current
// page, the cursor is moved to the next page while that page is still
// loaded in this transaction.
func (r *Reader) readInto(to []byte) ([]byte, reason) {
	remaining := r.state.eventBytes
	if len(to) < remaining {
		remaining = len(to)
	}

	cur := makeTxCursor(r.tx, r.accessor, &r.state.cursor)
	for remaining > 0 {
		consumed, err := cur.Read(to[:remaining])
		to = to[consumed:]
		remaining -= consumed
		r.state.eventBytes -= consumed
		if err != nil {
			return to, err
		}
	}

	var err reason
	if r.state.eventBytes != 0 {
		return to, err // event still has unread bytes
	}

	// End of event reached: mark it done and move on to the next id.
	r.state.eventBytes = -1
	r.state.id++

	// The page is already in memory, so reuse this transaction to skip to the
	// next page if no further event header fits into the current one.
	if cur.PageBytes() < szEventHeader {
		_, err = cur.AdvancePage()
	}
	return to, err
}
// Next advances to the next event to be read and returns its size in bytes.
// Unread contents of the current event are skipped. A size of 0 is returned
// if no more events are available in the queue.
//
// Begin must be called before Next. Without an active read transaction an
// InactiveTx error is returned, and ReaderClosed is returned on a closed
// reader; in both cases the size is -1.
func (r *Reader) Next() (int, error) {
	const op = "pq/reader-next"

	if kind := r.canRead(); kind != NoError {
		return -1, r.errOf(op, kind)
	}

	tx := r.tx
	cur := makeTxCursor(tx, r.accessor, &r.state.cursor)

	// Account the event we are leaving (fully read or about to be skipped).
	r.adoptEventStats()

	// Still inside an event? Skip its remaining contents.
	if r.state.eventBytes > 0 {
		if err := cur.Skip(r.state.eventBytes); err != nil {
			return 0, r.errWrap(op, err)
		}
		r.state.eventBytes = -1
		r.state.id++
	}

	// End of the buffered queue state. Refresh it from the root header and
	// check whether we did indeed reach the end of the queue.
	if cur.Nil() || !idLess(r.state.id, r.state.endID) {
		if err := r.updateQueueState(tx); err != nil {
			return 0, r.errWrap(op, err)
		}
		if cur.Nil() || !idLess(r.state.id, r.state.endID) {
			return 0, nil // end of queue
		}
	}

	// The event header does not fit into the current page: advance to the
	// next page and position the cursor on that page's first event.
	if cur.PageBytes() < szEventHeader {
		// The cursor was not advanced by the last read. The acker will not have
		// deleted the current page, so it is safe to advance now.
		ok, err := cur.AdvancePage()
		if err != nil {
			return 0, r.errWrap(op, err)
		}
		invariant.Check(ok, "page list linkage broken")

		hdr, err := cur.PageHeader()
		if err != nil {
			return 0, r.errWrap(op, err)
		}

		id := hdr.first.Get()
		off := int(hdr.off.Get())
		invariant.Check(r.state.id == id, "page start event id mismatch")
		invariant.CheckNot(off == 0, "page event offset missing")
		r.state.cursor.off = off
	}

	// Initialize the next event read by determining the event size.
	hdr, err := cur.ReadEventHeader()
	if err != nil {
		return 0, r.errWrap(op, err)
	}

	size := int(hdr.sz.Get())
	r.state.eventBytes = size
	r.state.totEventBytes = size
	return size, nil
}
// adoptEventStats folds the currently selected event, if any, into the
// transaction stats.
//
// An event with unread bytes left is accounted as skipped, and only its
// consumed bytes count towards BytesTotal. Otherwise the event is accounted
// as read and updates BytesMin/BytesMax.
//
// Afterwards the event is marked as accounted for by resetting totEventBytes
// to -1. Without this, the same event would be counted again later. Examples:
// Done after Next reached the end of the queue; repeated Next calls at the
// end of the queue; or the first Next of the following transaction after
// Done already adopted the event.
func (r *Reader) adoptEventStats() {
	total := r.state.totEventBytes
	if total < 0 {
		return // no active event
	}
	r.state.totEventBytes = -1

	if unread := r.state.eventBytes; unread > 0 {
		r.stats.Skipped++
		r.stats.BytesSkipped += uint(unread)
		r.stats.BytesTotal += uint(total - unread)
		return
	}

	bytes := uint(total)
	r.stats.BytesTotal += bytes
	if r.stats.Read == 0 || bytes < r.stats.BytesMin {
		r.stats.BytesMin = bytes
	}
	if r.stats.Read == 0 || bytes > r.stats.BytesMax {
		r.stats.BytesMax = bytes
	}
	r.stats.Read++
}
// updateQueueState refreshes the reader's view of the queue from the root
// header read within tx.
//
// If the cursor has not been positioned yet (the queue had no pages), it is
// initialized from the read start position, and the end id is taken from
// the tail position. Otherwise only the end id is refreshed.
func (r *Reader) updateQueueState(tx *txfile.Tx) reason {
	const op = "pq/reader-update-queue-state"

	root, err := r.accessor.RootHdr(tx)
	if err != nil {
		return r.errWrap(op, err)
	}

	if !r.state.cursor.Nil() {
		// Cursor already positioned: only the end of the queue may have moved.
		r.state.endID = root.tail.id.Get()
		return nil
	}

	// Queue was empty on the previous update: initialize the cursor now.
	start := r.findReadStart(root)
	end := r.accessor.ParsePosition(&root.tail)

	r.state.id = start.id
	r.state.cursor.page = start.page
	r.state.cursor.off = start.off
	r.state.endID = end.id
	return nil
}
// findReadStart returns the position reading should start from. This is the
// root's read position when it refers to a page (page != 0), and the head
// position otherwise.
func (r *Reader) findReadStart(root *queuePage) position {
	if pos := r.accessor.ParsePosition(&root.read); pos.page != 0 {
		return pos
	}
	return r.accessor.ParsePosition(&root.head)
}
// beginTx opens a new read transaction through the accessor. Failures are
// wrapped and reported as "failed to start read transaction".
func (r *Reader) beginTx() (*txfile.Tx, reason) {
	tx, err := r.accessor.BeginRead()
	if err == nil {
		return tx, nil
	}
	return nil, r.errWrap("", err).report("failed to start read transaction")
}
// canRead reports whether read operations are currently allowed. It returns
// ReaderClosed for a closed reader, InactiveTx when Begin has not been
// called, and NoError otherwise.
func (r *Reader) canRead() ErrKind {
	switch {
	case r.isClosed():
		return ReaderClosed
	case !r.isTxActive():
		return InactiveTx
	default:
		return NoError
	}
}
// isClosed reports whether close has been called on the reader.
func (r *Reader) isClosed() bool {
	return !r.active
}
// isTxActive reports whether a read transaction started by Begin is still
// open (Done has not been called yet).
func (r *Reader) isTxActive() bool {
	return r.tx != nil
}
// err creates a new Error for operation op, carrying the accessor's error
// context.
func (r *Reader) err(op string) *Error {
	return &Error{ctx: r.errCtx(), op: op}
}
// errOf creates an Error of the given kind for operation op.
func (r *Reader) errOf(op string, kind ErrKind) *Error {
	e := r.err(op)
	return e.of(kind)
}
// errWrap creates an Error for operation op that wraps cause.
func (r *Reader) errWrap(op string, cause error) *Error {
	e := r.err(op)
	return e.causedBy(cause)
}
// errCtx returns the error context of the underlying accessor, attached to
// every Error created by this reader.
func (r *Reader) errCtx() errorCtx {
	return r.accessor.errCtx()
}