in v1storage/persistence.go [482:532]
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
f, err := p.openChunkFileForReading(fp)
if err != nil {
return nil, err
}
defer f.Close()
chunks := make([]chunk.Chunk, 0, len(indexes))
buf := p.bufPool.Get().([]byte)
defer func() {
// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
// would only put back the original buf.
p.bufPool.Put(buf)
}()
for i := 0; i < len(indexes); i++ {
// This loads chunks in batches. A batch is a streak of
// consecutive chunks, read from disk in one go.
batchSize := 1
if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), io.SeekStart); err != nil {
return nil, err
}
for ; batchSize < chunkMaxBatchSize &&
i+1 < len(indexes) &&
indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 {
}
readSize := batchSize * chunkLenWithHeader
if cap(buf) < readSize {
buf = make([]byte, readSize)
}
buf = buf[:readSize]
if _, err := io.ReadFull(f, buf); err != nil {
return nil, err
}
for c := 0; c < batchSize; c++ {
chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
if err != nil {
return nil, err
}
if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
return nil, err
}
chunks = append(chunks, chunk)
}
}
chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
return chunks, nil
}