in pq/reader.go [203:272]
// Next positions the reader at the next event and returns that event's size
// in bytes, as recorded in the event header.
//
// If the previous event was not consumed completely, its remaining bytes are
// skipped first. When the reader has reached the end of the queue, Next
// returns (0, nil). If the reader is not in a readable state (canRead
// fails), Next returns -1 and an error built via errOf; all other failures
// return 0 and an error wrapped with the operation name.
func (r *Reader) Next() (int, error) {
	// NOTE(review): the "op/" prefix looks unusual for a file living in pq/;
	// confirm it matches the op naming used by the other Reader methods.
	const op = "op/reader-next"

	if err := r.canRead(); err != NoError {
		return -1, r.errOf(op, err)
	}

	tx := r.tx
	cursor := makeTxCursor(tx, r.accessor, &r.state.cursor)

	r.adoptEventStats()

	// Still inside an event? Skip the unread remainder so the cursor points at
	// the next event header.
	if r.state.eventBytes > 0 {
		if err := cursor.Skip(r.state.eventBytes); err != nil {
			return 0, r.errWrap(op, err)
		}

		// Non-positive value: the skip branch above must not trigger again for
		// the event we just left behind.
		r.state.eventBytes = -1
		r.state.id++
	}

	// End of buffered queue state. Update the state and check whether we did
	// indeed reach the end of the queue.
	if cursor.Nil() || !idLess(r.state.id, r.state.endID) {
		if err := r.updateQueueState(tx); err != nil {
			return 0, r.errWrap(op, err)
		}

		// Still nothing to read after the refresh -> end of queue.
		if cursor.Nil() || !idLess(r.state.id, r.state.endID) {
			return 0, nil
		}
	}

	// Advance the page and initialize the cursor if the event header does not
	// fit into the current page.
	if cursor.PageBytes() < szEventHeader {
		// The cursor was not advanced by the last read. The acker will not have
		// deleted the current page -> try to advance now.
		ok, err := cursor.AdvancePage()
		if err != nil {
			// Wrap like every other cursor failure in this method, so the
			// error carries the operation name.
			return 0, r.errWrap(op, err)
		}
		invariant.Check(ok, "page list linkage broken")

		pageHdr, err := cursor.PageHeader()
		if err != nil {
			return 0, r.errWrap(op, err)
		}

		// The new page must start with the event we are about to read, and
		// must record a non-zero offset of its first event.
		id := pageHdr.first.Get()
		off := int(pageHdr.off.Get())
		invariant.Check(r.state.id == id, "page start event id mismatch")
		invariant.CheckNot(off == 0, "page event offset missing")
		r.state.cursor.off = off
	}

	// Initialize the next event read by determining the event size.
	hdr, err := cursor.ReadEventHeader()
	if err != nil {
		return 0, r.errWrap(op, err)
	}

	size := int(hdr.sz.Get())
	r.state.eventBytes = size    // bytes of this event still to be read
	r.state.totEventBytes = size // total size of this event
	return size, nil
}