func (p *persistence) loadChunks()

in v1storage/persistence.go [482:532]


func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk.Chunk, error) {
	f, err := p.openChunkFileForReading(fp)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	chunks := make([]chunk.Chunk, 0, len(indexes))
	buf := p.bufPool.Get().([]byte)
	defer func() {
		// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
		// would only put back the original buf.
		p.bufPool.Put(buf)
	}()

	for i := 0; i < len(indexes); i++ {
		// This loads chunks in batches. A batch is a streak of
		// consecutive chunks, read from disk in one go.
		batchSize := 1
		if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), io.SeekStart); err != nil {
			return nil, err
		}

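		// Extend the batch for as long as the next index is consecutive,
		// capped at chunkMaxBatchSize chunks per read.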
		for ; batchSize < chunkMaxBatchSize &&
			i+1 < len(indexes) &&
			indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 {
		}
		readSize := batchSize * chunkLenWithHeader
		if cap(buf) < readSize {
			buf = make([]byte, readSize)
		}
		buf = buf[:readSize]

		if _, err := io.ReadFull(f, buf); err != nil {
			return nil, err
		}
		for c := 0; c < batchSize; c++ {
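			// Note: this chunk variable shadows the chunk package for the
			// rest of the loop body; the package-qualified call on the
			// right-hand side resolves before the new variable is in scope.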
			chunk, err := chunk.NewForEncoding(chunk.Encoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
			if err != nil {
				return nil, err
			}
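			// The c-th chunk's data starts right after its header within buf.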
			if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
				return nil, err
			}
			chunks = append(chunks, chunk)
		}
	}
	chunk.Ops.WithLabelValues(chunk.Load).Add(float64(len(chunks)))
	atomic.AddInt64(&chunk.NumMemChunks, int64(len(chunks)))
	return chunks, nil
}
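
The layout helpers referenced above (offsetForChunkIndex, chunkHeaderLen, chunkHeaderTypeOffset, chunkLenWithHeader, chunkMaxBatchSize) are defined elsewhere in v1storage/persistence.go and are not part of this excerpt. The following is a minimal sketch of what such a fixed-size layout could look like; every constant value in it is an assumption chosen for illustration, not a value taken from the file.

package main

import "fmt"

// Assumed layout: each chunk on disk is a fixed-size header (with the
// encoding byte first) followed by a fixed-size payload, packed back to
// back with no file-level header. All sizes below are placeholders.
const (
	chunkHeaderTypeOffset = 0    // assumed: encoding byte is the first header byte
	chunkHeaderLen        = 17   // assumed header size in bytes
	chunkPayloadLen       = 1024 // assumed payload size in bytes
	chunkLenWithHeader    = chunkHeaderLen + chunkPayloadLen
	chunkMaxBatchSize     = 62 // assumed cap on chunks read per batch
)

// offsetForChunkIndex returns the byte offset at which the i-th chunk of a
// series file would start under the packed layout assumed above.
func offsetForChunkIndex(i int) int64 {
	return int64(i) * chunkLenWithHeader
}

func main() {
	// Under these assumptions, chunk 3 starts at byte 3*1041 = 3123 and its
	// payload at byte 3123+17 = 3140.
	fmt.Println(offsetForChunkIndex(3), offsetForChunkIndex(3)+chunkHeaderLen)
}

Because loadChunks reads a whole streak of consecutive chunks with a single ReadFull, the pooled buffer never needs to grow beyond chunkMaxBatchSize*chunkLenWithHeader bytes, and a contiguous range of chunk indexes costs one seek plus one read per batch.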