func()

in plugins/queue/mmap/queue_operation.go [95:119]


func (q *Queue) readBytes(id, offset int64, length int) (data []byte, newID, newOffset int64, err error) {
	counter := 0
	res := make([]byte, length)
	for {
		q.lock(id)
		segment, err := q.GetSegment(id)
		if err != nil {
			return nil, 0, 0, err
		}
		readBytes, err := segment.ReadAt(res[counter:], offset)
		q.unlock(id)
		if err != nil {
			return nil, 0, 0, err
		}
		counter += readBytes
		offset += int64(readBytes)
		if offset == int64(q.SegmentSize) {
			id, offset = id+1, 0
		}
		if counter == length {
			break
		}
	}
	return res, id, offset, nil
}