wal.go (202 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package txfile

import (
	"unsafe"
)

// waLog is the in-memory state of the write-ahead log (WAL). It maps the ids
// of original pages to the ids of the pages that currently hold their
// overwritten contents.
type waLog struct {
	// mapping is the committed original-page -> overwrite-page mapping.
	// Get returns 0 for pages that have no entry.
	mapping walMapping

	// metaPages lists the page regions the serialized mapping is stored in.
	// It is replaced on Commit and when the mapping is read back from disk.
	metaPages regionList
}

// txWalState collects the WAL changes made by a single transaction. The changes
// are merged into the committed waLog mapping only when the transaction commits.
type txWalState struct {
	// free holds the ids of pages released by this transaction. Their entries
	// are dropped from the merged mapping.
	free pageSet

	// new holds the overwrite mappings (original id -> overwrite id) added by
	// this transaction, i.e. all WAL pages used for overwrites in this
	// transaction.
	new walMapping

	// walLimit is the maximum number of entries in the merged mapping. When the
	// merged mapping reaches this size on commit, a checkpoint is triggered.
	walLimit uint
}

// walCommitState keeps track of changes applied to the WAL during the commit.
// These changes must be recorded separately, because the in-memory WAL state
// must not be updated until the transaction has been committed to disk.
type walCommitState struct { tx *txWalState updated bool checkpoint bool mapping walMapping // new wal mapping allocRegions regionList // pre-allocate meta pages for serializing new mapping } type walMapping map[PageID]PageID const ( walHeaderSize = uint(unsafe.Sizeof(walPage{})) walEntrySize = 14 defaultWALLimit = 1000 ) func makeWALog() waLog { return waLog{ mapping: walMapping{}, metaPages: nil, } } func (l *waLog) makeTxWALState(limit uint) txWalState { if limit == 0 { // TODO: init wal limit on init, based on max file size limit = defaultWALLimit } return txWalState{ walLimit: limit, } } func (l *waLog) Get(id PageID) PageID { return l.mapping[id] } func (l *waLog) fileCommitPrepare(st *walCommitState, tx *txWalState) { st.tx = tx newWal := createMappingUpdate(l.mapping, tx) st.checkpoint = tx.walLimit > 0 && uint(len(newWal)) >= tx.walLimit st.updated = st.checkpoint || tx.Updated() if st.checkpoint { newWal = tx.new } st.mapping = newWal } func (l *waLog) fileCommitAlloc(tx *Tx, st *walCommitState) reason { const op = "txfile/commit-alloc-wal" if !st.updated { return nil } pages := predictWALMappingPages(st.mapping, uint(tx.PageSize())) if pages > 0 { st.allocRegions = tx.metaAllocator().AllocRegions(&tx.alloc, pages) if st.allocRegions == nil { return errOp(op).of(OutOfMemory). 
report("not enough space to allocate write ahead meta pages") } } return nil } func (l *waLog) fileCommitSerialize( st *walCommitState, pageSize uint, onPage func(id PageID, buf []byte) reason, ) reason { if !st.updated { return nil } return writeWAL(st.allocRegions, pageSize, st.mapping, onPage) } func (l *waLog) fileCommitMeta(meta *metaPage, st *walCommitState) { if st.updated { var rootPage PageID if len(st.allocRegions) > 0 { rootPage = st.allocRegions[0].id } meta.wal.Set(rootPage) } } func (l *waLog) Commit(st *walCommitState) { if st.updated { l.mapping = st.mapping l.metaPages = st.allocRegions } } func (l walMapping) empty() bool { return len(l) == 0 } func (s *txWalState) Release(id PageID) { s.free.Add(id) if s.new != nil { delete(s.new, id) } } func (s *txWalState) Updated() bool { return !s.free.Empty() || !s.new.empty() } func (s *txWalState) Set(orig, overwrite PageID) { if s.new == nil { s.new = walMapping{} } s.new[orig] = overwrite } func createMappingUpdate(old walMapping, tx *txWalState) walMapping { if !tx.Updated() { return nil } new := walMapping{} for id, walID := range old { if tx.free.Has(id) { continue } if _, exists := tx.new[id]; exists { continue } new[id] = walID } for id, walID := range tx.new { new[id] = walID } return new } func predictWALMappingPages(m walMapping, pageSize uint) uint { perPage := walEntriesPerPage(pageSize) return (uint(len(m)) + perPage - 1) / perPage } func walEntriesPerPage(pageSize uint) uint { payload := pageSize - walHeaderSize return payload / walEntrySize } func readWALMapping( wal *waLog, access func(PageID) []byte, root PageID, ) reason { mapping, ids, err := readWAL(access, root) if err != nil { return err } wal.mapping = mapping wal.metaPages = ids.Regions() return nil } func readWAL( access func(PageID) []byte, root PageID, ) (walMapping, idList, reason) { const op = "txfile/read-wal" if root == 0 { return walMapping{}, nil, nil } mapping := walMapping{} var metaPages idList for pageID := root; 
pageID != 0; { metaPages.Add(pageID) node, data := castWalPage(access(pageID)) if node == nil { return nil, nil, errOp(op).of(InvalidMetaPage). causedBy(raiseOutOfBounds(pageID)). report("write ahead metadata corrupted") } count := int(node.count.Get()) pageID = node.next.Get() for i := 0; i < count; i++ { // read node mapping. Only 7 bytes are used per pageID var k, v pgID copy(k[0:7], data[0:7]) copy(v[0:7], data[7:14]) data = data[14:] mapping[k.Get()] = v.Get() } } return mapping, metaPages, nil } func writeWAL( to regionList, pageSize uint, mapping walMapping, onPage func(id PageID, buf []byte) reason, ) reason { allocPages := to.PageIDs() writer := newPagingWriter(allocPages, pageSize, 0, onPage) for id, walID := range mapping { var k, v pgID k.Set(id) v.Set(walID) var payload [walEntrySize]byte copy(payload[0:7], k[0:7]) copy(payload[7:14], v[0:7]) if err := writer.Write(payload[:]); err != nil { return err } } return writer.Flush() }