in tsdb/compact.go [675:821]
func (c DefaultBlockPopulator) PopulateBlock(ctx context.Context, metrics *CompactorMetrics, logger log.Logger, chunkPool chunkenc.Pool, mergeFunc storage.VerticalChunkSeriesMergeFunc, blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) {
if len(blocks) == 0 {
return errors.New("cannot populate block from no readers")
}
var (
sets []storage.ChunkSeriesSet
symbols index.StringIter
closers []io.Closer
overlapping bool
)
defer func() {
errs := tsdb_errors.NewMulti(err)
if cerr := tsdb_errors.CloseAll(closers); cerr != nil {
errs.Add(errors.Wrap(cerr, "close"))
}
err = errs.Err()
metrics.PopulatingBlocks.Set(0)
}()
metrics.PopulatingBlocks.Set(1)
globalMaxt := blocks[0].Meta().MaxTime
for i, b := range blocks {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if !overlapping {
if i > 0 && b.Meta().MinTime < globalMaxt {
metrics.OverlappingBlocks.Inc()
overlapping = true
level.Info(logger).Log("msg", "Found overlapping blocks during compaction", "ulid", meta.ULID)
}
if b.Meta().MaxTime > globalMaxt {
globalMaxt = b.Meta().MaxTime
}
}
indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %+v", b.Meta())
}
closers = append(closers, indexr)
chunkr, err := b.Chunks()
if err != nil {
return errors.Wrapf(err, "open chunk reader for block %+v", b.Meta())
}
closers = append(closers, chunkr)
tombsr, err := b.Tombstones()
if err != nil {
return errors.Wrapf(err, "open tombstone reader for block %+v", b.Meta())
}
closers = append(closers, tombsr)
k, v := index.AllPostingsKey()
all, err := indexr.Postings(k, v)
if err != nil {
return err
}
all = indexr.SortedPostings(all)
// Blocks meta is half open: [min, max), so subtract 1 to ensure we don't hold samples with exact meta.MaxTime timestamp.
sets = append(sets, NewBlockChunkSeriesSet(b.Meta().ULID, indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime-1, false))
syms := indexr.Symbols()
if i == 0 {
symbols = syms
continue
}
symbols = NewMergedStringIter(symbols, syms)
}
for symbols.Next() {
if err := indexw.AddSymbol(symbols.At()); err != nil {
return errors.Wrap(err, "add symbol")
}
}
if symbols.Err() != nil {
return errors.Wrap(symbols.Err(), "next symbol")
}
var (
ref = storage.SeriesRef(0)
chks []chunks.Meta
chksIter chunks.Iterator
)
set := sets[0]
if len(sets) > 1 {
// Merge series using specified chunk series merger.
// The default one is the compacting series merger.
set = storage.NewMergeChunkSeriesSet(sets, mergeFunc)
}
// Iterate over all sorted chunk series.
for set.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
s := set.At()
chksIter = s.Iterator(chksIter)
chks = chks[:0]
for chksIter.Next() {
// We are not iterating in streaming way over chunk as
// it's more efficient to do bulk write for index and
// chunk file purposes.
chks = append(chks, chksIter.At())
}
if chksIter.Err() != nil {
return errors.Wrap(chksIter.Err(), "chunk iter")
}
// Skip the series with all deleted chunks.
if len(chks) == 0 {
continue
}
if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(ref, s.Labels(), chks...); err != nil {
return errors.Wrap(err, "add series")
}
meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumSeries++
for _, chk := range chks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range chks {
if err := chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
}
ref++
}
if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set")
}
return nil
}