func()

in pq/reader.go [203:272]


// Next advances the reader to the next event and returns that event's size in
// bytes, as stored in the event header.
//
// If the reader is still positioned inside an event (unread bytes remain),
// the remainder of that event is skipped before advancing. A size of 0 with a
// nil error is returned when no further event is available, even after
// refreshing the buffered queue state.
//
// If the reader is not in a readable state, -1 and an error are returned. All
// other failures return 0 and an error carrying the operation name.
func (r *Reader) Next() (int, error) {
	const op = "op/reader-next"

	if err := r.canRead(); err != NoError {
		return -1, r.errOf(op, err)
	}

	tx := r.tx
	cursor := makeTxCursor(tx, r.accessor, &r.state.cursor)

	r.adoptEventStats()

	// in event? Skip contents
	if r.state.eventBytes > 0 {
		if err := cursor.Skip(r.state.eventBytes); err != nil {
			return 0, r.errWrap(op, err)
		}

		// The current event has been consumed: mark that no event is active
		// and move on to the next event id.
		r.state.eventBytes = -1
		r.state.id++
	}

	// end of buffered queue state. Update state and check if we did indeed reach
	// the end of the queue.
	if cursor.Nil() || !idLess(r.state.id, r.state.endID) {
		if err := r.updateQueueState(tx); err != nil {
			return 0, r.errWrap(op, err)
		}

		// end of queue
		if cursor.Nil() || !idLess(r.state.id, r.state.endID) {
			return 0, nil
		}
	}

	// Advance page and initialize cursor if event header does not fit into
	// current page.
	if cursor.PageBytes() < szEventHeader {
		// cursor was not advanced by last read. The acker will not have deleted
		// the current page -> try to advance now.
		ok, err := cursor.AdvancePage()
		if err != nil {
			// Wrap like every other failure in this method, so the caller
			// always receives the operation context.
			return 0, r.errWrap(op, err)
		}
		invariant.Check(ok, "page list linkage broken")

		hdr, err := cursor.PageHeader()
		if err != nil {
			return 0, r.errWrap(op, err)
		}

		// Position the cursor at the first event of the new page; that event
		// must be the one the reader expects next.
		id := hdr.first.Get()
		off := int(hdr.off.Get())
		invariant.Check(r.state.id == id, "page start event id mismatch")
		invariant.CheckNot(off == 0, "page event offset missing")
		r.state.cursor.off = off
	}

	// Initialize next event read by determining event size.
	hdr, err := cursor.ReadEventHeader()
	if err != nil {
		return 0, r.errWrap(op, err)
	}

	// Remember both the bytes still to be read and the total event size.
	size := int(hdr.sz.Get())
	r.state.eventBytes = size
	r.state.totEventBytes = size
	return size, nil
}