pq/buffer.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pq

import "github.com/elastic/go-txfile/internal/invariant"

// buffer holds allocated and not yet allocated in-memory pages, for appending
// events to.
type buffer struct {
	// in-memory pages
	head, tail *page

	pool *pagePool

	// settings (values don't change after init)
	pageSize    int
	hdrSize     int
	payloadSize int // effective page contents

	// page write state
	avail   int    // available space before forcing a flush
	payload []byte // byte slice of available payload/bytes in the current page
	page    *page  // current page

	// Event write state. Stores a reference to the start of the current
	// event, so we can put in the event header once the event is finished.
	eventHdrPage   *page
	eventHdrOffset int
	eventHdrSize   int

	// stats
	countPages uint
}

func newBuffer(pool *pagePool, page *page, pages, pageSize, hdrSz int) *buffer {
	payloadSz := pageSize - hdrSz
	avail := payloadSz * pages

	tracef("init writer buffer with pages=%v, pageSize=%v, hdrSize=%v, avail=%v\n",
		pages, pageSize, hdrSz, avail)

	b := &buffer{
		head: nil,
		tail: nil,

		pool:        pool,
		pageSize:    pageSize,
		hdrSize:     hdrSz,
		payloadSize: payloadSz,

		avail:   avail,
		payload: nil,
		page:    nil,

		eventHdrPage:   nil,
		eventHdrOffset: -1,
		eventHdrSize:   -1,
	}

	if page != nil {
		// init with end of on-disk list from former writes
		b.head = page
		b.tail = page

		contentsLength := int(page.Meta.EndOff) - b.hdrSize
		b.avail -= contentsLength
		b.payload = page.Data[page.Meta.EndOff:]
		b.page = page

		b.countPages++
	}

	return b
}

// Avail returns the amount of bytes available. Returns a value < 0 if the
// buffer contents exceed the high-water mark.
func (b *buffer) Avail() int {
	return b.avail
}

// Append adds more bytes to the current event. Use `CommitEvent` to finalize
// the writing of the current event.
// If required, Append adds new unallocated pages to the write buffer.
func (b *buffer) Append(data []byte) {
	for len(data) > 0 {
		if len(b.payload) == 0 {
			b.advancePage()
		}

		n := copy(b.payload, data)
		b.payload = b.payload[n:]
		data = data[n:]
		b.avail -= n

		tracef("writer: append %v bytes to (page: %v, off: %v, avail: %v)\n",
			n, b.page.Meta.ID, b.page.Meta.EndOff, b.avail)

		b.page.Meta.EndOff += uint32(n)
	}
}

func (b *buffer) advancePage() {
	// link new page into list
	page := b.newPage()
	if b.tail == nil {
		b.head = page
		b.tail = page
	} else {
		b.tail.Next = page
		b.tail = page
	}

	b.page = page
	b.payload = page.Payload()
	page.Meta.EndOff = uint32(szEventPageHeader)
}

func (b *buffer) newPage() *page {
	b.countPages++
	return b.pool.NewPage()
}

func (b *buffer) releasePage(p *page) {
	b.countPages--
	b.pool.Release(p)
}
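
// Typical event-write sequence against the buffer (an illustrative sketch
// only; the 6-byte header size and the encodeEventHeader helper are
// assumptions for this example, not functionality defined in this file):
//
//	hdr := b.ReserveHdr(6)                // reserve header space up front
//	b.Append(contents)                    // append contents, spanning pages as needed
//	encodeEventHeader(hdr, len(contents)) // hypothetical: fill the reserved header in place
//	b.CommitEvent(id)                     // finalize the event and mark its pages dirty
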
// ReserveHdr reserves space for the next event header in the write buffer.
// The start position is tracked by the buffer until the event is finished
// via CommitEvent.
func (b *buffer) ReserveHdr(n int) []byte {
	if n > b.payloadSize {
		return nil
	}

	invariant.Check(b.eventHdrPage == nil,
		"can not reserve a new event header if recent event is not finished yet")

	// reserve n bytes in payload
	if len(b.payload) < n {
		b.advancePage()
	}
	payloadWritten := b.payloadSize - len(b.payload)

	b.eventHdrPage = b.page
	b.eventHdrPage.Meta.EndOff += uint32(n)
	b.eventHdrOffset = b.hdrSize + payloadWritten
	b.eventHdrSize = n
	b.payload = b.payload[n:]
	b.avail -= n

	return b.ActiveEventHdr()
}

// ActiveEventHdr returns the current event header bytes content for
// writing/reading.
func (b *buffer) ActiveEventHdr() []byte {
	if b.eventHdrPage == nil {
		return nil
	}

	off := b.eventHdrOffset
	return b.eventHdrPage.Data[off : off+b.eventHdrSize]
}

// CommitEvent marks the current event as finished, finalizes its pages, and
// prepares for the next event.
func (b *buffer) CommitEvent(id uint64) {
	invariant.Check(b.eventHdrPage != nil, "no active event")

	page := b.eventHdrPage
	meta := &page.Meta
	if meta.FirstOff == 0 {
		meta.FirstOff = uint32(b.eventHdrOffset)
		meta.FirstID = id
	}
	meta.LastID = id
	page.MarkDirty()

	// mark all event pages as dirty
	for current := b.eventHdrPage; current != nil; current = current.Next {
		current.MarkDirty()
	}

	// mark head as dirty if yet unlinked
	if b.head != b.eventHdrPage && b.head.Next == b.eventHdrPage {
		b.head.MarkDirty()
	}

	b.eventHdrPage = nil
	b.eventHdrOffset = -1
	b.eventHdrSize = -1
}

// Pages returns the start and end page to be serialized.
// The `end` page itself must not be serialized.
func (b *buffer) Pages() (start, end *page, n uint) {
	traceln("get buffer active page range")
	if b.head == nil || !b.head.Dirty() {
		traceln("buffer empty")
		return nil, nil, 0
	}

	if b.eventHdrPage == nil {
		traceln("no active page")

		if b.tail.Dirty() {
			traceln("tail is dirty")
			return b.head, nil, b.countPages
		}

		traceln("tail is not dirty")
		for current := b.head; current != nil; current = current.Next {
			if !current.Dirty() {
				return b.head, current, n
			}
			n++
		}

		invariant.Unreachable("tail of list dirty and not dirty?")
	}

	end = b.eventHdrPage
	n = b.countPages
	if end.Dirty() {
		traceln("active page is dirty")
		end = end.Next
	} else {
		traceln("active page is clean")
		n--
	}
	return b.head, end, n
}

// Reset removes all but the last non-dirty page from the buffer.
// The last written page is still required for writing/linking new
// events/pages.
func (b *buffer) Reset(last *page) {
	if b.head == nil {
		return
	}

	// Find the last page not to be removed. A non-dirty page must not be
	// removed if the next page is dirty, so the on-disk link can be updated.
	// If no page is dirty, keep the last page for linking.
	pages := 0
	end := b.head
	for current := b.head; current.Next != nil && current != b.eventHdrPage; current = current.Next {
		if current.Next.Dirty() || current == last {
			end = current
			break
		}

		end = current.Next
		pages++
	}

	tracef("reset pages (%v)\n", pages)
	invariant.Check(end != nil, "must not empty page list on reset")

	// release pages
	spaceFreed := 0
	for page := b.head; page != end; {
		freed := int(page.Meta.EndOff) - szEventPageHeader
		tracef("writer: release page %v (%v)\n", page.Meta.ID, freed)

		next := page.Next
		spaceFreed += freed
		b.releasePage(page)
		page = next
	}
	b.head = end

	// update memory usage counters
	b.avail += spaceFreed
}
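
// A flush cycle driven by the buffer's owner might look roughly like this
// (illustrative sketch; writePages and the `last` page handle are
// assumptions, not part of this file):
//
//	start, end, n := b.Pages()         // dirty page range; `end` is exclusive
//	if n > 0 {
//		last := writePages(start, end) // hypothetical: serialize pages to disk
//		b.Reset(last)                  // release flushed pages, keep link target
//	}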