in pq/writer.go [221:267]
// flushBuffer runs doFlush and reports the outcome.
//
// If an observer is configured it is notified after every flush attempt,
// whether it succeeded or failed. When doFlush fails, its error is returned
// unchanged and none of the counters below are modified by this method.
// On success the active event count and the number of allocated pages are
// added to the running totals, the per-flush statistics are zeroed, and
// flushCB (if set) is called with the number of events that were flushed.
func (w *Writer) flushBuffer() error {
	// Snapshot the event count before flushing; the same value is reported to
	// the observer, added to the totals, and handed to flushCB.
	flushed := w.state.activeEventCount

	begin := time.Now()
	pages, allocated, err := w.doFlush()

	if obs := w.observer; obs != nil {
		failed := err != nil

		// Only classify the error when there actually is one.
		outOfMemory := false
		if failed {
			outOfMemory = txerr.Is(txfile.OutOfMemory, err) ||
				txerr.Is(txfile.NoDiskSpace, err)
		}

		// All w.state fields are read after doFlush has returned.
		stats := FlushStats{
			Duration:    time.Since(begin),
			Oldest:      w.state.tsOldest,
			Newest:      w.state.tsNewest,
			Failed:      failed,
			OutOfMemory: outOfMemory,
			Pages:       pages,
			Allocate:    allocated,
			Events:      flushed,
			BytesTotal:  w.state.activeEventBytes,
			BytesMin:    w.state.minEventSize,
			BytesMax:    w.state.maxEventSize,
		}
		obs.OnQueueFlush(w.hdrOffset, stats)
	}

	if err != nil {
		return err
	}

	// Success: fold this flush into the lifetime totals.
	w.state.totalEventCount += flushed
	w.state.totalAllocPages += allocated

	// NOTE(review): the message contains %v verbs but is passed to traceln;
	// confirm that traceln formats its arguments, otherwise the verbs are
	// printed literally.
	traceln("Write buffer flushed. Total events: %v, total pages allocated: %v",
		w.state.totalEventCount,
		w.state.totalAllocPages)

	// Clear the per-flush statistics before notifying the callback.
	w.state.activeEventCount = 0
	w.state.activeEventBytes = 0
	w.state.minEventSize = 0
	w.state.maxEventSize = 0

	if cb := w.flushCB; cb != nil {
		cb(flushed)
	}
	return nil
}