in write.go [222:287]
// nextCommand blocks until there is work for the caller and then describes
// that work as a command.
//
// The writer mutex is held for the whole call (it is released while waiting
// on w.cond). While neither scheduled writes nor fsync requests are queued,
// the call waits on the condition variable. If w.done is set, the zero
// command and true are returned.
//
// Otherwise up to len(buf) scheduled writes are copied into buf and removed
// from the schedule. If the oldest queued fsync request has no more
// outstanding writes than fit into this batch, the batch is trimmed to
// exactly those writes, the request is dequeued, and it is attached to the
// returned command together with its flags. The second return value is false
// in this case.
func (w *writer) nextCommand(buf []writeMsg) (command, bool) {
	w.mux.Lock()
	defer w.mux.Unlock()

	traceln("async writer: wait next command")
	defer traceln("async writer: received next command")

	// Sleep until the writer is stopped or a write/fsync has been queued.
	for !w.done && len(w.scheduled) == 0 && len(w.fsync) == 0 {
		w.cond.Wait()
	}
	if w.done {
		return command{}, true
	}

	// Never hand out more writes than the caller's buffer can hold.
	limit := len(w.scheduled)
	if len(buf) < limit {
		limit = len(buf)
	}

	// Decide whether this batch ends with an fsync; if so, shrink the batch
	// to the writes that must precede it.
	var (
		pending *txWriteSync
		flags   syncFlag
	)
	traceln("check fsync: ", len(w.fsync))
	if len(w.fsync) > 0 {
		head := w.fsync[0]

		// Writes that still need to be published before the fsync may run.
		outstanding := head.count - w.published
		traceln("outstanding:", outstanding)

		if outstanding <= limit {
			limit = outstanding
			pending = head.sync
			flags = head.flags

			// Pop the head of the fsync queue. The slot is zeroed first so the
			// backing array does not keep the entry's references alive.
			w.fsync[0] = syncMsg{}
			w.fsync = w.fsync[1:]
			if len(w.fsync) == 0 {
				// Queue drained: rewind to the start of the original buffer.
				w.fsync = w.fsync0[:0]
			}
		}
	}

	// Move the selected writes into the caller's buffer.
	var n int
	batch := w.scheduled[:limit]
	if len(batch) > 0 {
		n = copy(buf, batch)
		w.scheduled = w.scheduled[n:]
		if len(w.scheduled) == 0 {
			// Schedule drained: rewind to the start of the original buffer.
			w.scheduled = w.scheduled0[:0]
		}
	}

	// The published counter restarts after every fsync; otherwise it
	// accumulates the writes handed out so far.
	if pending != nil {
		w.published = 0
	} else {
		w.published += n
	}

	return command{n: n, fsync: pending, syncFlags: flags}, false
}