in pq/ack.go [153:208]
func (a *acker) initACK(n uint) (ackState, reason) {
const op = "pq/ack-precompute"
tx, txErr := a.accessor.BeginRead()
if txErr != nil {
return ackState{}, a.errWrap(op, txErr)
}
defer tx.Close()
hdr, err := a.accessor.RootHdr(tx)
if err != nil {
return ackState{}, err
}
headPos, startPos, endPos := a.queueRange(hdr)
startID := startPos.id
endID := startID + uint64(n)
if startPos.page == 0 {
return ackState{}, a.err(op).of(ACKEmptyQueue)
}
if !idLessEq(endID, endPos.id) {
return ackState{}, a.err(op).of(ACKTooMany)
}
c := makeTxCursor(tx, a.accessor, &cursor{
page: headPos.page,
off: headPos.off,
pageSize: a.accessor.PageSize(),
})
// Advance through pages and collect ids of all pages to be freed.
// Free all pages, but the very last data page, so to not interfere with
// concurrent writes.
ids, cleanAll, err := a.collectFreePages(&c, endID)
if err != nil {
return ackState{}, a.errWrap(op, err)
}
// find offset of next event to start reading from
var head, read position
if !cleanAll {
head, read, err = a.findNewStartPositions(&c, endID)
if err != nil {
return ackState{}, a.errWrap(op, err)
}
} else {
head = endPos
read = endPos
}
return ackState{
free: ids,
head: head,
read: read,
}, nil
}