func()

in pq/ack.go [94:149]


func (a *acker) cleanup(n uint) (events uint, pages uint, err error) {
	const op = "pq/ack-cleanup"

	state, err := a.initACK(n)
	events, pages = n, uint(len(state.free))
	if err != nil {
		return events, pages, a.errWrap(op, err)
	}

	// start write transaction to free pages and update the next read offset in
	// the queue root
	tx, txErr := a.accessor.BeginCleanup()
	if txErr != nil {
		return events, pages, a.errWrap(op, txErr).report("failed to init cleanup tx")
	}
	defer tx.Close()

	traceln("acker: free data pages:", len(state.free))
	for _, id := range state.free {
		page, err := tx.Page(id)
		if err != nil {
			return events, pages, a.errWrapPage(op, err, id).report("can not access page to be freed")
		}

		traceln("free page", id)
		if err := page.Free(); err != nil {
			return events, pages, a.errWrapPage(op, err, id).report("releasing page failed")
		}
	}

	// update queue header
	hdrPage, hdr, err := a.accessor.LoadRootPage(tx)
	if err != nil {
		return events, pages, err
	}
	a.accessor.WritePosition(&hdr.head, state.head)
	a.accessor.WritePosition(&hdr.read, state.read)
	hdr.inuse.Set(hdr.inuse.Get() - uint64(len(state.free)))
	hdrPage.MarkDirty()

	traceQueueHeader(hdr)

	if err := tx.Commit(); err != nil {
		return events, pages, a.errWrap(op, err).report("failed to commit changes")
	}

	a.totalEventCount += n
	a.totalFreedPages += uint(len(state.free))
	tracef("Acked events. total events acked: %v, total pages freed: %v \n", a.totalEventCount, a.totalFreedPages)

	if a.ackCB != nil {
		a.ackCB(n, uint(len(state.free)))
	}

	return events, pages, nil
}