tx.go (600 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package txfile import ( "sync" "time" "github.com/elastic/go-txfile/internal/cleanup" "github.com/elastic/go-txfile/internal/invariant" ) // Tx provides access to pages in a File. // A transaction MUST always be closed, so to guarantee locks being released as // well. type Tx struct { flags txFlags file *File txid uint // internal correlation id lock sync.Locker writeSync *txWriteSync rootID PageID dataEndID PageID // pages accessed by the transaction pages map[PageID]*Page // allocation/free state alloc txAllocState // scheduled WAL updates wal txWalState // transaction stats tsStart time.Time accessStats txAccessStats } type txAccessStats struct { New uint Read uint Update uint } // TxOptions adds some per transaction options user can set. type TxOptions struct { // Readonly transaction. Readonly bool // Allow write transaction to allocate meta pages from overflow area. // Potentially increasing the file size past the configured max size. // This setting should only be used to guarantee progress when having a // transaction only freeing pages. // Later transactions will try to release pages from the overflow area and // truncate the file, such that we have a chance to operate within max-size // limits again. EnableOverflowArea bool // MetaAreaGrowPercentage sets the percentage of meta pages in use, until // the meta-area grows again. The value must be between 0 and 100. // The default value is 80%. MetaAreaGrowPercentage int // Number of pages in wal overwrite log to automatically trigger // CheckpointWAL on commit. WALLimit uint } type txFlags struct { readonly bool active bool checkpoint bool // mark wal checkpoint has been applied } func newTx(file *File, id uint64, lock sync.Locker, settings TxOptions) *Tx { meta := file.getMetaPage() invariant.Check(meta != nil, "file meta is not set") rootID := meta.root.Get() dataEndMarker := meta.dataEndMarker.Get() tx := &Tx{ flags: txFlags{ readonly: settings.Readonly, active: true, }, file: file, lock: lock, rootID: rootID, dataEndID: dataEndMarker, pages: map[PageID]*Page{}, } if !settings.Readonly { tx.writeSync = newTxWriteSync() tx.alloc = file.allocator.makeTxAllocState( settings.EnableOverflowArea, settings.MetaAreaGrowPercentage, ) tx.wal = file.wal.makeTxWALState(settings.WALLimit) } return tx } func (tx *Tx) onBegin() { o := tx.file.observer if o == nil { return } tx.tsStart = time.Now() o.OnTxBegin(tx.flags.readonly) } // onClose is called when a readonly transaction is closed. func (tx *Tx) onClose() { o := tx.file.observer if o == nil { return } accessed := tx.accessStats.Read o.OnTxClose(tx.file.stats, TxStats{ Readonly: true, Duration: time.Since(tx.tsStart), Total: accessed, Accessed: accessed, }) } // onRollback is called when a writable transaction is closed or rolled back without commit. func (tx *Tx) onRollback() { o := tx.file.observer if o == nil { return } read := tx.accessStats.Read updated := tx.accessStats.Update new := tx.accessStats.New o.OnTxClose(tx.file.stats, TxStats{ Readonly: false, Commit: false, Duration: time.Since(tx.tsStart), Total: read + updated + new, Accessed: read, Updated: updated, Written: updated + new, Allocated: tx.alloc.stats.data.alloc, Freed: tx.alloc.stats.data.freed, }) } // onCommit is called after a writable transaction did succeed. func (tx *Tx) onCommit() { allocStats := &tx.alloc.stats fileStats := &tx.file.stats fileStats.Size = uint64(tx.file.sizeEstimate) fileStats.MetaArea = tx.file.allocator.metaTotal fileStats.MetaAllocated = tx.file.allocator.metaTotal - tx.file.allocator.meta.freelist.Avail() fileStats.DataAllocated += allocStats.data.alloc - allocStats.data.freed - allocStats.toMeta o := tx.file.observer if o == nil { return } read := tx.accessStats.Read updated := tx.accessStats.Update new := tx.accessStats.New o.OnTxClose(tx.file.stats, TxStats{ Readonly: false, Commit: true, Duration: time.Since(tx.tsStart), Total: read + updated + new, Accessed: read, Allocated: allocStats.data.alloc - allocStats.toMeta, Freed: allocStats.data.freed, Written: updated + new, Updated: updated, }) } // onAccess is called when a the memory page pointer is requested. func (tx *Tx) onAccess() { tx.accessStats.Read++ } func (tx *Tx) onWALTransfer(n int) { // number of wal pages copied into data area } // Writable returns true if the transaction supports file modifications. func (tx *Tx) Writable() bool { return !tx.flags.readonly } // Readonly returns true if no modifications to the page are allowed. Trying to // write to a readonly page might result in a non-recoverable panic. func (tx *Tx) Readonly() bool { return tx.flags.readonly } // Active returns true if the transaction can still be used to access pages. // A transaction becomes inactive after Close, Commit or Rollback. // Errors within a transaction might inactivate the transaction as well. // When encountering errors, one should check if the transaction still can be used. func (tx *Tx) Active() bool { return tx.flags.active } // PageSize returns the file page size. func (tx *Tx) PageSize() int { return int(tx.file.allocator.pageSize) } // Root returns the data root page id. This ID must be set via SetRoot // to indicate the start of application data to later transactions. // On new files, the default root is 0, as no application data are stored yet. func (tx *Tx) Root() PageID { return tx.rootID } // SetRoot sets the new root page id, indicating the new start of application // data. SetRoot should be set by the first write transaction, when the file is // generated first. func (tx *Tx) SetRoot(id PageID) { tx.rootID = id } // RootPage returns the application data root page, if the root id has been set // in the past. Returns nil, if no root page is set. func (tx *Tx) RootPage() (*Page, error) { if tx.rootID < 2 { return nil, nil } return tx.getPage("txfile/tx-access-root", tx.rootID) } // Rollback rolls back and closes the current transaction. Rollback returns an // error if the transaction has already been closed by Close, Rollback or // Commit. func (tx *Tx) Rollback() error { const op = "txfile/tx-rollback" tracef("rollback transaction: %p\n", tx) err := tx.finishWith(func() reason { tx.rollbackChanges() return nil }) if err != nil { return tx.errWrap(op, err).of(TxRollbackFail) } return nil } // Commit commits the current transaction to file. The commit step needs to // take the Exclusive Lock, waiting for readonly transactions to be Closed. // Returns an error if the transaction has already been closed by Close, // Rollback or Commit. func (tx *Tx) Commit() error { const op = "txfile/tx-commit" tracef("commit transaction: %p\n", tx) err := tx.finishWith(tx.commitChanges) if err != nil { return tx.errWrap(op, err).of(TxCommitFail) } return nil } // Close closes the transaction, releasing any locks held by the transaction. // It is safe to call Close multiple times. Close on an inactive transaction // will be ignored. // A non-committed read-write transaction will be rolled back on close. // To guaranteed the File and Locking state being valid, even on panic or early return on error, // one should also defer the Close operation on new transactions. // For example: // // tx := f.Begin() // defer tx.Close() // // err := some operation // if err != nil { // return err // } // // return tx.Commit() // func (tx *Tx) Close() error { const op = "txfile/tx-close" tracef("close transaction: %p\n", tx) if !tx.flags.active { return nil } err := tx.finishWith(func() reason { tx.rollbackChanges() return nil }) if err != nil { return tx.errWrap(op, err).of(TxRollbackFail) } return nil } // CheckpointWAL copies all overwrite pages contents into the original pages. // Only already committed pages from older transactions will be overwritten. // Checkpointing only copies the contents and marks the overwrite pages as // freed. The final transaction Commit is required, to propage the WAL mapping changes // to all other transactions. // Dirty pages are not overwritten. Manual checkpointing should be executed at // the end of a transaction, right before committing, so to reduce writes if // contents is to be overwritten anyways. func (tx *Tx) CheckpointWAL() error { if err := tx.canWrite("txfile/tx-checkpoint"); err != nil { return err } tx.doCheckpointWAL() return nil } func (tx *Tx) doCheckpointWAL() { if tx.flags.checkpoint { return } // collect page ids that would have an old WAL page // entry still alive after this transaction. ids := make([]PageID, 0, len(tx.file.wal.mapping)) walIDS := make([]PageID, 0, len(tx.file.wal.mapping)) for id, walID := range tx.file.wal.mapping { page := tx.pages[id] if page != nil { if page.flags.dirty { // wal pages of dirty pages will be freed on flush -> do not copy continue } } ids = append(ids, id) walIDS = append(walIDS, walID) } if len(ids) == 0 { return } // XXX: Some OS/filesystems might lock up when writing to file // from mmapped area. // -> Copy contents into temporary buffer, such that // write operations are not backed by mmapped pages from same file. pageSize := int(tx.PageSize()) writeBuffer := make([]byte, pageSize*len(ids)) for i := range ids { id, walID := ids[i], walIDS[i] contents := tx.access(walID) if contents == nil { panic("invalid WAL mapping") } tracef("checkpoint copy from WAL page %v -> %v\n", walID, id) n := copy(writeBuffer, contents) buf := writeBuffer[:n] writeBuffer = writeBuffer[n:] tx.file.writer.Schedule(tx.writeSync, id, buf) tx.freeWALID(id, walID) } tx.onWALTransfer(len(ids)) tx.flags.checkpoint = true } func (tx *Tx) finishWith(fn func() reason) reason { if !tx.flags.active { return errOf(TxFinished).report("transaction is already closed") } defer tx.close() if tx.flags.readonly { tx.onClose() return nil } return fn() } func (tx *Tx) close() { tx.flags.active = false tx.pages = nil tx.alloc = txAllocState{} tx.wal = txWalState{} tx.writeSync = nil tx.file = nil tx.lock.Unlock() } func (tx *Tx) commitChanges() reason { commitOK := false defer cleanup.IfNot(&commitOK, tx.rollbackChanges) err := tx.tryCommitChanges() commitOK = err == nil if !commitOK { return err } traceMetaPage(tx.file.getMetaPage()) tx.onCommit() return nil } // tryCommitChanges attempts to write flush all pages written and update the // files state by writing the new meta data and finally the meta page. // So to keep the most recent transaction successfully committed usable/consistent, // tryCommitChanges is not allowed to re-use any pages freed within this transaction. // // rough commit sequence: // 1. get pending lock, so no new readers can be started // 2. flush all dirty pages. // - dirty pages overwriting existing contents will, will allocate // a new WAL page to be written to // - If dirty page already has an WAL page, overwrite the original page and // return WAL page to allocator // 3. if WAL was updated (pages added/removed): // - free pages holding the old WAL mapping // - write new WAL mapping // 4. if pages have been freed/allocated: // - free pages holding the old free list entries // - write new free list // 5. fsync, to ensure all updates have been executed before updating the meta page // 6. acquire esclusive lock -> no more readers/writers accessing the file // 6. update the meta page // 7. fsync // 8. update internal structures // 9. release locks func (tx *Tx) tryCommitChanges() reason { const op = "txfile/tx-commit" pending, exclusive := tx.file.locks.Pending(), tx.file.locks.Exclusive() // give concurrent read transactions a chance to complete, but don't allow // for new read transactions to start while executing the commit pending.Lock() defer pending.Unlock() // On function exit wait on writer to finish outstanding operations, in case // we have to return early on error. On success, this is basically a no-op. txWriteComplete := false defer cleanup.IfNot(&txWriteComplete, func() { err := tx.writeSync.Wait() // if wait fails, enforce an fsync with error reset flag. if err != nil { tx.file.writer.Sync(tx.writeSync, syncDataOnly|syncResetErr) tx.writeSync.Wait() } }) // Flush pages. if err := tx.flushPages(op); err != nil { return tx.err(op).report("failed to flush dirty pages") } // 1. finish Tx state updates and free file pages used to hold meta pages csWAL, err := tx.commitPrepareWAL() if err != nil { return err } csAlloc := tx.commitPrepareAlloc() // 2. - 5. Commit changes to file metaID, err := tx.tryCommitChangesToFile(&csWAL, &csAlloc) if err != nil { return err } // 6. wait for all pages beeing written and synced, // before updating in memory state. err = tx.writeSync.Wait() txWriteComplete = true if err != nil { return err } // At this point the transaction has been completed on file level. // Update internal structures as well, so future transactions // will use the new serialized transaction state. // We have only one active write transaction + freelist is not shared with read transactions // -> update freelist state before waiting for the exclusive lock to be available tx.file.allocator.Commit(&csAlloc) // Wait for all read transactions to finish before updating global references // to new contents. exclusive.Lock() defer exclusive.Unlock() // Update the WAL mapping. tx.file.wal.Commit(&csWAL) // Switch the files active meta page to meta page being written. tx.file.metaActive = metaID // Compare required file size with the real file size and the mmaped region. // If the expected file size of the last transaction is < the real file size, // we can truncate the file and update the mmaped region. // If the expected file size is > the mmaped region, we need to update the mmaped file region. // If we fail here, the file and internal state is already updated + valid. // But mmap failed on us -> fatal error endMarker := tx.file.allocator.data.endMarker if metaEnd := tx.file.allocator.meta.endMarker; metaEnd > endMarker { endMarker = metaEnd } // Compute maximum expected file size of current transaction // and update the memory mapping if required. expectedMMapSize := int64(uint(endMarker) * tx.file.allocator.pageSize) maxSize := int64(tx.file.allocator.maxSize) pageSize := tx.file.allocator.pageSize requiredFileSz, truncate := checkTruncate(&tx.alloc, tx.file.size, expectedMMapSize, maxSize, pageSize) if truncate { err = tx.file.truncate(requiredFileSz) } else if int(expectedMMapSize) > len(tx.file.mapped) { err = tx.file.mmapUpdate() } else { sz := expectedMMapSize if sz < tx.file.size { sz = tx.file.size } tx.file.sizeEstimate = sz } if err != nil { return err } traceln("tx stats:") traceln(" available data pages:", tx.file.allocator.DataAllocator().Avail(nil)) traceln(" available meta pages:", tx.file.allocator.meta.freelist.Avail()) traceln(" total meta pages:", tx.file.allocator.metaTotal) traceln(" freelist pages:", len(tx.file.allocator.freelistPages)) traceln(" wal mapping pages:", len(tx.file.wal.metaPages)) traceln(" max pages:", tx.file.allocator.maxPages) traceln(" wal mapped pages:", len(tx.file.wal.mapping)) return nil } func (tx *Tx) tryCommitChangesToFile( csWAL *walCommitState, csAlloc *allocCommitState, ) (metaID int, err reason) { newMetaBuf := tx.prepareMetaBuffer() newMeta := newMetaBuf.cast() newMeta.root.Set(tx.rootID) // update data root // 2. allocate new file pages for new meta data to be written if err := tx.file.wal.fileCommitAlloc(tx, csWAL); err != nil { return metaID, err } csAlloc.updated = csAlloc.updated || len(csWAL.allocRegions) > 0 if err := tx.file.allocator.fileCommitAlloc(csAlloc); err != nil { return metaID, err } // 3. serialize page mappings and new freelist err = tx.file.wal.fileCommitSerialize(csWAL, uint(tx.PageSize()), tx.scheduleCommitMetaWrite) if err != nil { return metaID, err } err = tx.file.allocator.fileCommitSerialize(csAlloc, tx.scheduleCommitMetaWrite) if err != nil { return metaID, err } // 4. sync all new contents and metadata before updating the ondisk meta page. tx.file.writer.Sync(tx.writeSync, syncDataOnly) // 5. finalize on-disk transaction by writing new meta page. tx.file.wal.fileCommitMeta(newMeta, csWAL) tx.file.allocator.fileCommitMeta(newMeta, csAlloc) metaID = tx.syncNewMeta(&newMetaBuf) // 6. wait for all pages beeing written and synced, // before updating in memory state. return metaID, nil } func checkTruncate( st *txAllocState, sz, mmapSz, maxSz int64, pageSize uint, ) (int64, bool) { if maxSz <= 0 { // file is unbounded, no truncate required return 0, false } expectedFileSz := mmapSz if expectedFileSz < maxSz { expectedFileSz = maxSz } if expectedFileSz >= sz { // Required size still surpasses the last known file size -> do not // truncate. return 0, false } lastEnd := st.data.endMarker if metaEnd := st.meta.endMarker; metaEnd > lastEnd { lastEnd = metaEnd } lastExpectedFileSz := int64(uint(lastEnd) * pageSize) if lastExpectedFileSz < maxSz { lastExpectedFileSz = maxSz } // Compute minimum required file size for the last two active transactions (maximum). if lastExpectedFileSz > expectedFileSz { expectedFileSz = lastExpectedFileSz } return expectedFileSz, expectedFileSz < sz } func (tx *Tx) prepareMetaBuffer() (buf metaBuf) { meta := buf.cast() *meta = *tx.file.getMetaPage() meta.txid.Set(1 + meta.txid.Get()) return } func (tx *Tx) syncNewMeta(buf *metaBuf) int { meta := buf.cast() meta.Finalize() metaID := 1 - tx.file.metaActive tx.file.writer.Schedule(tx.writeSync, PageID(metaID), (*buf)[:]) tx.file.writer.Sync(tx.writeSync, syncDataOnly|syncResetErr) return metaID } func (tx *Tx) commitPrepareWAL() (walCommitState, reason) { var st walCommitState tx.file.wal.fileCommitPrepare(&st, &tx.wal) if st.checkpoint { tx.doCheckpointWAL() } if st.updated { tx.freeMetaRegions(tx.file.wal.metaPages) } return st, nil } func (tx *Tx) commitPrepareAlloc() (state allocCommitState) { tx.file.allocator.fileCommitPrepare(&state, &tx.alloc, false) if state.updated { tx.freeMetaRegions(tx.file.allocator.freelistPages) } return state } func (tx *Tx) freeMetaRegions(rl regionList) { tx.metaAllocator().FreeRegions(&tx.alloc, rl) } func (tx *Tx) access(id PageID) []byte { tx.onAccess() return tx.file.mmapedPage(id) } // scheduleCommitMetaWrite is used to schedule a page write for the file meta // data like free list or page mappings. scheduleCommitMetaWrite must only be // used during file updates in the commit phase. func (tx *Tx) scheduleCommitMetaWrite(id PageID, buf []byte) reason { tx.accessStats.New++ return tx.scheduleWrite(id, buf) } func (tx *Tx) scheduleWrite(id PageID, buf []byte) reason { tx.file.writer.Schedule(tx.writeSync, id, buf) return nil } // rollbackChanges undoes all changes scheduled. // Potentially changes to be undone: // 1. WAL: // - mapping is only updated after ACK. // - pages have been allocated from meta area -> only restore freelists // 2. Allocations: // - restore freelists, by returning allocated page // ids < old endmarker to freelists // - restore old end markers. // - move pages allocated into meta area back into data area // 3. File: // - With page flushing or transaction failing late during commit, // file might have been grown. // => // - Truncate file only if pages in overflow area have been allocated. // - If maxSize == 0, truncate file to old end marker. func (tx *Tx) rollbackChanges() { tracef("rollback changes in transaction: %p\n", tx) tx.onRollback() tx.file.allocator.Rollback(&tx.alloc) maxPages := tx.file.allocator.maxPages if maxPages == 0 { return } // compute endmarker from before running the last transaction endMarker := tx.file.allocator.meta.endMarker if dataEnd := tx.file.allocator.data.endMarker; dataEnd > endMarker { endMarker = dataEnd } sz, err := tx.file.file.Size() if err != nil { // getting file size failed. State is valid, but we can not truncate // ¯\_(ツ)_/¯ return } truncateSz := uint(endMarker) * tx.file.allocator.pageSize if uint(sz) > uint(truncateSz) { // ignore truncate error, as truncating a memory mapped file might not be // supported by all OSes/filesystems. err := tx.file.file.Truncate(int64(truncateSz)) if err != nil { traceln("rollback file truncate failed with:", err) } } } // Page accesses a page by ID. Accessed pages are cached. Retrieving a page // that has already been accessed, will return a pointer to the same Page object. // Returns an error if the id is known to be invalid or the page has already // been freed. func (tx *Tx) Page(id PageID) (*Page, error) { const op = "txfile/tx-access-page" return tx.getPage(op, id) } func (tx *Tx) getPage(op string, id PageID) (*Page, error) { inBounds := id >= 2 if tx.flags.readonly { inBounds = inBounds && id < tx.dataEndID } else { inBounds = inBounds && id < tx.file.allocator.data.endMarker } if !inBounds { return nil, tx.errWrap(op, raiseOutOfBounds(id)) } if tx.alloc.data.freed.Has(id) || tx.alloc.meta.freed.Has(id) { return nil, tx.err(op).of(InvalidOp). report("trying to access an already freed page") } if p := tx.pages[id]; p != nil { return p, nil } page := newPage(tx, id) if walID := tx.file.wal.Get(id); walID != 0 { page.ondiskID = walID } tx.pages[id] = page return page, nil } // Alloc allocates a new writable page with yet empty contents. // Use Load(), Bytes and MarkDirty(), or SetBytes() to fill the page with // new contents. // Returns an error if the transaction is readonly or no more space is available. func (tx *Tx) Alloc() (page *Page, err error) { const op = "txfile/tx-alloc-page" if err := tx.canWrite(op); err != nil { return nil, err } err = tx.allocPagesWith(op, 1, func(p *Page) { page = p }) return page, err } // AllocN allocates n potentially non-contious, yet empty pages. // Returns an error if the transaction is readonly or no more space is available. func (tx *Tx) AllocN(n int) (pages []*Page, err error) { const op = "txfile/tx-alloc-pages" if err := tx.canWrite(op); err != nil { return nil, err } if n <= 0 { return nil, nil } pages, i := make([]*Page, n), 0 err = tx.allocPagesWith(op, n, func(page *Page) { pages[i], i = page, i+1 }) if err != nil { return nil, err } return pages, nil } func (tx *Tx) dataAllocator() *dataAllocator { return tx.file.allocator.DataAllocator() } func (tx *Tx) metaAllocator() *metaAllocator { return tx.file.allocator.MetaAllocator() } func (tx *Tx) walAllocator() *walAllocator { return tx.file.allocator.WALPageAllocator() } func (tx *Tx) allocPagesWith(op string, n int, fn func(*Page)) reason { count := tx.dataAllocator().AllocRegionsWith(&tx.alloc, uint(n), func(reg region) { reg.EachPage(func(id PageID) { page := newPage(tx, id) page.flags.new = true tx.pages[id] = page fn(page) }) }) if count == 0 { return tx.err(op).of(OutOfMemory).reportf("not enough memory to allocate %v data page(s)", n) } return nil } func (tx *Tx) freePage(id PageID) { tx.dataAllocator().Free(&tx.alloc, id) } func (tx *Tx) allocWALID(orig PageID) PageID { id := tx.walAllocator().Alloc(&tx.alloc) if id != 0 { tx.wal.Set(orig, id) } return id } func (tx *Tx) freeWALID(id, walID PageID) { tx.walAllocator().Free(&tx.alloc, walID) tx.wal.Release(id) } // Flush flushes all dirty pages within the transaction. func (tx *Tx) Flush() error { return tx.flushPages("txfile/tx-flush") } func (tx *Tx) flushPages(op string) reason { if err := tx.canWrite(op); err != nil { return err } for _, page := range tx.pages { if err := page.doFlush("txfile/page-flush"); err != nil { return err } } return nil } func (tx *Tx) canRead(op string) *Error { if !tx.flags.active { return tx.err(op).of(TxFinished).report("no read operation on finished transactions allowed") } return nil } func (tx *Tx) canWrite(op string) *Error { var kind ErrKind var msg string if !tx.flags.active { kind, msg = TxFinished, "no write operation on finished transactions allowed" } if tx.flags.readonly { kind, msg = TxReadOnly, "no write operation on read only transaction allowed" } if kind != NoError { return tx.err(op).of(kind).report(msg) } return nil } func (tx *Tx) err(op string) *Error { return &Error{op: op, ctx: tx.errCtx()} } func (tx *Tx) errWrap(op string, cause error) *Error { return tx.err(op).causedBy(cause) } func (tx *Tx) errCtx() errorCtx { ctx := tx.file.errCtx() ctx.txid, ctx.isTx = tx.txid, true return ctx }