in pq/writer.go [64:119]
func newWriter(
accessor *access,
off uintptr,
o Observer,
pagePool *pagePool,
writeBuffer uint,
end position,
flushCB func(uint),
) (*Writer, reason) {
const op = "pq/create-writer"
pageSize := accessor.PageSize()
if pageSize <= 0 {
return nil, &Error{op: op, kind: InvalidPageSize}
}
pages := int(writeBuffer) / pageSize
if pages <= defaultMinPages {
pages = defaultMinPages
}
tracef("create queue writer with initBufferSize=%v, actualBufferSize=%v, pageSize=%v, pages=%v\n",
writeBuffer, pageSize*pages, pageSize, pages)
var tail *page
if end.page != 0 {
traceln("writer load endpage: ", end)
page := end.page
off := end.off
var err reason
tail, err = accessor.readPageByID(pagePool, page)
if err != nil {
return nil, (&Error{op: op}).causedBy(err)
}
tail.Meta.EndOff = uint32(off)
}
w := &Writer{
active: true,
hdrOffset: off,
observer: o,
accessor: accessor,
state: writeState{
buf: newBuffer(pagePool, tail, pages, pageSize, szEventPageHeader),
eventID: end.id,
},
flushCB: flushCB,
}
// init buffer with 'first' event to be written
w.state.buf.ReserveHdr(szEventHeader)
return w, nil
}