// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pq

import (
"time"
"github.com/elastic/go-txfile"
"github.com/elastic/go-txfile/internal/cleanup"
"github.com/elastic/go-txfile/internal/invariant"
"github.com/elastic/go-txfile/txerr"
)

// Writer is used to push new events onto the queue.
// The writer uses a write buffer, which is flushed once the buffer is full
// or Flush is called.
// Only complete events are flushed. If an event is bigger than the configured
// write buffer, the write buffer will grow to fit the complete event.
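//
// A minimal usage sketch (event1 and event2 are hypothetical byte slices,
// and w is assumed to be a Writer obtained from the owning queue):
//
//	w.Write(event1) // append the raw contents of the first event
//	w.Next()        // finish the first event
//	w.Write(event2)
//	w.Next()
//	w.Flush()       // force all finished events to be written to the file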
type Writer struct {
	active    bool    // set to false once the writer has been closed
	hdrOffset uintptr // offset of the queue header within the file
	observer  Observer
	accessor  *access
	flushCB   func(uint) // called with the flushed event count on each successful flush
	state     writeState
}

type writeState struct {
	buf *buffer

	activeEventCount uint // number of events finished since the last flush
	totalEventCount  uint // total number of events flushed so far
	totalAllocPages  uint // total number of pages allocated so far

	eventID    uint64 // ID of the event currently being written
	eventBytes int    // bytes written so far for the current event

	// statistics on finished events, reset on each successful flush
	activeEventBytes   uint
	minEventSize       uint
	maxEventSize       uint
	tsOldest, tsNewest time.Time
}

// defaultMinPages is the minimum size of the write buffer in pages.
const defaultMinPages = 5

func newWriter(
accessor *access,
off uintptr,
o Observer,
pagePool *pagePool,
writeBuffer uint,
end position,
flushCB func(uint),
) (*Writer, reason) {
const op = "pq/create-writer"
pageSize := accessor.PageSize()
if pageSize <= 0 {
return nil, &Error{op: op, kind: InvalidPageSize}
}
	// compute the write buffer size in whole pages, enforcing the minimum
	pages := int(writeBuffer) / pageSize
	if pages <= defaultMinPages {
		pages = defaultMinPages
	}
tracef("create queue writer with initBufferSize=%v, actualBufferSize=%v, pageSize=%v, pages=%v\n",
writeBuffer, pageSize*pages, pageSize, pages)
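	// if the queue is not empty, reload the page holding the current end of
	// the queue, so new events are appended to it in place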
var tail *page
if end.page != 0 {
traceln("writer load endpage: ", end)
page := end.page
off := end.off
var err reason
tail, err = accessor.readPageByID(pagePool, page)
if err != nil {
return nil, (&Error{op: op}).causedBy(err)
}
tail.Meta.EndOff = uint32(off)
}
w := &Writer{
active: true,
hdrOffset: off,
observer: o,
accessor: accessor,
state: writeState{
buf: newBuffer(pagePool, tail, pages, pageSize, szEventPageHeader),
eventID: end.id,
},
flushCB: flushCB,
}
	// reserve the framing header for the first event to be written
w.state.buf.ReserveHdr(szEventHeader)
return w, nil
}

func (w *Writer) close() error {
const op = "pq/writer-close"
if !w.active {
return nil
}
err := w.flushBuffer()
if err != nil {
return w.errWrap(op, err)
}
w.active = false
w.state.buf = nil
return err
}

func (w *Writer) Write(p []byte) (int, error) {
const op = "pq/write"
if err := w.canWrite(); err != NoError {
return 0, w.errOf(op, err)
}
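	// flush finished events first if the new chunk does not fit into the
	// remaining buffer space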
if w.state.buf.Avail() <= len(p) {
if err := w.flushBuffer(); err != nil {
return 0, w.errWrap(op, err)
}
}
w.state.buf.Append(p)
w.state.eventBytes += len(p)
return len(p), nil
}

// Next is used to indicate the end of the current event.
// If Write is used with a streaming encoder, the encoder's buffers must be
// flushed to this writer before calling Next.
// Upon Next, the queue writer adds the event framing header and footer.
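//
// For example, a sketch using a hypothetical buffered JSON encoder:
//
//	bw := bufio.NewWriter(w)
//	enc := json.NewEncoder(bw)
//	enc.Encode(event) // event is a hypothetical value to serialize
//	bw.Flush()        // push the buffered bytes into the queue writer...
//	w.Next()          // ...before finishing the event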
func (w *Writer) Next() error {
const op = "pq/writer-next"
if err := w.canWrite(); err != NoError {
return w.errOf(op, err)
}
// finalize current event in buffer and prepare next event
hdr := castEventHeader(w.state.buf.ActiveEventHdr())
hdr.sz.Set(uint32(w.state.eventBytes))
w.state.buf.CommitEvent(w.state.eventID)
w.state.buf.ReserveHdr(szEventHeader)
sz := uint(w.state.eventBytes)
ts := time.Now()
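	// track per-flush statistics (byte counts, min/max event size, and
	// oldest/newest event timestamps); these are reported to the Observer
	// on the next flush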
w.state.activeEventBytes += sz
if w.state.activeEventCount == 0 {
w.state.minEventSize = sz
w.state.maxEventSize = sz
w.state.tsOldest = ts
w.state.tsNewest = ts
} else {
if sz < w.state.minEventSize {
w.state.minEventSize = sz
}
if sz > w.state.maxEventSize {
w.state.maxEventSize = sz
}
w.state.tsNewest = ts
}
w.state.eventBytes = 0
w.state.eventID++
w.state.activeEventCount++
	// flush if the remaining buffer space cannot hold another event header
if w.state.buf.Avail() <= szEventHeader {
if err := w.flushBuffer(); err != nil {
return w.errWrap(op, err)
}
}
return nil
}

// Flush flushes the write buffer. It returns an error if the queue has been
// closed, the flush transaction fails, or no more space is available in the
// file.
func (w *Writer) Flush() error {
const op = "pq/writer-flush"
if err := w.canWrite(); err != NoError {
return w.errOf(op, err)
}
if err := w.flushBuffer(); err != nil {
return w.errWrap(op, err)
}
return nil
}

func (w *Writer) flushBuffer() error {
activeEventCount := w.state.activeEventCount
start := time.Now()
pages, allocated, err := w.doFlush()
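	// report the flush result to the observer (if configured), marking
	// failures caused by running out of memory or disk space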
if o := w.observer; o != nil {
failed := err != nil
o.OnQueueFlush(w.hdrOffset, FlushStats{
Duration: time.Since(start),
Oldest: w.state.tsOldest,
Newest: w.state.tsNewest,
Failed: failed,
OutOfMemory: failed && (txerr.Is(txfile.OutOfMemory, err) ||
txerr.Is(txfile.NoDiskSpace, err)),
Pages: pages,
Allocate: allocated,
Events: activeEventCount,
BytesTotal: w.state.activeEventBytes,
BytesMin: w.state.minEventSize,
BytesMax: w.state.maxEventSize,
})
}
if err != nil {
return err
}
	// on success: update the total counters, then reset the per-flush stats
	w.state.totalEventCount += activeEventCount
	w.state.totalAllocPages += allocated
	tracef("Write buffer flushed. Total events: %v, total pages allocated: %v\n",
		w.state.totalEventCount,
		w.state.totalAllocPages)
w.state.activeEventCount = 0
w.state.activeEventBytes = 0
w.state.minEventSize = 0
w.state.maxEventSize = 0
if w.flushCB != nil {
w.flushCB(activeEventCount)
}
return nil
}

func (w *Writer) doFlush() (pages, allocated uint, err error) {
start, end, pages := w.state.buf.Pages()
if start == nil || start == end {
return 0, 0, nil
}
traceln("writer flush", w.state.activeEventCount)
tracef("flush page range: start=%p, end=%p, n=%v\n", start, end, pages)
	// unallocated points to the first page in the list that still needs a
	// file page. All pages from unallocated up to end require a new page to
	// be allocated.
var unallocated *page
allocated = pages
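	// pages at the front of the list may already have on-disk IDs assigned
	// (for example the tail page reloaded on writer creation); subtract them
	// from the allocation count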
for current := start; current != end; current = current.Next {
tracef("check page assigned: %p (%v)\n", current, current.Assigned())
if !current.Assigned() {
unallocated = current
break
}
allocated--
}
tracef("start allocating pages from %p (n=%v)\n", unallocated, allocated)
tx, txErr := w.accessor.BeginWrite()
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
defer tx.Close()
rootPage, queueHdr, err := w.accessor.LoadRootPage(tx)
if err != nil {
return pages, allocated, w.errWrap("", err)
}
traceQueueHeader(queueHdr)
ok := false
allocN, txErr := allocatePages(tx, unallocated, end)
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
traceln("allocated pages:", allocN)
invariant.Checkf(allocN == allocated, "allocation counter mismatch (expected=%v, actual=%v)", allocated, allocN)
linkPages(start, end)
defer cleanup.IfNot(&ok, func() { unassignPages(unallocated, end) })
traceln("write queue pages")
last, txErr := flushPages(tx, start, end)
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
// update queue root
w.updateRootHdr(queueHdr, start, last, allocN)
rootPage.MarkDirty()
txErr = tx.Commit()
if txErr != nil {
return pages, allocated, w.errWrap("", txErr)
}
// mark write as success -> no error-cleanup required
ok = true
// remove dirty flag from all published pages
for current := start; current != end; current = current.Next {
current.UnmarkDirty()
}
	// retain the last flushed page, which may still have free space, as the
	// new start of the write buffer
	w.state.buf.Reset(last)
return pages, allocated, nil
}

func (w *Writer) updateRootHdr(hdr *queuePage, start, last *page, allocated uint) {
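	// if the queue has been empty so far, point the head at the first event
	// that was just written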
if hdr.head.offset.Get() == 0 {
w.accessor.WritePosition(&hdr.head, position{
page: start.Meta.ID,
off: int(start.Meta.FirstOff),
id: start.Meta.FirstID,
})
}
hdr.inuse.Set(hdr.inuse.Get() + uint64(allocated))
	// Compute the persisted end offset. If the reserved header of the next,
	// still uncommitted event lives on the last flushed page, the tail must
	// point at that header rather than the page's end offset.
	endOff := int(last.Meta.EndOff)
	if last == w.state.buf.eventHdrPage {
		endOff = w.state.buf.eventHdrOffset
	}
w.accessor.WritePosition(&hdr.tail, position{
page: last.Meta.ID,
off: endOff,
id: w.state.eventID,
})
traceln("writer: update queue header")
traceQueueHeader(hdr)
}

func (w *Writer) canWrite() ErrKind {
if !w.active {
return WriterClosed
}
return NoError
}

func (w *Writer) err(op string) *Error { return w.errPage(op, 0) }

func (w *Writer) errPage(op string, page txfile.PageID) *Error {
	return &Error{op: op, ctx: w.errPageCtx(page)}
}

func (w *Writer) errOf(op string, kind ErrKind) *Error {
	return w.err(op).of(kind)
}

func (w *Writer) errWrap(op string, cause error) *Error { return w.errWrapPage(op, cause, 0) }

func (w *Writer) errWrapPage(op string, cause error, page txfile.PageID) *Error {
	return w.errPage(op, page).causedBy(cause)
}

func (w *Writer) errCtx() errorCtx { return w.errPageCtx(0) }

func (w *Writer) errPageCtx(id txfile.PageID) errorCtx {
	return w.accessor.errPageCtx(id)
}

func allocatePages(tx *txfile.Tx, start, end *page) (uint, error) {
if start == nil {
return 0, nil
}
	// count the number of pages to allocate
	var allocN uint
for current := start; current != end; current = current.Next {
allocN++
}
tracef("allocate %v queue pages\n", allocN)
txPages, err := tx.AllocN(int(allocN))
if err != nil {
return 0, err
}
// assign new page IDs
for current, i := start, 0; current != end; current, i = current.Next, i+1 {
current.Meta.ID = txPages[i].ID()
}
return allocN, nil
}

// unassignPages removes the page assignment from all pages between start and
// end, marking these pages as 'not allocated' again.
func unassignPages(start, end *page) {
for current := start; current != end; current = current.Next {
current.Meta.ID = 0
}
}

// linkPages updates each page header to point to the next page in the list.
func linkPages(start, end *page) {
for current := start; current.Next != end; current = current.Next {
tracef("link page %v -> %v\n", current.Meta.ID, current.Next.Meta.ID)
current.SetNext(current.Next.Meta.ID)
}
}

// flushPages flushes all pages in the list and returns the last page flushed.
func flushPages(tx *txfile.Tx, start, end *page) (*page, error) {
last := start
for current := start; current != end; current = current.Next {
last = current
err := flushPage(tx, current)
if err != nil {
return nil, err
}
}
return last, nil
}

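// flushPage writes a single in-memory page to its assigned file page within
// the given write transaction.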
func flushPage(tx *txfile.Tx, page *page) error {
page.UpdateHeader()
tracePageHeader(page.Meta.ID, castEventPageHeader(page.Data))
diskPage, err := tx.Page(page.Meta.ID)
if err != nil {
return err
}
err = diskPage.SetBytes(page.Data)
if err != nil {
return err
}
return diskPage.Flush()
}