page.go (195 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package txfile
// Page provides access to an on disk page.
// Pages can only be overwritten from within a read-write Transaction.
// Writes are be buffered until transaction commit, such that other but the
// current transaction will not be able to see file changes.
type Page struct {
id PageID // Original PageID for user access.
ondiskID PageID // On disk PageID. If contents is loaded from overwrite page, ondiskID != id
tx *Tx // Parent transaction.
bytes []byte // Page contents.
flags pageFlags
}
// PageID used to reference a file pages.
type PageID uint64
type pageFlags struct {
new bool // page has been allocated. No on-disk contents.
freed bool // page has been freed within current transaction.
flushed bool // page has already been flushed. No more writing possible.
cached bool // original page contents is copied in memory and can be overwritten.
dirty bool // page is marked as dirty and will be written on commit
}
const minPageSize = 1024
// newPage creates a new page context within the current transaction.
func newPage(tx *Tx, id PageID) *Page {
return &Page{id: id, ondiskID: id, tx: tx}
}
func (p *Page) onWriteNew() {
p.tx.accessStats.New++
}
func (p *Page) onUpdated() {
p.tx.accessStats.Update++
}
// ID returns the pages PageID. The ID can be used to store a reference
// to this page, for use within another transaction.
func (p *Page) ID() PageID { return p.id }
// Readonly checks if the page is accessed in readonly mode.
func (p *Page) Readonly() bool { return p.tx.Readonly() }
// Writable checks if the page can be written to.
func (p *Page) Writable() bool { return !p.Readonly() }
// Dirty reports if the page is marked as dirty and needs to be flushed on
// commit.
func (p *Page) Dirty() bool { return p.flags.dirty }
// MarkDirty marks a page as dirty. MarkDirty should only be used if
// in-place modification to the pages buffer have been made, after use of Load().
func (p *Page) MarkDirty() error {
const op = "txfile/page-mark-dirty"
if err := p.canWrite(op); err != nil {
return err
}
p.setDirty()
return nil
}
func (p *Page) setDirty() {
if p.flags.dirty {
return
}
p.flags.dirty = true
if p.flags.new {
p.onWriteNew()
} else {
p.onUpdated()
}
}
// Free marks a page as free. Freeing a dirty page will return an error.
// The page will be returned to the allocator when the transaction commits.
func (p *Page) Free() error {
const op = "txfile/page-free"
if err := p.canWrite(op); err != nil {
return err
}
if p.flags.dirty {
const msg = "freeing dirty pages is not allowed"
return &Error{op: op, kind: InvalidOp, ctx: p.errCtx(), msg: msg}
}
p.tx.freePage(p.id)
if p.id != p.ondiskID {
p.tx.freeWALID(p.id, p.ondiskID)
}
p.flags.freed = true
return nil
}
// Bytes returns the page its contents.
// One can only modify the buffer in write transaction, if Load() or SetBytes()
// have been called before Bytes(). Otherwise a non-recoverable BUS panic might
// be triggerd (program will be killed by OS).
// Bytes returns an error if the page has just been allocated (no backing buffer)
// or the transaction is already been closed.
// Use SetBytes() or Load(), to initialize the buffer of a newly allocated page.
func (p *Page) Bytes() ([]byte, error) {
const op = "txfile/page-bytes"
if err := p.canRead(op); err != nil {
return nil, err
}
if p.bytes == nil && p.flags.new {
const msg = "can not read contents of fresh allocated page without contents"
return nil, &Error{op: op, kind: InvalidOp, ctx: p.errCtx(), msg: msg}
}
return p.getBytes(op)
}
func (p *Page) getBytes(op string) ([]byte, reason) {
if p.bytes == nil {
bytes := p.tx.access(p.ondiskID)
if bytes == nil {
cause := raiseOutOfBounds(p.ondiskID)
return nil, &Error{op: op, ctx: p.errCtx(), cause: cause}
}
p.bytes = bytes
}
return p.bytes, nil
}
// Load reads the pages original contents into a cached memory buffer, allowing
// for in-place modifications to the page. Load returns and error, if used from
// within a readonly transaction.
// If the page has been allocated from within the current transaction, a new
// temporary buffer will be allocated.
// After load, the write-buffer can be accessed via Bytes(). After modifications to the buffer,
// one must use MarkDirty(), so the page will be flushed on commit.
func (p *Page) Load() error {
const op = "txfile/page-load-writable"
if err := p.canWrite(op); err != nil {
return err
}
return p.loadBytes(op)
}
func (p *Page) loadBytes(op string) reason {
if p.flags.cached {
return nil
}
if p.flags.new {
p.flags.cached = true
p.bytes = make([]byte, p.tx.PageSize())
return nil
}
if p.flags.dirty {
p.flags.cached = true
return nil
}
// copy original contents into writable buffer (page needs to be marked dirty if contents is overwritten)
orig, err := p.getBytes(op)
if err != nil {
return err
}
tmp := make([]byte, len(orig))
copy(tmp, orig)
p.bytes = tmp
p.flags.cached = true
return nil
}
// SetBytes sets the new contents of a page. If the size of contents is less
// then the files page size, the original contents must be read. If the length
// of contents matches the page size, a reference to the contents buffer will
// be held. To enforce a copy, use Load(), Bytes(), copy() and MarkDirty().
func (p *Page) SetBytes(contents []byte) error {
const op = "txfile/page-set-bytes"
if err := p.canWrite(op); err != nil {
return err
}
pageSize := p.tx.PageSize()
if len(contents) > pageSize {
const msg = "page contents must not exceed the page size"
return &Error{op: op, kind: InvalidParam, ctx: p.errCtx(), msg: msg}
}
if len(contents) < pageSize {
if err := p.loadBytes(op); err != nil {
return err
}
copy(p.bytes, contents)
} else {
p.bytes = contents
}
p.setDirty()
return nil
}
// Flush flushes the page write buffer, if the page is marked as dirty.
// The page its contents must not be changed after calling Flush, as the flush
// is executed asynchronously in the background.
// Dirty pages will be automatically flushed on commit.
func (p *Page) Flush() error {
const op = "txfile/page-flush"
if err := p.canWrite(op); err != nil {
return err
}
return p.doFlush(op)
}
func (p *Page) doFlush(op string) reason {
if !p.flags.dirty || p.flags.flushed {
return nil
}
if !p.flags.new {
if p.id == p.ondiskID {
walID := p.tx.allocWALID(p.id)
if walID == 0 {
const msg = "not enough space to allocate write ahead page"
return &Error{op: op, kind: OutOfMemory, ctx: p.errCtx(), msg: msg}
}
p.ondiskID = walID
} else {
// page already in WAL -> free WAL page and write into original page
p.tx.freeWALID(p.id, p.ondiskID)
p.ondiskID = p.id
}
}
p.flags.flushed = true
p.tx.scheduleWrite(p.ondiskID, p.bytes)
return nil
}
func (p *Page) canRead(op string) *Error {
err := p.tx.canRead(op)
if err != nil {
err.ctx = p.errCtx()
}
return err
}
func (p *Page) canWrite(op string) *Error {
if err := p.tx.canWrite(op); err != nil {
err.ctx = p.errCtx()
return err
}
var msg string
switch {
case p.flags.freed:
msg = "page is already freed"
case p.flags.flushed:
msg = "page is already flushed"
}
if msg != "" {
return &Error{op: op, kind: InvalidOp, ctx: p.errCtx(), msg: msg}
}
return nil
}
func (p *Page) errCtx() errorCtx {
ctx := p.tx.errCtx()
ctx.page, ctx.isPage = p.id, true
return ctx
}