pq/access.go (128 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pq
import "github.com/elastic/go-txfile"
// Access provides transaction support and access to pages and queue header.
// It wraps the Delegate for providing a common interface for working with
// transactions and files.
type access struct {
Delegate
rootID txfile.PageID
rootOff int
quID queueID
}
func makeAccess(delegate Delegate) (access, ErrKind) {
rootID, rootOff := delegate.Root()
if rootID == 0 {
return access{}, NoQueueRoot
}
return access{
Delegate: delegate,
rootID: rootID,
rootOff: int(rootOff),
}, NoError
}
// ReadRoot reads the root page into an array.
// ReadRoot create a short lived read transaction for accessing and copying the
// queue root.
func (a *access) ReadRoot() ([SzRoot]byte, reason) {
const op = "pq/read-queue-root"
var buf [SzRoot]byte
tx, err := a.BeginRead()
if err != nil {
return buf, a.errWrap(op, err)
}
defer tx.Close()
fail := NoError
err = withPage(tx, a.rootID, func(page []byte) {
n := copy(buf[:], page[a.rootOff:])
if n < SzRoot {
fail = InvalidQueueRoot
}
})
if err != nil {
return buf, a.errWrap(op, err)
}
if fail != NoError {
return buf, a.err(op).of(fail)
}
return buf, nil
}
// rootPage accesses the queue root page from within the passed transaction.
func (a *access) rootPage(tx *txfile.Tx) (*txfile.Page, error) {
return tx.Page(a.rootID)
}
func (a *access) RootFileOffset() uintptr {
return a.Offset(a.rootID, uintptr(a.rootOff))
}
// LoadRootPage accesses the queue root page from within the passed write
// transaction.
// The Root page it's content is loaded into the write buffer for manipulations.
// The page returned is not marked as dirty yet.
func (a *access) LoadRootPage(tx *txfile.Tx) (*txfile.Page, *queuePage, reason) {
const op = "pq/load-queue-root"
var hdr *queuePage
page, err := a.rootPage(tx)
if err == nil {
err = page.Load()
if err == nil {
buf, _ := page.Bytes()
hdr = castQueueRootPage(buf[a.rootOff:])
}
}
if err != nil {
msg := "Error reading the queue header"
return nil, nil, a.errWrap(op, err).of(ReadFail).report(msg)
}
return page, hdr, nil
}
// RootHdr returns a pointer to the queue root header. The pointer to the
// header is only valid as long as the transaction is still active.
func (a *access) RootHdr(tx *txfile.Tx) (*queuePage, reason) {
const op = "pq/read-queue-header"
var hdr *queuePage
err := withPage(tx, a.rootID, func(buf []byte) {
hdr = castQueueRootPage(buf[a.rootOff:])
})
if err != nil {
msg := "Error reading the queue header"
return nil, a.errWrap(op, err).of(ReadFail).report(msg)
}
return hdr, nil
}
// ParsePosition parses an on disk position, providing page id, page offset and
// event id in a more accessible format.
func (a *access) ParsePosition(p *pos) position {
page, off := a.SplitOffset(uintptr(p.offset.Get()))
if page != 0 && off == 0 {
off = uintptr(a.PageSize())
}
return position{
page: page,
off: int(off),
id: p.id.Get(),
}
}
// WritePosition serializes a position into it's on-disk representation.
func (a *access) WritePosition(to *pos, pos position) {
pageOff := pos.off
if pageOff == a.PageSize() {
pageOff = 0 // use 0 to mark page offset as end-of-page
}
off := a.Offset(pos.page, uintptr(pageOff))
to.offset.Set(uint64(off))
to.id.Set(pos.id)
}
func (a *access) readPageByID(pool *pagePool, id txfile.PageID) (*page, reason) {
const op = "pq/read-single-page"
tx, err := a.BeginRead()
if err != nil {
return nil, a.errWrap(op, err)
}
defer tx.Close()
var page *page
err = withPage(tx, id, func(buf []byte) {
page = pool.NewPageWith(id, buf)
})
if err != nil {
return nil, a.errWrapPage(op, id, err).of(ReadFail)
}
return page, nil
}
func (a *access) err(op string) *Error { return a.errPage(op, 0) }
func (a *access) errPage(op string, id txfile.PageID) *Error {
return &Error{op: op, ctx: a.errPageCtx(id)}
}
func (a *access) errWrap(op string, cause error) *Error { return a.errWrapPage(op, 0, cause) }
func (a *access) errWrapPage(op string, id txfile.PageID, cause error) *Error {
return a.errPage(op, id).causedBy(cause)
}
func (a *access) errCtx() errorCtx { return errorCtx{id: a.quID} }
func (a *access) errPageCtx(id txfile.PageID) errorCtx {
if id != 0 {
return errorCtx{id: a.quID, isPage: true, page: id}
}
return errorCtx{id: a.quID}
}