pq/ack.go (258 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pq
import (
"time"
"github.com/elastic/go-txfile"
"github.com/elastic/go-txfile/internal/invariant"
)
// acker is used to asynchronously ack and remove events from the queue.
type acker struct {
	accessor        *access                  // shared queue accessor used to create read/cleanup transactions
	active          bool                     // cleared by close; handle fails with QueueClosed once unset
	hdrOffset       uintptr                  // queue header offset, passed through to observer callbacks
	observer        Observer                 // optional stats sink; may be nil
	totalEventCount uint                     // running total of ACKed events (tracing only)
	totalFreedPages uint                     // running total of freed pages (tracing only)
	ackCB           func(events, pages uint) // optional callback invoked after each successful ACK
}
// ackState records the changes required to finish the ACK step. It is
// computed under a read transaction (initACK) and applied under a write
// transaction (cleanup).
type ackState struct {
	free []txfile.PageID // Collect page ids to be freed.
	head position        // New queue head, pointing to first event in first available page
	read position        // New on-disk read pointer, pointing to first not-yet ACKed event.
}
// newAcker returns an active acker operating on the queue behind accessor.
// ACK stats are reported to o (if non-nil) for the queue header at offset
// off, and cb (if non-nil) is invoked after each successful ACK with the
// number of events ACKed and pages freed.
func newAcker(accessor *access, off uintptr, o Observer, cb func(uint, uint)) *acker {
	a := &acker{
		accessor:  accessor,
		hdrOffset: off,
		observer:  o,
		ackCB:     cb,
		active:    true,
	}
	return a
}
// close deactivates the acker; subsequent handle calls fail with QueueClosed.
// NOTE(review): the flag is not synchronized — presumably callers serialize
// close and handle; confirm against the queue's concurrency model.
func (a *acker) close() {
	a.active = false
}
// handle processes an ACK by freeing pages and
// updating the head and read positions in the queue root.
// So to not interfere with concurrent readers potentially updating pointers
// or adding new contents to a page, the last event page in the queue will never
// be freed. Still the read pointer might point past the last page.
func (a *acker) handle(n uint) error {
	const op = "pq/ack"

	// ACKing zero events is a no-op.
	if n == 0 {
		return nil
	}
	if !a.active {
		return a.err(op).of(QueueClosed)
	}

	traceln("acker: pq ack events:", n)

	began := time.Now()
	events, pages, err := a.cleanup(n)

	// Report ACK stats (including failed attempts) to the observer, if any.
	if obs := a.observer; obs != nil {
		stats := ACKStats{
			Duration: time.Since(began),
			Failed:   err != nil,
			Events:   events,
			Pages:    pages,
		}
		obs.OnQueueACK(a.hdrOffset, stats)
	}
	return err
}
// cleanup frees the pages fully covered by the first n un-ACKed events and
// persists the updated head/read positions and in-use counter in the queue
// root. It returns the event and page counts for observer reporting even when
// an error occurred along the way.
func (a *acker) cleanup(n uint) (events uint, pages uint, err error) {
	const op = "pq/ack-cleanup"

	state, err := a.initACK(n)
	events, pages = n, uint(len(state.free))
	if err != nil {
		return events, pages, a.errWrap(op, err)
	}

	// start write transaction to free pages and update the next read offset in
	// the queue root
	tx, txErr := a.accessor.BeginCleanup()
	if txErr != nil {
		return events, pages, a.errWrap(op, txErr).report("failed to init cleanup tx")
	}
	defer tx.Close()

	traceln("acker: free data pages:", len(state.free))
	for _, id := range state.free {
		page, err := tx.Page(id)
		if err != nil {
			return events, pages, a.errWrapPage(op, err, id).report("can not access page to be freed")
		}

		traceln("free page", id)
		if err := page.Free(); err != nil {
			return events, pages, a.errWrapPage(op, err, id).report("releasing page failed")
		}
	}

	// update queue header
	hdrPage, hdr, err := a.accessor.LoadRootPage(tx)
	if err != nil {
		// Wrap with the op context for consistency with the other error
		// paths in this function (the error was previously returned bare).
		return events, pages, a.errWrap(op, err)
	}
	a.accessor.WritePosition(&hdr.head, state.head)
	a.accessor.WritePosition(&hdr.read, state.read)
	hdr.inuse.Set(hdr.inuse.Get() - uint64(len(state.free)))
	hdrPage.MarkDirty()

	traceQueueHeader(hdr)

	if err := tx.Commit(); err != nil {
		return events, pages, a.errWrap(op, err).report("failed to commit changes")
	}

	// Counters and callback are only updated after a successful commit.
	a.totalEventCount += n
	a.totalFreedPages += uint(len(state.free))
	tracef("Acked events. total events acked: %v, total pages freed: %v \n",
		a.totalEventCount, a.totalFreedPages)

	if a.ackCB != nil {
		a.ackCB(n, uint(len(state.free)))
	}

	return events, pages, nil
}
// initACK uses a read-transaction to collect pages to be removed from list and
// find offset of next read required to start reading the next un-acked event.
//
// It returns ACKEmptyQueue if the queue holds no events, and ACKTooMany if n
// exceeds the number of not-yet-ACKed events.
func (a *acker) initACK(n uint) (ackState, reason) {
	const op = "pq/ack-precompute"

	tx, txErr := a.accessor.BeginRead()
	if txErr != nil {
		return ackState{}, a.errWrap(op, txErr)
	}
	defer tx.Close()

	hdr, err := a.accessor.RootHdr(tx)
	if err != nil {
		// Add the op context, consistent with how Active wraps the same call
		// (the error was previously returned bare).
		return ackState{}, a.errWrap(op, err)
	}

	headPos, startPos, endPos := a.queueRange(hdr)
	startID := startPos.id
	endID := startID + uint64(n) // id one past the last event to be ACKed
	if startPos.page == 0 {
		return ackState{}, a.err(op).of(ACKEmptyQueue)
	}
	if !idLessEq(endID, endPos.id) {
		return ackState{}, a.err(op).of(ACKTooMany)
	}

	c := makeTxCursor(tx, a.accessor, &cursor{
		page:     headPos.page,
		off:      headPos.off,
		pageSize: a.accessor.PageSize(),
	})

	// Advance through pages and collect ids of all pages to be freed.
	// Free all pages, but the very last data page, so to not interfere with
	// concurrent writes.
	ids, cleanAll, err := a.collectFreePages(&c, endID)
	if err != nil {
		return ackState{}, a.errWrap(op, err)
	}

	// find offset of next event to start reading from
	var head, read position
	if !cleanAll {
		head, read, err = a.findNewStartPositions(&c, endID)
		if err != nil {
			return ackState{}, a.errWrap(op, err)
		}
	} else {
		// Every event page was freed -> both pointers move to the queue end.
		head = endPos
		read = endPos
	}

	return ackState{
		free: ids,
		head: head,
		read: read,
	}, nil
}
// queueRange finds the start and end positions of not yet acked events in the
// queue.
func (a *acker) queueRange(hdr *queuePage) (head, start, end position) {
	head = a.accessor.ParsePosition(&hdr.head)
	end = a.accessor.ParsePosition(&hdr.tail)

	start = a.accessor.ParsePosition(&hdr.read)
	if start.page == 0 {
		// No on-disk read pointer yet -> reading begins at the queue head.
		start = head
	}
	return head, start, end
}
// collectFreePages collects all pages to be freed. A page can be freed if all
// events within the page have been acked. We want to free all pages, but the
// very last data page, so to not interfere with concurrent writes.
// All pages up to endID will be collected.
//
// It returns the ids of pages to free, plus a flag indicating every event
// page up to the writer's current page was ACKed (cleanAll).
func (a *acker) collectFreePages(c *txCursor, endID uint64) ([]txfile.PageID, bool, reason) {
	const op = "pq/collect-acked-pages"

	var (
		ids      []txfile.PageID
		lastID   uint64 // one past the last event id starting in the most recent event page
		cleanAll = false
	)
	for {
		hdr, err := c.PageHeader()
		if err != nil {
			return nil, false, a.errWrap(op, err)
		}

		next := hdr.next.Get()

		// stop searching if current page is the last page. The last page must
		// be active for the writer to add more events and link new pages.
		isWritePage := next == 0

		// stop searching if endID is in the current write page
		dataOnlyPage := hdr.off.Get() == 0 // no event starts within this page
		if !dataOnlyPage {
			lastID = hdr.last.Get()

			// inc 'lastID', so to hold on current page if endID would point to next
			// the page. This helps the reader, potentially pointing to the current
			// page, if next page has not been committed when reading events.
			lastID++

			// remove page if endID points past current data page
			keepPage := isWritePage || idLessEq(endID, lastID)
			if keepPage {
				break
			}
		}

		if isWritePage {
			cleanAll = true
			// Typo fixed in the invariant message ("missmatch" -> "mismatch").
			invariant.Checkf(lastID+1 == endID, "last event ID (%v) and ack event id (%v) mismatch", lastID, endID)
			break
		}

		// found intermediate page with ACKed events/contents
		// -> add page id to freelist and advance to next page
		ids = append(ids, c.cursor.page)
		ok, err := c.AdvancePage()
		if err != nil {
			return nil, false, a.errWrap(op, err)
		}
		invariant.Check(ok, "page list linkage broken")
	}

	return ids, cleanAll, nil
}
// findNewStartPositions skips acked events, so to find the new head and read pointers to be set
// in the updated queue header.
// The cursor c must point at the first page kept by collectFreePages; id is
// the id of the first not-yet-ACKed event.
func (a *acker) findNewStartPositions(c *txCursor, id uint64) (head, read position, err reason) {
	const op = "pq/ack-compute-new-start"

	var hdr *eventPage
	hdr, err = c.PageHeader()
	if err != nil {
		return head, read, a.errWrap(op, err)
	}

	// New head: the first event starting within the current (kept) page.
	head = position{
		page: c.cursor.page,
		off:  int(hdr.off.Get()),
		id:   hdr.first.Get(),
	}

	// If the page's first event is the first un-ACKed one, no skipping is
	// needed and the read pointer coincides with the head.
	if id == head.id {
		read = head
		return head, read, nil
	}

	// skip contents in current page until we did reach start of next event.
	// Note: this mutates the cursor offset; each iteration reads one event
	// header and skips its payload, leaving the cursor at the next header.
	c.cursor.off = head.off
	for currentID := head.id; currentID != id; currentID++ {
		var evtHdr *eventHeader
		evtHdr, err = c.ReadEventHeader()
		if err != nil {
			return head, read, a.errWrap(op, err)
		}
		err = c.Skip(int(evtHdr.sz.Get()))
		if err != nil {
			return head, read, a.errWrap(op, err)
		}
	}

	// The cursor now sits on the first not-yet-ACKed event.
	read = position{
		page: c.cursor.page,
		off:  c.cursor.off,
		id:   id,
	}
	return head, read, nil
}
// Active returns the total number of active, not yet ACKed events.
func (a *acker) Active() (uint, reason) {
	const op = "pq/count-active-events"

	tx, txErr := a.accessor.BeginRead()
	if txErr != nil {
		return 0, a.errWrap(op, txErr)
	}
	defer tx.Close()

	hdr, err := a.accessor.RootHdr(tx)
	if err != nil {
		return 0, a.errWrap(op, err)
	}

	// An unset tail offset means no event was ever committed -> empty queue.
	if hdr.tail.offset.Get() == 0 {
		return 0, nil
	}

	// Events in [start, end) still await an ACK. If the on-disk read pointer
	// is unset, nothing has been ACKed yet and counting starts at the head.
	end := hdr.tail.id.Get()
	start := hdr.head.id.Get()
	if hdr.read.offset.Get() != 0 {
		start = hdr.read.id.Get()
	}
	return uint(end - start), nil
}
// err creates an Error for op without a specific page (page id 0).
func (a *acker) err(op string) *Error { return a.errPage(op, 0) }
// errPage creates an Error for op carrying a page-scoped error context.
func (a *acker) errPage(op string, page txfile.PageID) *Error {
	return &Error{op: op, ctx: a.errPageCtx(page)}
}
// errWrap creates an Error for op wrapping cause, without a specific page.
func (a *acker) errWrap(op string, cause error) *Error { return a.errWrapPage(op, cause, 0) }
// errWrapPage creates a page-scoped Error for op wrapping cause.
func (a *acker) errWrapPage(op string, cause error, page txfile.PageID) *Error {
	return a.errPage(op, page).causedBy(cause)
}
// errCtx returns the accessor's error context.
func (a *acker) errCtx() errorCtx { return a.accessor.errCtx() }
// errPageCtx returns the accessor's error context referencing page id.
func (a *acker) errPageCtx(id txfile.PageID) errorCtx {
	return a.accessor.errPageCtx(id)
}