file.go (608 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package txfile
import (
"fmt"
"math"
"math/bits"
"os"
"sync"
"sync/atomic"
"unsafe"
"github.com/elastic/go-txfile/internal/cleanup"
"github.com/elastic/go-txfile/internal/invariant"
"github.com/elastic/go-txfile/internal/vfs"
"github.com/elastic/go-txfile/internal/vfs/osfs"
)
// File provides transactional support to pages of a file. A file is split into
// pages of type PageSize. Pages within the file are only accessible by page IDs
// from within active transactions.
type File struct {
	// Atomic fields.
	// Do not move: Must be 64bit-word aligned on some architectures.
	txids uint64 // transaction ID sequence; incremented atomically in beginTx

	observer Observer // optional stats observer; notified with FileStats in reportOpen
	path     string   // file name, used as error context
	readonly bool     // if set, writable transactions are rejected by beginTx
	file     vfs.File // underlying OS file handle (locked while the File is open)

	size         int64 // real file size (updated on mmap update only)
	sizeEstimate int64 // estimated real file size based on last update and the total vs. used mmaped region

	locks lock           // file/transaction lock set (shared/reserved/pending/exclusive)
	wg    sync.WaitGroup // local async workers wait group

	writer    writer    // asynchronous page writer started by newFile
	allocator allocator // page allocator state (page size, max pages, freelists)
	wal       waLog     // write-ahead-log mapping, restored in init

	// mmap info
	mapped []byte // currently mmaped region; nil while unmapped

	// meta pages
	meta       [2]*metaPage // pointers into the mmaped region's two meta pages
	metaActive int          // index of the meta page of the last valid transaction

	stats FileStats // snapshot collected in reportOpen
}
// internal constants
const (
	initBits uint = 16            // 2 ^ 16 Bytes
	initSize      = 1 << initBits // 64KB — initial mmap sizing unit

	sz1GB = 1 << 30

	doubleLimit = sz1GB // upper limit when to stop doubling the mmaped area

	minRequiredFileSize = initSize // minimum required file size (64KB)
)
// maxMmapSize is the platform-dependent upper bound for the mmaped region,
// chosen once at startup: 2GB on 32-bit systems, a 47-bit range otherwise.
var maxMmapSize uint

func init() {
	is32Bit := maxUint == math.MaxUint32
	if is32Bit {
		maxMmapSize = 2 * sz1GB
	} else {
		const large = uint64(0x1FFFFFFFFFFF)
		maxMmapSize = uint(large)
	}
}
// Open opens or creates a new transactional file.
// Open tries to create the file, if the file does not exist yet. Returns an
// error if file access fails, file can not be locked or file meta pages are
// found to be invalid.
func Open(path string, mode os.FileMode, opts Options) (*File, error) {
	const op = "txfile/open"

	if err := opts.Validate(); err != nil {
		return nil, fileErrWrap(op, path, err)
	}

	file, err := osfs.Open(path, mode)
	if err != nil {
		return nil, fileErrWrap(op, path, err).report("can not open file")
	}

	// Undo partial setup (close + unlock) on any error path below.
	ok := false
	defer cleanup.IfNot(&ok, cleanup.IgnoreError(file.Close))

	// Acquire the OS file lock, optionally blocking until it becomes free.
	blocking := (opts.Flags & FlagWaitLock) == FlagWaitLock
	if err := file.Lock(true, blocking); err != nil {
		return nil, fileErrWrap(op, path, err).report("failed to lock file")
	}
	defer cleanup.IfNot(&ok, cleanup.IgnoreError(file.Unlock))

	// initialize the file
	f, err := openWith(file, opts)
	if err != nil {
		return nil, fileErrWrap(op, path, err).report("failed to open file")
	}

	tracef("open file: %p (%v)\n", f, path)
	traceMetaPage(f.getMetaPage())
	f.reportOpen()

	ok = true
	return f, nil
}
// openWith implements the actual opening sequence, including file
// initialization and validation.
func openWith(file vfs.File, opts Options) (*File, reason) {
	sz, ferr := file.Size()
	if ferr != nil {
		return nil, wrapErr(ferr)
	}

	// An empty file has not been initialized yet -> write fresh meta pages.
	isNew := false
	fileExists := sz > 0
	if !fileExists {
		if err := initNewFile(file, opts); err != nil {
			return nil, err
		}
		isNew = true
	}

	// Bootstrap from the most recent valid meta page.
	meta, metaActive, err := readValidMeta(file)
	if err != nil {
		return nil, err
	}

	pageSize := meta.pageSize.Get()
	maxSize := meta.maxSize.Get()
	// A stored max size of 0 marks an unbounded file (see initNewFile); fall
	// back to the configured limit in that case.
	if maxSize == 0 && opts.MaxSize > 0 {
		maxSize = opts.MaxSize
	}
	if maxSize > uint64(maxUint) {
		return nil, raiseInvalidParam("max file size to large for this system")
	}

	f, err := newFile(file, opts, metaActive, uint(maxSize), uint(pageSize))
	if err != nil {
		return nil, err
	}

	// Update the files MaxSize after the new file object has been created.
	// This allows us to handle the max size update like a transaction.
	if (!isNew && opts.Flags.check(FlagUpdMaxSize)) && opts.MaxSize != maxSize {
		// Close the half-opened file if the resize transaction fails.
		ok := false
		defer cleanup.IfNot(&ok, cleanup.IgnoreError(f.Close))

		// Grow by default; shrink only if a smaller non-zero limit was requested.
		op := growFile
		if opts.MaxSize > 0 && opts.MaxSize < maxSize {
			op = shrinkFile
		}

		err := op(f, opts)
		if err != nil {
			return nil, err
		}

		ok = true
	}

	return f, nil
}
// newFile creates and initializes a new File. File state is initialized
// from file and internal workers will be started.
func newFile(
	file vfs.File,
	opts Options,
	metaActive int,
	maxSize, pageSize uint,
) (*File, reason) {
	f := &File{
		file:     file,
		path:     file.Name(),
		observer: opts.Observer,
		allocator: allocator{
			maxSize:  maxSize,
			pageSize: pageSize,
		},
	}
	f.locks.init()

	if err := f.mmap(); err != nil {
		return nil, err
	}

	// Unmap again if state restoration below fails.
	ok := false
	defer cleanup.IfNot(&ok, ignoreReason(f.munmap))

	if err := f.init(metaActive, opts); err != nil {
		return nil, err
	}

	invariant.CheckNot(f.allocator.maxSize != 0 && f.allocator.maxPages == 0,
		"page limit not configured on allocator")

	// Start the asynchronous writer; Close waits on wg for it to terminate.
	f.writer.Init(file, f.allocator.pageSize, opts.Sync)
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		f.writer.Run()
	}()

	ok = true
	return f, nil
}
// init restores the File's in-memory state (WAL mapping and allocator state)
// from the most recent valid meta-page.
func (f *File) init(metaActive int, opts Options) reason {
	const op = "txfile/init-read-state"

	f.metaActive = metaActive
	active := f.meta[metaActive]

	if err := readWALMapping(&f.wal, f.mmapedPage, active.wal.Get()); err != nil {
		return f.errWrap(op, err).of(InitFailed)
	}
	if err := readAllocatorState(&f.allocator, f, active, opts); err != nil {
		return f.errWrap(op, err).of(InitFailed)
	}
	return nil
}
// reportOpen collects the current file statistics into f.stats and forwards
// them to the configured observer, if any.
func (f *File) reportOpen() {
	const numFileHeaders = 2

	meta := f.getMetaPage()

	// The file end is the larger of the data and meta end markers.
	end := uint(meta.dataEndMarker.Get())
	if metaEnd := uint(meta.metaEndMarker.Get()); metaEnd > end {
		end = metaEnd
	}

	metaArea := uint(meta.metaTotal.Get())
	metaUsed := metaArea - f.allocator.meta.freelist.Avail()
	dataUsed := end - numFileHeaders - metaArea - f.allocator.data.freelist.Avail()

	f.stats = FileStats{
		Version:       meta.version.Get(),
		Size:          uint64(f.size),
		MaxSize:       meta.maxSize.Get(),
		PageSize:      meta.pageSize.Get(),
		MetaArea:      metaArea,
		DataAllocated: dataUsed,
		MetaAllocated: metaUsed,
	}

	if o := f.observer; o != nil {
		o.OnOpen(f.stats)
	}
}
// Close closes the file, after all transactions have been quit. After closing
// a file, no more transactions can be started.
func (f *File) Close() error {
	// zero out f on exit -> using f after close should generate a panic
	defer func() { *f = File{} }()

	tracef("start file shutdown: %p\n", f)
	defer tracef("file closed: %p\n", f)

	// Lock acquisition order matches a write transaction, so Close waits for
	// all in-flight transactions instead of racing them:
	// get reserved lock, such that no write transactions can be started
	f.locks.Reserved().Lock()
	defer f.locks.Reserved().Unlock()

	// get pending lock, such that no new read transaction can be started
	f.locks.Pending().Lock()
	defer f.locks.Pending().Unlock()

	// get exclusive lock, waiting for active read transactions to be finished
	f.locks.Exclusive().Lock()
	defer f.locks.Exclusive().Unlock()

	// no other active transactions -> close file
	// NOTE(review): the munmap error is dropped here — confirm intentional.
	f.munmap()
	f.writer.Stop()
	errUnlock := f.file.Unlock()
	errClose := f.file.Close()

	// wait for workers to stop
	f.wg.Wait()

	// Report the unlock failure first; otherwise return the close result.
	if errUnlock != nil {
		return errUnlock
	}
	return errClose
}
// Readonly returns true if the file has been opened in readonly mode.
// A readonly File rejects writable transactions in beginTx.
func (f *File) Readonly() bool {
	return f.readonly
}
// Begin creates a new read-write transaction. The transaction returned
// does hold the Reserved Lock on the file. Use Close, Rollback, or Commit to
// release the lock.
func (f *File) Begin() (*Tx, error) {
	return f.BeginWith(TxOptions{Readonly: false})
}
// BeginReadonly creates a new readonly transaction. The transaction returned
// does hold the Shared Lock on the file. Use Close() to release the lock.
func (f *File) BeginReadonly() (*Tx, error) {
	return f.BeginWith(TxOptions{Readonly: true})
}
// BeginWith creates a new readonly or read-write transaction, with additional
// transaction settings. It is the common entry point behind Begin and
// BeginReadonly.
func (f *File) BeginWith(settings TxOptions) (*Tx, error) {
	return f.beginTx(settings)
}
// beginTx starts a new transaction with the given settings. It acquires the
// matching transaction lock, assigns a fresh transaction ID, and returns the
// initialized Tx.
func (f *File) beginTx(settings TxOptions) (*Tx, reason) {
	const op = "txfile/begin-tx"

	if f.readonly && !settings.Readonly {
		msg := "can not start writable transaction on readonly file"
		return nil, f.err(op).of(InvalidOp).report(msg)
	}

	tracef("request new transaction (readonly: %v)\n", settings.Readonly)

	// Take the tx lock before touching shared state. Release it on panic via
	// cleanup.IfNot, so a panicking application can still close the file.
	acquired := false
	txLock := f.locks.TxLock(settings.Readonly)
	txLock.Lock()
	defer cleanup.IfNot(&acquired, txLock.Unlock)

	id := atomic.AddUint64(&f.txids, 1)

	tracef("init new transaction (readonly: %v)\n", settings.Readonly)
	tx := newTx(f, id, txLock, settings)
	tracef("begin transaction: %p (readonly: %v)\n", tx, settings.Readonly)

	tx.onBegin()
	acquired = true
	return tx, nil
}
// PageSize returns the files page size in bytes.
func (f *File) PageSize() int {
	return int(f.allocator.pageSize)
}
// Offset computes an absolute file offset from a PageID plus an offset within
// that page. Panics if offset exceeds the page boundary.
func (f *File) Offset(id PageID, offset uintptr) uintptr {
	pageSize := uintptr(f.allocator.pageSize)
	if offset >= pageSize {
		panic("offset not within page boundary")
	}
	return uintptr(id)*pageSize + offset
}
// SplitOffset splits an absolute file offset into the page ID addressing the
// page and the remaining offset within that page.
func (f *File) SplitOffset(offset uintptr) (PageID, uintptr) {
	pageSize := uintptr(f.allocator.pageSize)
	return PageID(offset / pageSize), offset % pageSize
}
// truncate updates the file memory-mapping and truncates the file.
// The file is unmapped before and remapped after truncation, so to deal with
// OSes and filesystems not properly truncating mmapped files.
// Due to the memory mapping being updated while truncating, all file locks
// must be held, ensuring no other transaction can read from the file.
func (f *File) truncate(sz int64) reason {
	const op = "txfile/truncate"
	const errMsg = "can not update file size"

	wasMapped := f.mapped != nil
	if wasMapped {
		if err := f.munmap(); err != nil {
			return f.errWrap(op, err).report(errMsg)
		}
	}

	if err := f.file.Truncate(sz); err != nil {
		return f.errWrap(op, err).report(errMsg)
	}

	if !wasMapped {
		return nil
	}
	if err := f.mmap(); err != nil {
		return f.errWrap(op, err).report(errMsg)
	}
	return nil
}
// mmapUpdate re-establishes the memory mapping (unmap followed by mmap).
// A go-routine updating the mmaped area must hold all locks on the file.
func (f *File) mmapUpdate() reason {
	if err := f.munmap(); err != nil {
		return err
	}
	return f.mmap()
}
// mmap maps the file's contents and updates internal pointers into the mmaped
// memory area (f.mapped, f.meta, f.size).
func (f *File) mmap() reason {
	const op = "txfile/mmap"

	// update real file size
	fileSize, fileErr := f.file.Size()
	if fileErr != nil {
		const msg = "unable to determine file size for mmap region"
		return f.errWrap(op, fileErr).report(msg)
	}
	if fileSize < 0 {
		msg := fmt.Sprintf("file size %v < 0", fileSize)
		return f.err(op).of(InvalidFileSize).report(msg)
	}
	f.size = fileSize
	f.sizeEstimate = fileSize // reset estimate

	// If the meta area's end marker already exceeds the page limit, size the
	// mapping from the marker instead of the configured maximum.
	maxSize := f.allocator.maxSize
	if em := uint(f.allocator.meta.endMarker); maxSize > 0 && em > f.allocator.maxPages {
		maxSize = em * f.allocator.pageSize
	}

	pageSize := f.allocator.pageSize
	sz, err := computePlatformMmapSize(uint(fileSize), maxSize, uint(pageSize))
	if err != nil {
		return err
	}

	// map file
	buf, fileErr := f.file.MMap(int(sz))
	// BUG FIX: the original checked the stale `err` (always nil here) instead
	// of the MMap error, silently ignoring mapping failures.
	if fileErr != nil {
		return f.errWrap(op, fileErr).report("can not mmap file")
	}

	f.mapped = buf
	// The two file meta pages live at the very start of the mapping.
	f.meta[0] = castMetaPage(buf[0:])
	f.meta[1] = castMetaPage(buf[pageSize:])

	return nil
}
// munmap releases the memory mapping and resets the internal mapping to nil.
func (f *File) munmap() reason {
	const op = "txfile/munmap"

	mapped := f.mapped
	f.mapped = nil
	if err := f.file.MUnmap(mapped); err != nil {
		return f.errWrap(op, err)
	}
	return nil
}
// mmapedPage returns the mmaped contents of the page with the given ID, or
// nil if the page lies outside the mapped region.
// The byte buffer can only be used for reading.
func (f *File) mmapedPage(id PageID) []byte {
	pageSize := uint64(f.allocator.pageSize)
	off := uint64(id) * pageSize
	if end := off + pageSize; end <= uint64(len(f.mapped)) {
		return f.mapped[off:end]
	}
	return nil
}
// initNewFile initializes a new, yet empty Files metapages.
// It writes the two meta pages, and an optional initial freelist page, to the
// start of the file and syncs them to disk.
func initNewFile(file vfs.File, opts Options) reason {
	const op = "txfile/create"

	var flags uint32
	// Preallocate the full file up front if a bounded max size was configured.
	if opts.MaxSize > 0 && opts.Prealloc {
		flags |= metaFlagPrealloc
		if err := file.Truncate(int64(opts.MaxSize)); err != nil {
			return fileErrWrap(op, file.Name(), err).of(FileCreationFailed).
				report("unable to preallocate file")
		}
	}

	// A stored max size of 0 marks the file as unbounded.
	maxSize := opts.MaxSize
	if opts.Flags.check(FlagUnboundMaxSize) {
		maxSize = 0
	}

	// Default the page size to the OS page size, but never below minPageSize.
	pageSize := opts.PageSize
	if opts.PageSize == 0 {
		pageSize = uint32(os.Getpagesize())
		if pageSize < minPageSize {
			pageSize = minPageSize
		}
	}
	if !isPowerOf2(uint64(pageSize)) {
		cause := raiseInvalidParamf("pageSize %v is not power of 2", pageSize)
		return fileErrWrap(op, file.Name(), cause).of(FileCreationFailed)
	}
	if pageSize < minPageSize {
		cause := raiseInvalidParamf("pageSize must be >= %v", minPageSize)
		return fileErrWrap(op, file.Name(), cause).of(FileCreationFailed)
	}

	// create buffer to hold contents for the initial pages:
	// 1. meta page 0
	// 2. meta page 1
	// 3. free list page (only if opts.InitMetaArea > 0)
	buf := make([]byte, pageSize*3)

	// create freelist with meta area only and pre-compute page IDs
	requiredPages := 2
	metaTotal := opts.InitMetaArea
	dataEndMarker := PageID(2)
	metaEndMarker := PageID(0) // no meta area
	freelistPage := PageID(0)
	if metaTotal > 0 {
		requiredPages = 3
		freelistPage = PageID(2)

		// move pages from data area to new meta area by updating markers
		dataEndMarker += PageID(metaTotal)
		metaEndMarker = dataEndMarker

		// write freelist, so to make meta page allocatable
		hdr, body := castFreePage(buf[int(pageSize)*2:])
		hdr.next.Set(0)
		if metaTotal > 1 {
			// The freelist page itself occupies the first meta-area page;
			// register the remaining metaTotal-1 pages as a single free region.
			hdr.count.Set(1)
			encodeRegion(body, true, region{
				id:    freelistPage + 1,
				count: metaTotal - 1,
			})
		}
	}

	// create meta pages
	for i := 0; i < 2; i++ {
		pg := castMetaPage(buf[int(pageSize)*i:])
		pg.Init(flags, pageSize, maxSize)
		// Page 0 starts with txid 1, page 1 with txid 0, so readValidMeta
		// selects page 0 as the active meta page of a fresh file.
		pg.txid.Set(uint64(1 - i))
		pg.dataEndMarker.Set(dataEndMarker) // endMarker is index of next to be allocated page at end of file
		pg.metaEndMarker.Set(metaEndMarker)
		pg.metaTotal.Set(uint64(metaTotal))
		pg.freelist.Set(freelistPage)
		pg.Finalize()
	}

	// write initial pages to disk and sync, so the file is valid on return
	err := writeAt(op, file, buf[:int(pageSize)*requiredPages], 0)
	if err == nil {
		if syncErr := file.Sync(vfs.SyncAll); syncErr != nil {
			err = fileErrWrap(op, file.Name(), syncErr)
		}
	}
	if err != nil {
		return fileErrWrap(op, file.Name(), err).of(FileCreationFailed).
			report("io error while initializing data file")
	}

	return nil
}
// readValidMeta tries to read a valid meta page from the file.
// The first valid meta page encountered is returned, together with its index
// (0 or 1). If both pages are valid, the one with the more recent transaction
// id wins.
func readValidMeta(f vfs.File) (metaPage, int, reason) {
	var pages [2]metaPage
	var metaErr [2]reason

	var metaActive int
	var err reason

	pages[0], err = readMeta(f, 0)
	if err != nil {
		return metaPage{}, -1, err
	}
	// NOTE(review): meta page 1's offset is taken from page 0's unvalidated
	// pageSize field — if page 0 is corrupted, page 1 may be read from the
	// wrong offset. Confirm this is acceptable.
	pages[1], err = readMeta(f, int64(pages[0].pageSize.Get()))
	if err != nil {
		return metaPage{}, -1, err
	}

	metaErr[0] = pages[0].Validate()
	metaErr[1] = pages[1].Validate()
	switch {
	case metaErr[0] != nil && metaErr[1] != nil:
		// Neither page validates -> report the first page's failure.
		return metaPage{}, -1, metaErr[0]
	case metaErr[0] == nil && metaErr[1] != nil:
		metaActive = 0
	case metaErr[0] != nil && metaErr[1] == nil:
		metaActive = 1
	default:
		// both meta pages valid, choose page with highest transaction number
		tx0 := pages[0].txid.Get()
		tx1 := pages[1].txid.Get()
		if tx0 == tx1 {
			panic("meta pages with same transaction id")
		}
		// The signed difference keeps the comparison correct across txid
		// wrap-around.
		if int64(tx0-tx1) > 0 { // if tx0 > tx1
			metaActive = 0
		} else {
			metaActive = 1
		}
	}

	return pages[metaActive], metaActive, nil
}
// readMeta reads and deserializes a meta page from the given file offset.
// The page contents are not validated here; use metaPage.Validate.
func readMeta(f vfs.File, off int64) (metaPage, reason) {
	const op = "txfile/read-file-meta"

	var buf [unsafe.Sizeof(metaPage{})]byte
	if _, err := f.ReadAt(buf[:], off); err != nil {
		r := fileErrWrap(op, f.Name(), err)
		r.ctx.SetOffset(off)
		return metaPage{}, r.report("failed to read file header page")
	}
	return *castMetaPage(buf[:]), nil
}
// computeMmapSize determines the size of the mmaped region in bytes.
// Up to 1GB, the mmaped file area is doubled (starting at 64KB) on every grow.
// That is, exponential growth with sizes of 64KB, 128KB, 256KB, 512KB, and so on.
// Once 1GB is reached, the mmaped area is always a multiple of 1GB.
func computeMmapSize(minSize, maxSize, pageSize uint) (uint, reason) {
	if maxSize != 0 {
		// Bounded file: map the full configured maximum, rounded up to a
		// multiple of the page size in case maxSize is not page-aligned.
		if minSize > maxSize {
			maxSize = minSize
		}

		sz := ((maxSize + pageSize - 1) / pageSize) * pageSize
		if sz < initSize {
			return 0, raiseInvalidParamf("max size of %v bytes is too small", maxSize)
		}

		return sz, nil
	}

	if minSize < doubleLimit {
		// grow by next power of 2, starting at 64KB
		initBits := uint(16) // 64KB min
		power2Bits := uint(64 - bits.LeadingZeros64(uint64(minSize)))
		if power2Bits < initBits {
			power2Bits = initBits
		}
		return 1 << power2Bits, nil
	}

	// allocate number of 1GB blocks to fulfill minSize
	sz := ((minSize + (sz1GB - 1)) / sz1GB) * sz1GB
	if sz > maxMmapSize {
		return 0, raiseInvalidParamf("mmap size of %v bytes is too large", sz)
	}

	// ensure we have a multiple of pageSize
	sz = ((sz + pageSize - 1) / pageSize) * pageSize
	return sz, nil
}
// getMetaPage returns a pointer to the meta-page of the last valid transaction
// found (selected via f.metaActive).
func (f *File) getMetaPage() *metaPage {
	return f.meta[f.metaActive]
}
// growFile executes a write transaction, growing the files max size setting.
// If opts.Prealloc is set, the file will be truncated to the new file size on
// success.
func growFile(f *File, opts Options) reason {
	const op = "txfile/grow"

	if err := doGrowFile(f, opts); err != nil {
		return fileErrWrap(op, f.path, err).report("failed to increase file size")
	}
	return nil
}
// doGrowFile updates the file's max size via a meta-page write transaction,
// applies the new limits to the in-memory allocator, and optionally
// preallocates the new size on disk.
func doGrowFile(f *File, opts Options) reason {
	maxPages, maxSize, err := initTxMaxSize(f, opts.MaxSize)
	if err != nil {
		return err
	}

	// Transaction completed. Update file allocator limits.
	f.allocator.maxPages = maxPages
	f.allocator.maxSize = maxSize

	// Allocate space on disk if prealloc is enabled and new file size is bounded.
	if opts.Prealloc && maxSize > 0 {
		if err := f.truncate(int64(maxSize)); err != nil {
			return err
		}
		// CONSISTENCY FIX: mmapUpdate already returns a reason; the original
		// re-wrapped it with wrapErr while returning the truncate error above
		// unwrapped. Return it directly on both paths.
		if err := f.mmapUpdate(); err != nil {
			return err
		}
	}

	return nil
}
// shrinkFile reconfigures the new max size to a smaller value and tries to
// remove excessive pages from the free list.
// The removal of excessive pages is done in a second transaction, that is
// allowed to fail. Excessive pages are freed in future transactions anyways.
// The file is not truncated yet, as the last 2 transactions must agree on the
// actual file size before truncating. Truncation is postponed to later transactions.
func shrinkFile(f *File, opts Options) reason {
	const op = "txfile/shrink"

	// 1. Start transaction updating the file meta header only. This transaction must succeed.
	maxPages, maxSize, err := initTxMaxSize(f, opts.MaxSize)
	if err != nil {
		return fileErrWrap(op, f.path, err).
			report("failed to reduce the maximum file size")
	}

	// 2. Transaction completed. Update file allocator limits
	f.allocator.maxPages = maxPages
	f.allocator.maxSize = maxSize

	// An area can return pages only if its freelist's last region reaches
	// exactly up to the area's end marker, and the marker lies beyond the new
	// page limit.
	canReleaseRegions := func(area *allocArea) bool {
		end := area.freelist.LastRegion().End()
		return end == area.endMarker && uint(area.endMarker) > maxPages
	}

	// 3. Start a new transaction, trying to remove pages from the freelist
	data := &f.allocator.data
	meta := &f.allocator.meta
	if canReleaseRegions(data) || canReleaseRegions(meta) {
		initTxReleaseRegions(f)
	}

	return nil
}
// initTxMaxSize runs a write transaction, updating the file maxSize
// to the newMaxSize value.
// initTxMaxSize is used when opening an existing file.
// As the file size must be a multiple of the file's page size, the number of
// maximum pages and the actual max file size is returned on success.
func initTxMaxSize(
	f *File,
	newMaxSize uint64,
) (maxPages, maxSize uint, err reason) {
	const op = "txfile/tx-update-maxsize"

	var metaID int
	err = withInitTx(f, func(tx *Tx) reason {
		// create new meta header for new ongoing write transaction
		newMetaBuf := tx.prepareMetaBuffer()
		newMeta := newMetaBuf.cast()

		// update max size, rounding down to a whole number of pages
		pageSize := uint(newMeta.pageSize.Get())
		maxSize = uint(newMaxSize)
		maxPages = maxSize / pageSize
		maxSize = maxPages * pageSize // round new max size to multiple of page size
		newMeta.maxSize.Set(uint64(maxSize))

		// sync new transaction state to disk and wait for the write to finish
		metaID = tx.syncNewMeta(&newMetaBuf)
		err := tx.writeSync.Wait()
		if err != nil {
			return f.errWrap(op, err).of(TxFailed).
				report("failed to update the on disk max size header entry")
		}

		return nil
	})
	if err == nil {
		// Only switch the active meta page once the new header is on disk.
		f.metaActive = metaID
	}
	return
}
// initTxReleaseRegions attempts to remove pages from the freelist, that exceed
// the files max size. The transaction is not required to succeed. If it fails,
// we just rollback. Subsequent write transactions will try to continue
// returning pages to the file systems.
// The transaction is prone to fail if there is not enough space to serialize
// the new freelist to.
// initTxReleaseRegions should only be called if it's clear pages can be
// removed; otherwise an 'empty' transaction would be executed for no benefit.
func initTxReleaseRegions(f *File) {
	// The returned reason is deliberately ignored — this cleanup transaction
	// is allowed to fail (see doc comment above).
	withInitTx(f, func(tx *Tx) reason {
		// Init new allocator commit state, returning current meta pages into the
		// freelist.
		var csAlloc allocCommitState
		tx.file.allocator.fileCommitPrepare(&csAlloc, &tx.alloc, true)

		// Compute new free lists and remove page ids > max file size from the end
		// of the freelist. Pages to be freed must border on the files allocation
		// end markers.
		if err := tx.file.allocator.fileCommitAlloc(&csAlloc); err != nil {
			return err
		}

		// Serialize new freelist to disk.
		if err := tx.file.allocator.fileCommitSerialize(&csAlloc, tx.scheduleWrite); err != nil {
			return err
		}
		tx.file.writer.Sync(tx.writeSync, syncDataOnly)

		// Update file meta header.
		newMetaBuf := tx.prepareMetaBuffer()
		newMeta := newMetaBuf.cast()
		tx.file.allocator.fileCommitMeta(newMeta, &csAlloc)
		metaID := tx.syncNewMeta(&newMetaBuf)

		// Finalize on-disk transaction.
		if err := tx.writeSync.Wait(); err != nil {
			return wrapErr(err)
		}

		// Commit allocator changes to in-memory allocator.
		tx.file.allocator.Commit(&csAlloc)

		// Switch the files active meta page to meta page being written.
		tx.file.metaActive = metaID
		return nil
	})
}
// withInitTx runs fn within a fresh writable transaction, following the
// regular commit lock sequence. Changes are rolled back if fn returns an
// error; fn's result is returned to the caller.
func withInitTx(f *File, fn func(tx *Tx) reason) reason {
	tx, err := f.beginTx(TxOptions{Readonly: false})
	if err != nil {
		// beginTx already returns a reason; the original's err.(reason)
		// assertion was redundant.
		return err
	}
	defer tx.close()

	commitOK := false
	defer cleanup.IfNot(&commitOK, tx.rollbackChanges)

	// Take the write transaction's commit locks. As the file is still being
	// opened/initialized, the locks are not strictly required yet, but we
	// follow the regular transaction sequence anyway.
	pending, exclusive := tx.file.locks.Pending(), tx.file.locks.Exclusive()
	pending.Lock()
	defer pending.Unlock() // BUG FIX: original deferred Lock, deadlocking on exit
	exclusive.Lock()
	defer exclusive.Unlock() // BUG FIX: original deferred Lock, deadlocking on exit

	// On function exit wait on writer to finish outstanding operations, in case
	// we have to return early on error. On success, this is basically a no-op.
	defer tx.writeSync.Wait()

	err = fn(tx)
	commitOK = err == nil
	return err
}
// err creates a new Error for operation op, carrying this file's path as context.
func (f *File) err(op string) *Error {
	return fileErr(op, f.path)
}

// errWrap creates a new Error for op with this file's path as context,
// wrapping cause.
func (f *File) errWrap(op string, cause error) *Error {
	return fileErrWrap(op, f.path, cause)
}

// errCtx returns an error context referencing this file's path.
func (f *File) errCtx() errorCtx { return fileErrCtx(f.path) }

// fileErr creates a new Error for op with the given file path as context.
func fileErr(op, path string) *Error {
	return &Error{op: op, ctx: fileErrCtx(path)}
}

// fileErrWrap creates a new path-scoped Error for op, wrapping cause.
func fileErrWrap(op, path string, cause error) *Error {
	return fileErr(op, path).causedBy(cause)
}

// fileErrCtx builds an errorCtx referencing the given file path.
func fileErrCtx(path string) errorCtx {
	return errorCtx{file: path}
}