func()

in pq/ack.go [153:208]


func (a *acker) initACK(n uint) (ackState, reason) {
	const op = "pq/ack-precompute"

	tx, txErr := a.accessor.BeginRead()
	if txErr != nil {
		return ackState{}, a.errWrap(op, txErr)
	}
	defer tx.Close()

	hdr, err := a.accessor.RootHdr(tx)
	if err != nil {
		return ackState{}, err
	}

	headPos, startPos, endPos := a.queueRange(hdr)
	startID := startPos.id
	endID := startID + uint64(n)
	if startPos.page == 0 {
		return ackState{}, a.err(op).of(ACKEmptyQueue)
	}
	if !idLessEq(endID, endPos.id) {
		return ackState{}, a.err(op).of(ACKTooMany)
	}

	c := makeTxCursor(tx, a.accessor, &cursor{
		page:     headPos.page,
		off:      headPos.off,
		pageSize: a.accessor.PageSize(),
	})

	// Advance through pages and collect ids of all pages to be freed.
	// Free all pages, but the very last data page, so to not interfere with
	// concurrent writes.
	ids, cleanAll, err := a.collectFreePages(&c, endID)
	if err != nil {
		return ackState{}, a.errWrap(op, err)
	}

	// find offset of next event to start reading from
	var head, read position
	if !cleanAll {
		head, read, err = a.findNewStartPositions(&c, endID)
		if err != nil {
			return ackState{}, a.errWrap(op, err)
		}
	} else {
		head = endPos
		read = endPos
	}

	return ackState{
		free: ids,
		head: head,
		read: read,
	}, nil
}