in pq/writer.go [161:204]
func (w *Writer) Next() error {
const op = "pq/writer-next"
if err := w.canWrite(); err != NoError {
return w.errOf(op, err)
}
// finalize current event in buffer and prepare next event
hdr := castEventHeader(w.state.buf.ActiveEventHdr())
hdr.sz.Set(uint32(w.state.eventBytes))
w.state.buf.CommitEvent(w.state.eventID)
w.state.buf.ReserveHdr(szEventHeader)
sz := uint(w.state.eventBytes)
ts := time.Now()
w.state.activeEventBytes += sz
if w.state.activeEventCount == 0 {
w.state.minEventSize = sz
w.state.maxEventSize = sz
w.state.tsOldest = ts
w.state.tsNewest = ts
} else {
if sz < w.state.minEventSize {
w.state.minEventSize = sz
}
if sz > w.state.maxEventSize {
w.state.maxEventSize = sz
}
w.state.tsNewest = ts
}
w.state.eventBytes = 0
w.state.eventID++
w.state.activeEventCount++
// check if we need to flush
if w.state.buf.Avail() <= szEventHeader {
if err := w.flushBuffer(); err != nil {
return w.errWrap(op, err)
}
}
return nil
}