in pq/writer.go [269:344]
func (w *Writer) doFlush() (pages, allocated uint, err error) {
start, end, pages := w.state.buf.Pages()
if start == nil || start == end {
return 0, 0, nil
}
traceln("writer flush", w.state.activeEventCount)
tracef("flush page range: start=%p, end=%p, n=%v\n", start, end, pages)
// unallocated points to first page in list that must be allocated. All
// pages between unallocated and end require a new page to be allocated.
var unallocated *page
allocated = pages
for current := start; current != end; current = current.Next {
tracef("check page assigned: %p (%v)\n", current, current.Assigned())
if !current.Assigned() {
unallocated = current
break
}
allocated--
}
tracef("start allocating pages from %p (n=%v)\n", unallocated, allocated)
tx, txErr := w.accessor.BeginWrite()
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
defer tx.Close()
rootPage, queueHdr, err := w.accessor.LoadRootPage(tx)
if err != nil {
return pages, allocated, w.errWrap("", err)
}
traceQueueHeader(queueHdr)
ok := false
allocN, txErr := allocatePages(tx, unallocated, end)
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
traceln("allocated pages:", allocN)
invariant.Checkf(allocN == allocated, "allocation counter mismatch (expected=%v, actual=%v)", allocated, allocN)
linkPages(start, end)
defer cleanup.IfNot(&ok, func() { unassignPages(unallocated, end) })
traceln("write queue pages")
last, txErr := flushPages(tx, start, end)
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
// update queue root
w.updateRootHdr(queueHdr, start, last, allocN)
rootPage.MarkDirty()
txErr = tx.Commit()
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
// mark write as success -> no error-cleanup required
ok = true
// remove dirty flag from all published pages
for current := start; current != end; current = current.Next {
current.UnmarkDirty()
}
w.state.buf.Reset(last)
return pages, allocated, nil
}