in plugins/queue/mmap/queue_operation.go [96:120]
func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
counter := 0
res := make([]byte, length)
for {
q.lock(id)
segment, err := q.GetSegment(id)
if err != nil {
return nil, 0, 0, err
}
readBytes, err := segment.ReadAt(res[counter:], offset)
q.unlock(id)
if err != nil {
return nil, 0, 0, err
}
counter += readBytes
offset += int64(readBytes)
if offset == int64(q.SegmentSize) {
id, offset = id+1, 0
}
if counter == length {
break
}
}
return res, id, offset, nil
}