in tsdb/head_wal.go [1063:1218]
// ChunkSnapshot writes a snapshot of the in-memory Head (every series record,
// the tombstones and the exemplars) into a new chunk snapshot directory under
// h.opts.ChunkDirRoot, named after the current last WAL segment and offset.
//
// The snapshot is built in a "<dir>.tmp" directory and is only moved to its
// final name after it has been fully written and closed. A failed attempt
// therefore never leaves a partially written snapshot under the final name,
// and the tmp directory is removed on every return path. After a successful
// rename, older snapshots are deleted. A failure to delete them is logged but
// not returned, because leftover snapshots are ignored once a higher one exists.
//
// It is a no-op (empty stats, nil error) when the WAL is disabled, or when
// the WAL's last segment/offset equals that of the last chunk snapshot.
// Concurrent calls are serialized by h.chunkSnapshotMtx.
//
// The returned stats are non-nil even when an error is returned. Dir is set
// once the target directory name is known. TotalSeries counts every series
// visited in the head's stripes, including series whose encoded record was
// empty and therefore not written.
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
	if h.wal == nil {
		// If we are not storing any WAL, does not make sense to take a snapshot too.
		level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
		return &ChunkSnapshotStats{}, nil
	}
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	stats := &ChunkSnapshotStats{}

	wlast, woffset, err := h.wal.LastSegmentAndOffset()
	if err != nil && !errors.Is(err, record.ErrNotFound) {
		return stats, errors.Wrap(err, "get last wal segment and offset")
	}
	_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
	if err != nil && !errors.Is(err, record.ErrNotFound) {
		return stats, errors.Wrap(err, "find last chunk snapshot")
	}

	if wlast == cslast && woffset == csoffset {
		// Nothing has been written to the WAL/Head since the last snapshot.
		return stats, nil
	}

	snapshotName := chunkSnapshotDir(wlast, woffset)

	cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
	cpdirtmp := cpdir + ".tmp"
	stats.Dir = cpdir

	if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
		return stats, errors.Wrap(err, "create chunk snapshot dir")
	}
	// Ensures that an early return caused by an error, including a failure
	// of wlog.New below, doesn't leave any tmp files. On the success path
	// fileutil.Replace has already moved cpdirtmp to cpdir. Deferred calls
	// run LIFO, so this runs after the deferred cp.Close registered below.
	defer func() {
		os.RemoveAll(cpdirtmp)
	}()

	cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
	if err != nil {
		return stats, errors.Wrap(err, "open chunk snapshot")
	}
	// Close the snapshot writer on early returns. On the success path cp is
	// closed explicitly before the rename, and the result of this second
	// close is ignored.
	defer func() {
		cp.Close()
	}()

	var (
		buf  []byte
		recs [][]byte
	)
	// Add all series to the snapshot.
	stripeSize := h.series.size
	for i := 0; i < stripeSize; i++ {
		h.series.locks[i].RLock()

		for _, s := range h.series.series[i] {
			start := len(buf)
			buf = s.encodeToSnapshotRecord(buf)
			if len(buf[start:]) == 0 {
				continue // All contents discarded.
			}
			recs = append(recs, buf[start:])
			// Flush records in 10 MB increments.
			if len(buf) > 10*1024*1024 {
				if err := cp.Log(recs...); err != nil {
					h.series.locks[i].RUnlock()
					return stats, errors.Wrap(err, "flush records")
				}
				// Reusing buf after Log is fine: the records have been handed
				// off to the snapshot writer.
				buf, recs = buf[:0], recs[:0]
			}
		}
		stats.TotalSeries += len(h.series.series[i])

		h.series.locks[i].RUnlock()
	}

	// Add tombstones to the snapshot.
	tombstonesReader, err := h.Tombstones()
	if err != nil {
		return stats, errors.Wrap(err, "get tombstones")
	}
	rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
	if err != nil {
		return stats, errors.Wrap(err, "encode tombstones")
	}
	recs = append(recs, rec)
	// Flush remaining series records and tombstones.
	if err := cp.Log(recs...); err != nil {
		return stats, errors.Wrap(err, "flush records")
	}
	buf = buf[:0]

	// Add exemplars in the snapshot.
	// We log in batches, with each record having upto 10000 exemplars.
	// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
	maxExemplarsPerRecord := 10000
	batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
	enc := record.Encoder{}
	flushExemplars := func() error {
		if len(batch) == 0 {
			return nil
		}
		encbuf := encoding.Encbuf{B: buf[:0]}
		encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
		enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
		if err := cp.Log(encbuf.Get()); err != nil {
			return errors.Wrap(err, "log exemplars")
		}
		// Keep the (possibly grown) encoding buffer so later batches reuse
		// its backing array instead of allocating a new one each time.
		buf, batch = encbuf.Get()[:0], batch[:0]
		return nil
	}
	err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
		if len(batch) >= maxExemplarsPerRecord {
			if err := flushExemplars(); err != nil {
				return errors.Wrap(err, "flush exemplars")
			}
		}

		ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels)
		if ms == nil {
			// It is possible that exemplar refers to some old series. We discard such exemplars.
			return nil
		}
		batch = append(batch, record.RefExemplar{
			Ref:    ms.ref,
			T:      e.Ts,
			V:      e.Value,
			Labels: e.Labels,
		})
		return nil
	})
	if err != nil {
		return stats, errors.Wrap(err, "iterate exemplars")
	}

	// Flush remaining exemplars.
	if err := flushExemplars(); err != nil {
		return stats, errors.Wrap(err, "flush exemplars at the end")
	}

	if err := cp.Close(); err != nil {
		return stats, errors.Wrap(err, "close chunk snapshot")
	}
	if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
		return stats, errors.Wrap(err, "rename chunk snapshot directory")
	}

	if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
		// Leftover old chunk snapshots do not cause problems down the line beyond
		// occupying disk space.
		// They will just be ignored since a higher chunk snapshot exists.
		level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
	}
	return stats, nil
}