in pq/ack.go [94:149]
// cleanup processes an ACK of n events. Within one cleanup transaction it
// frees every page listed by initACK as no longer needed and stores the new
// head and read positions, plus the reduced in-use page count, in the queue
// root page. After a successful commit the acker's running totals are updated
// and the optional ACK callback is invoked.
//
// The returned events and pages are n and the number of pages selected for
// freeing. They are assigned before the first error check, so they are also
// returned together with a non-nil error.
// NOTE(review): on failure these values describe the attempted work, not work
// that was actually committed — confirm callers treat them accordingly.
//
// All errors are wrapped with the "pq/ack-cleanup" operation name.
func (a *acker) cleanup(n uint) (events uint, pages uint, err error) {
	const op = "pq/ack-cleanup"

	state, err := a.initACK(n)
	events, pages = n, uint(len(state.free))
	if err != nil {
		return events, pages, a.errWrap(op, err)
	}

	// start write transaction to free pages and update the next read offset in
	// the queue root
	tx, txErr := a.accessor.BeginCleanup()
	if txErr != nil {
		return events, pages, a.errWrap(op, txErr).report("failed to init cleanup tx")
	}
	// Close is deferred so the transaction is released on every return path,
	// including the early error returns below.
	defer tx.Close()

	traceln("acker: free data pages:", len(state.free))
	for _, id := range state.free {
		// Use distinct error names inside the loop so the named result `err`
		// is not shadowed.
		page, pageErr := tx.Page(id)
		if pageErr != nil {
			return events, pages, a.errWrapPage(op, pageErr, id).report("can not access page to be freed")
		}

		traceln("free page", id)
		if freeErr := page.Free(); freeErr != nil {
			return events, pages, a.errWrapPage(op, freeErr, id).report("releasing page failed")
		}
	}

	// update queue header
	hdrPage, hdr, err := a.accessor.LoadRootPage(tx)
	if err != nil {
		// Wrap like every other failure path so the operation context is kept.
		return events, pages, a.errWrap(op, err)
	}
	a.accessor.WritePosition(&hdr.head, state.head)
	a.accessor.WritePosition(&hdr.read, state.read)
	hdr.inuse.Set(hdr.inuse.Get() - uint64(pages))
	hdrPage.MarkDirty()
	traceQueueHeader(hdr)

	if commitErr := tx.Commit(); commitErr != nil {
		return events, pages, a.errWrap(op, commitErr).report("failed to commit changes")
	}

	// Statistics and the callback are only touched once the commit succeeded.
	a.totalEventCount += n
	a.totalFreedPages += pages
	tracef("Acked events. total events acked: %v, total pages freed: %v \n", a.totalEventCount, a.totalFreedPages)

	if a.ackCB != nil {
		a.ackCB(n, pages)
	}
	return events, pages, nil
}