pq/cursor.go (166 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package pq import "github.com/elastic/go-txfile" // cursor holds state for iterating events in the queue. type cursor struct { page txfile.PageID off int pageSize int } // txCursor is used to advance a cursor within a transaction. type txCursor struct { *cursor accessor *access tx *txfile.Tx page *txfile.Page } // Nil checks if the cursor is pointing to a page. Returns true, if cursor is // not pointing to any page in the queue. func (c *cursor) Nil() bool { return c.page == 0 } func makeTxCursor(tx *txfile.Tx, accessor *access, cursor *cursor) txCursor { return txCursor{ tx: tx, accessor: accessor, page: nil, cursor: cursor, } } func (c *txCursor) init() reason { const op = "pq/cursor-init" if c.page != nil { return nil } page, err := c.tx.Page(c.cursor.page) if err != nil { return c.errWrap(op, err) } c.page = page return nil } // Read reads more bytes from the current event into b. If the end of the // current event has reached, no bytes will be read. func (c *txCursor) Read(b []byte) (int, reason) { const op = "pq/read-bytes" if err := c.init(); err != nil { return 0, c.errWrap(op, err) } if c.Nil() { return 0, nil } to, err := c.readInto(b) n := len(b) - len(to) if err != nil { err = c.errWrap(op, err) } return n, err } // Skip skips the next n bytes. func (c *txCursor) Skip(n int) reason { const op = "pq/skip" for n > 0 { if c.PageBytes() == 0 { ok, err := c.AdvancePage() if err != nil { return c.errWrap(op, err).of(SeekFail) } if !ok { return c.err(op).report("No page to seek to") } } max := n if L := c.PageBytes(); L < max { max = L } c.cursor.off += max n -= max } return nil } func (c *txCursor) readInto(to []byte) ([]byte, reason) { for len(to) > 0 { // try to advance cursor to next page if last read did end at end of page if c.PageBytes() == 0 { ok, err := c.AdvancePage() if !ok || err != nil { return to, err } } var n int err := c.WithBytes(func(b []byte) { n = copy(to, b) }) to = to[n:] c.cursor.off += n if err != nil { return to, err } } return to, nil } func (c *txCursor) ReadEventHeader() (hdr *eventHeader, err reason) { const op = "pq/cursor-read-event-header" err = c.WithBytes(func(b []byte) { hdr = castEventHeader(b) c.off += szEventHeader }) if err != nil { err = c.errWrap(op, err) } return hdr, err } func (c *txCursor) PageHeader() (hdr *eventPage, err reason) { err = c.WithHdr(func(h *eventPage) { hdr = h }) return } func (c *txCursor) AdvancePage() (ok bool, err reason) { const op = "pq/cursor-next-page" err = c.WithHdr(func(hdr *eventPage) { nextID := txfile.PageID(hdr.next.Get()) tracef("advance page from %v -> %v\n", c.cursor.page, nextID) ok = nextID != 0 if ok { c.cursor.page = nextID c.cursor.off = szEventPageHeader c.page = nil } }) if err != nil { err = c.errWrap(op, err) } return ok, err } func (c *txCursor) WithPage(fn func([]byte)) reason { if err := c.init(); err != nil { return err } buf, err := c.page.Bytes() if err != nil { return c.errWrap("", err).of(ReadFail) } fn(buf) return nil } func (c *txCursor) WithHdr(fn func(*eventPage)) reason { const op = "pq/cursor-read-event-page-header" err := c.WithPage(func(b []byte) { fn(castEventPageHeader(b)) }) if err != nil { return c.errWrap(op, err) } return nil } func (c *txCursor) WithBytes(fn func([]byte)) reason { const op = "pq/cursor-access-page" err := c.WithPage(func(b []byte) { fn(b[c.off:]) }) if err != nil { return c.errWrap(op, err) } return nil } // PageBytes reports the amount of bytes still available in current page func (c *cursor) PageBytes() int { return c.pageSize - c.off } func (c *cursor) Reset() { *c = cursor{} } func (c *txCursor) err(op string) *Error { return &Error{op: op, ctx: c.errCtx(c.cursor.page)} } func (c *txCursor) errWrap(op string, cause error) *Error { return c.err(op).causedBy(cause) } func (c *txCursor) errCtx(page txfile.PageID) errorCtx { return c.accessor.errPageCtx(page) }