pq/page.go (84 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pq
import (
"sync"
"github.com/elastic/go-txfile"
)
// page is used by the write buffer to keep page content and on-disk
// assignment. Pages with Meta.ID == 0 are not allocated on disk yet.
type page struct {
Next *page
Meta pageMeta
Data []byte
}
type pageMeta struct {
ID txfile.PageID
FirstID, LastID uint64
FirstOff uint32
EndOff uint32
Flags pageFlags
}
type pageFlags struct {
Dirty bool // indicates new event contents being written to this page
}
type pagePool struct {
sync.Pool
}
func newPagePool(pageSize int) *pagePool {
return &pagePool{sync.Pool{
New: func() interface{} {
return &page{
Data: make([]byte, pageSize),
}
},
}}
}
func (pp *pagePool) NewPage() *page {
return pp.get()
}
func (pp *pagePool) NewPageWith(id txfile.PageID, contents []byte) *page {
p := pp.NewPage()
copy(p.Data, contents)
hdr := castEventPageHeader(contents)
p.Meta = pageMeta{
ID: id,
FirstID: hdr.first.Get(),
LastID: hdr.last.Get(),
FirstOff: hdr.off.Get(),
}
return p
}
func (pp *pagePool) get() *page { return pp.Pool.Get().(*page) }
func (pp *pagePool) Release(p *page) {
p.Clear()
pp.Pool.Put(p)
}
// Clear zeroes out a page object and the buffer page header, preparing the
// page object for being reused.
func (p *page) Clear() {
p.Meta = pageMeta{}
p.Next = nil
// clear page header
for i := 0; i < szEventPageHeader; i++ {
p.Data[i] = 0
}
}
// Assigned checks if the page is represented by on on-disk page.
func (p *page) Assigned() bool {
return p.Meta.ID != 0
}
// Dirty checks if the page is dirty and must be flushed.
func (p *page) Dirty() bool {
return p.Meta.Flags.Dirty
}
// MarkDirty marks a page as dirty.
func (p *page) MarkDirty() {
p.Meta.Flags.Dirty = true
}
// UnmarkDirty marks a page as being in sync with the on-disk page.
func (p *page) UnmarkDirty() {
p.Meta.Flags.Dirty = false
}
// SetNext write the next page ID into the page header.
func (p *page) SetNext(id txfile.PageID) {
hdr := castEventPageHeader(p.Data)
hdr.next.Set(uint64(id))
}
// Payload returns the slice of the page it's complete payload.
func (p *page) Payload() []byte {
return p.Data[szEventPageHeader:]
}
// UpdateHeader updates the page header to reflect the page meta-data pages.
func (p *page) UpdateHeader() {
hdr := castEventPageHeader(p.Data)
hdr.first.Set(p.Meta.FirstID)
hdr.last.Set(p.Meta.LastID)
hdr.off.Set(p.Meta.FirstOff)
}