func()

in write.go [222:287]


func (w *writer) nextCommand(buf []writeMsg) (command, bool) {
	w.mux.Lock()
	defer w.mux.Unlock()

	traceln("async writer: wait next command")
	defer traceln("async writer: received next command")

	for {
		if w.done {
			return command{}, true
		}

		max := len(w.scheduled)
		if max == 0 && len(w.fsync) == 0 { // no messages
			w.cond.Wait()
			continue
		}

		if l := len(buf); l < max {
			max = l
		}

		// Check if we need to fsync and adjust `max` number of pages of required.
		var sync *txWriteSync
		var syncFlags syncFlag
		traceln("check fsync: ", len(w.fsync))

		if len(w.fsync) > 0 {
			msg := w.fsync[0]

			// number of outstanding scheduled writes before fsync
			outstanding := msg.count - w.published
			traceln("outstanding:", outstanding)

			if outstanding <= max { // -> fsync
				max, sync, syncFlags = outstanding, msg.sync, msg.flags

				// advance fsync state
				w.fsync[0] = syncMsg{} // clear entry, so to potentially clean references from w.fsync0
				w.fsync = w.fsync[1:]
				if len(w.fsync) == 0 {
					w.fsync = w.fsync0[:0]
				}
			}
		}

		// return buffers to be processed
		var n int
		scheduled := w.scheduled[:max]
		if len(scheduled) > 0 {
			n = copy(buf, scheduled)
			w.scheduled = w.scheduled[n:]
			if len(w.scheduled) == 0 {
				w.scheduled = w.scheduled0[:0]
			}
		}

		if sync == nil {
			w.published += n
		} else {
			w.published = 0
		}

		return command{n: n, fsync: sync, syncFlags: syncFlags}, false
	}
}