in plugins/queue/mmap/queue_operation.go [160:184]
// writeBytes writes all of bytes into the queue starting at position offset
// inside the segment identified by id, continuing into following segments
// (id+1, id+2, ...) whenever the current one is filled up to q.SegmentSize.
//
// It returns the segment id and offset immediately following the last byte
// written, i.e. the position where the next write should start. On failure it
// returns (0, 0, err); data written by earlier loop iterations is not rolled
// back.
//
// Each iteration takes q.lock(id) around the segment lookup and the write,
// and releases it on every exit path, including the error ones.
func (q *Queue) writeBytes(bytes []byte, id, offset int64) (newID, newOffset int64, err error) {
	written := 0
	total := len(bytes)
	for {
		q.lock(id)
		segment, segErr := q.GetSegment(id)
		if segErr != nil {
			// Release the lock before bailing out; otherwise it would stay
			// held after a failed segment lookup and block later callers.
			q.unlock(id)
			return 0, 0, segErr
		}
		n, writeErr := segment.WriteAt(bytes[written:], offset)
		q.unlock(id)
		if writeErr != nil {
			return 0, 0, writeErr
		}
		written += n
		offset += int64(n)
		// The current segment is full: continue at the start of the next one.
		if offset == int64(q.SegmentSize) {
			id, offset = id+1, 0
		}
		if written == total {
			break
		}
	}
	return id, offset, nil
}