write.go (228 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package txfile import ( "io" "sort" "sync" "github.com/elastic/go-txfile/internal/vfs" ) type writer struct { target writable pageSize uint mux sync.Mutex cond *sync.Cond done bool scheduled []writeMsg scheduled0 [64]writeMsg fsync []syncMsg fsync0 [8]syncMsg syncMode SyncMode pending int // number of scheduled writes since last sync published int // number of writes executed since last sync } type writeMsg struct { sync *txWriteSync id PageID buf []byte fsync bool } type syncMsg struct { sync *txWriteSync count int // number of pages to process, before fsyncing flags syncFlag } type txWriteSync struct { err reason wg sync.WaitGroup } type writable interface { io.WriterAt Sync(vfs.SyncFlag) error } // command as is consumer by the writers run loop type command struct { n int // number of buffered write message to be consumed fsync *txWriteSync // set if fsync is to be executed after writing all messages syncFlags syncFlag // additional fsync flags } type syncFlag uint8 const ( // On IO error, the writer will ignore any write/sync attempts, but return // the first error encountered. // Passing the syncResetErr notifies the writer that the current transaction // is about to fail and all subsequent writes will belong to a new // transaction. So to not stall any writes/operations forever, the writer // will attempt write/sync any future requests, by resetting the internal // error state to 'no error'. syncResetErr syncFlag = 1 << iota // syncDataOnly tells the writer that we don't care about metadata updates like // access/modification timestamps (given the file size didn't change). // Some filesystems profit from data only syncs, as meta data or journals // don't need to be flushed, reducing the overall amount of on disk IO ops. syncDataOnly ) func (w *writer) Init(target writable, pageSize uint, syncMode SyncMode) { if syncMode == SyncDefault { syncMode = SyncData } w.target = target w.syncMode = syncMode w.pageSize = pageSize w.cond = sync.NewCond(&w.mux) w.scheduled = w.scheduled0[:0] w.fsync = w.fsync[:0] } func (w *writer) Stop() { w.mux.Lock() w.done = true w.mux.Unlock() w.cond.Signal() } func (w *writer) Schedule(sync *txWriteSync, id PageID, buf []byte) { sync.Retain() traceln("schedule write") w.mux.Lock() defer w.mux.Unlock() w.scheduled = append(w.scheduled, writeMsg{ sync: sync, id: id, buf: buf, }) w.pending++ w.cond.Signal() } func (w *writer) Sync(sync *txWriteSync, flags syncFlag) { sync.Retain() traceln("schedule sync") w.mux.Lock() defer w.mux.Unlock() w.fsync = append(w.fsync, syncMsg{ sync: sync, count: w.pending, flags: flags, }) w.pending = 0 w.cond.Signal() } func (w *writer) Run() (bool, reason) { var ( err reason done bool cmd command buf [1024]writeMsg ) for { cmd, done = w.nextCommand(buf[:]) if done { return done, nil } traceln("writer message: ", cmd.n, cmd.fsync != nil, done) // TODO: use vector IO if possible (linux: pwritev) msgs := buf[:cmd.n] sort.Slice(msgs, func(i, j int) bool { return msgs[i].id < msgs[j].id }) for _, msg := range msgs { const op = "txfile/write-page" if err == nil { // execute actual write on the page it's file offset: off := uint64(msg.id) * uint64(w.pageSize) tracef("write at(id=%v, off=%v, len=%v)\n", msg.id, off, len(msg.buf)) err = writeAt(op, w.target, msg.buf, int64(off)) } msg.sync.err = err msg.sync.Release() } // execute pending fsync: if fsync := cmd.fsync; fsync != nil { if err == nil { err = w.execSync(cmd) } fsync.err = err traceln("done fsync") fsync.Release() resetErr := cmd.syncFlags.Test(syncResetErr) if resetErr { err = nil } } } } func (w *writer) execSync(cmd command) reason { const op = "txfile/write-sync" syncFlag := vfs.SyncAll switch w.syncMode { case SyncNone: return nil case SyncData: if cmd.syncFlags.Test(syncDataOnly) { syncFlag = vfs.SyncDataOnly } } if err := w.target.Sync(syncFlag); err != nil { return errOp(op).causedBy(err) } return nil } func (w *writer) nextCommand(buf []writeMsg) (command, bool) { w.mux.Lock() defer w.mux.Unlock() traceln("async writer: wait next command") defer traceln("async writer: received next command") for { if w.done { return command{}, true } max := len(w.scheduled) if max == 0 && len(w.fsync) == 0 { // no messages w.cond.Wait() continue } if l := len(buf); l < max { max = l } // Check if we need to fsync and adjust `max` number of pages of required. var sync *txWriteSync var syncFlags syncFlag traceln("check fsync: ", len(w.fsync)) if len(w.fsync) > 0 { msg := w.fsync[0] // number of outstanding scheduled writes before fsync outstanding := msg.count - w.published traceln("outstanding:", outstanding) if outstanding <= max { // -> fsync max, sync, syncFlags = outstanding, msg.sync, msg.flags // advance fsync state w.fsync[0] = syncMsg{} // clear entry, so to potentially clean references from w.fsync0 w.fsync = w.fsync[1:] if len(w.fsync) == 0 { w.fsync = w.fsync0[:0] } } } // return buffers to be processed var n int scheduled := w.scheduled[:max] if len(scheduled) > 0 { n = copy(buf, scheduled) w.scheduled = w.scheduled[n:] if len(w.scheduled) == 0 { w.scheduled = w.scheduled0[:0] } } if sync == nil { w.published += n } else { w.published = 0 } return command{n: n, fsync: sync, syncFlags: syncFlags}, false } } func newTxWriteSync() *txWriteSync { return &txWriteSync{} } func (s *txWriteSync) Retain() { s.wg.Add(1) } func (s *txWriteSync) Release() { s.wg.Done() } func (s *txWriteSync) Wait() reason { s.wg.Wait() return s.err } func writeAt(op string, out io.WriterAt, buf []byte, off int64) reason { for len(buf) > 0 { n, err := out.WriteAt(buf, off) if err != nil { return errOp(op).causedBy(err). reportf("writing %v bytes to off=%v failed", len(buf), off) } off += int64(n) buf = buf[n:] } return nil } func (f syncFlag) Test(other syncFlag) bool { return (f & other) == other }