func()

in tsdb/head_wal.go [1063:1218]


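// ChunkSnapshot creates a snapshot of the in-memory series, tombstones and
// exemplars of the Head. The snapshot directory name encodes the last WAL
// segment and the offset in that segment at the time of snapshotting, which
// lets us skip the snapshot when nothing new has been written and delete
// older snapshots once this one is in place.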
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
	if h.wal == nil {
		// If we are not storing any WAL, it does not make sense to take a snapshot either.
		level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
		return &ChunkSnapshotStats{}, nil
	}
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	stats := &ChunkSnapshotStats{}

	wlast, woffset, err := h.wal.LastSegmentAndOffset()
	if err != nil && err != record.ErrNotFound {
		return stats, errors.Wrap(err, "get last wal segment and offset")
	}

	_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
	if err != nil && err != record.ErrNotFound {
		return stats, errors.Wrap(err, "find last chunk snapshot")
	}

	if wlast == cslast && woffset == csoffset {
		// Nothing has been written to the WAL/Head since the last snapshot.
		return stats, nil
	}

	snapshotName := chunkSnapshotDir(wlast, woffset)

	cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
	cpdirtmp := cpdir + ".tmp"
	stats.Dir = cpdir

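	// Everything is written into a ".tmp" directory first; it is renamed to
	// its final name only after the snapshot has been fully written.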
	if err := os.MkdirAll(cpdirtmp, 0o777); err != nil {
		return stats, errors.Wrap(err, "create chunk snapshot dir")
	}
	cp, err := wlog.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
	if err != nil {
		return stats, errors.Wrap(err, "open chunk snapshot")
	}

	// Ensures that an early return caused by an error doesn't leave any tmp files.
	defer func() {
		cp.Close()
		os.RemoveAll(cpdirtmp)
	}()

	var (
		buf  []byte
		recs [][]byte
	)
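	// buf is a shared backing buffer and recs holds sub-slices of it, so that
	// encoded series records can be logged to the snapshot in large batches.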
	// Add all series to the snapshot.
	stripeSize := h.series.size
	for i := 0; i < stripeSize; i++ {
		h.series.locks[i].RLock()

		for _, s := range h.series.series[i] {
			start := len(buf)
			buf = s.encodeToSnapshotRecord(buf)
			if len(buf[start:]) == 0 {
				continue // All contents discarded.
			}
			recs = append(recs, buf[start:])
			// Flush records in 10 MB increments.
			if len(buf) > 10*1024*1024 {
				if err := cp.Log(recs...); err != nil {
					h.series.locks[i].RUnlock()
					return stats, errors.Wrap(err, "flush records")
				}
				buf, recs = buf[:0], recs[:0]
			}
		}
		stats.TotalSeries += len(h.series.series[i])

		h.series.locks[i].RUnlock()
	}

	// Add tombstones to the snapshot.
	tombstonesReader, err := h.Tombstones()
	if err != nil {
		return stats, errors.Wrap(err, "get tombstones")
	}
	rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
	if err != nil {
		return stats, errors.Wrap(err, "encode tombstones")
	}
	recs = append(recs, rec)
	// Flush remaining series records and tombstones.
	if err := cp.Log(recs...); err != nil {
		return stats, errors.Wrap(err, "flush records")
	}
	buf = buf[:0]

	// Add exemplars in the snapshot.
	// We log in batches, with each record holding up to 10000 exemplars.
	// Assuming 100 bytes (overestimate) per exemplar, that's ~1MB.
	maxExemplarsPerRecord := 10000
	batch := make([]record.RefExemplar, 0, maxExemplarsPerRecord)
	enc := record.Encoder{}
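	// flushExemplars encodes the current batch into a single snapshot record,
	// prefixed with the exemplar record type, and logs it to the snapshot.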
	flushExemplars := func() error {
		if len(batch) == 0 {
			return nil
		}
		buf = buf[:0]
		encbuf := encoding.Encbuf{B: buf}
		encbuf.PutByte(chunkSnapshotRecordTypeExemplars)
		enc.EncodeExemplarsIntoBuffer(batch, &encbuf)
		if err := cp.Log(encbuf.Get()); err != nil {
			return errors.Wrap(err, "log exemplars")
		}
		buf, batch = buf[:0], batch[:0]
		return nil
	}
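	// Walk all exemplars currently held in memory and resolve each one back to
	// its in-memory series to obtain the series reference stored in the record.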
	err = h.exemplars.IterateExemplars(func(seriesLabels labels.Labels, e exemplar.Exemplar) error {
		if len(batch) >= maxExemplarsPerRecord {
			if err := flushExemplars(); err != nil {
				return errors.Wrap(err, "flush exemplars")
			}
		}

		ms := h.series.getByHash(seriesLabels.Hash(), seriesLabels)
		if ms == nil {
			// It is possible that the exemplar refers to an old series that is no longer in the Head. We discard such exemplars.
			return nil
		}
		batch = append(batch, record.RefExemplar{
			Ref:    ms.ref,
			T:      e.Ts,
			V:      e.Value,
			Labels: e.Labels,
		})
		return nil
	})
	if err != nil {
		return stats, errors.Wrap(err, "iterate exemplars")
	}

	// Flush remaining exemplars.
	if err := flushExemplars(); err != nil {
		return stats, errors.Wrap(err, "flush exemplars at the end")
	}

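	// Finalize the snapshot: close its WAL and move the temporary directory
	// into its final place.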
	if err := cp.Close(); err != nil {
		return stats, errors.Wrap(err, "close chunk snapshot")
	}
	if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
		return stats, errors.Wrap(err, "rename chunk snapshot directory")
	}

	if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, wlast, woffset); err != nil {
		// Leftover old chunk snapshots do not cause problems down the line
		// beyond occupying disk space. They will simply be ignored since a
		// newer chunk snapshot exists, so we only log the error here.
		level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
	}
	return stats, nil
}
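
A minimal, hypothetical caller sketch (triggerSnapshot is not part of the package): it assumes an initialized *Head together with the go-kit log.Logger and level helpers already used in this file, and only shows how the returned ChunkSnapshotStats might be consumed.

func triggerSnapshot(h *Head, logger log.Logger) {
	stats, err := h.ChunkSnapshot()
	if err != nil {
		level.Error(logger).Log("msg", "chunk snapshot failed", "err", err)
		return
	}
	level.Info(logger).Log("msg", "chunk snapshot written",
		"dir", stats.Dir, "series", stats.TotalSeries)
}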