func()

in tsdb/head_wal.go [1327:1532]


func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSeries, error) {
	dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
	if err != nil {
		if err == record.ErrNotFound {
			return snapIdx, snapOffset, nil, nil
		}
		return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
	}

	start := time.Now()
	sr, err := wlog.NewSegmentsReader(dir)
	if err != nil {
		return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
	}
	defer func() {
		if err := sr.Close(); err != nil {
			level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
		}
	}()

	var (
		numSeries        = 0
		unknownRefs      = int64(0)
		concurrency      = h.opts.WALReplayConcurrency
		wg               sync.WaitGroup
		recordChan       = make(chan chunkSnapshotRecord, 5*concurrency)
		shardedRefSeries = make([]map[chunks.HeadSeriesRef]*memSeries, concurrency)
		errChan          = make(chan error, concurrency)
		refSeries        map[chunks.HeadSeriesRef]*memSeries
		exemplarBuf      []record.RefExemplar
		dec              record.Decoder
	)

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func(idx int, rc <-chan chunkSnapshotRecord) {
			defer wg.Done()
			defer func() {
				// If there was an error, drain the channel
				// to unblock the main thread.
				for range rc {
				}
			}()

			shardedRefSeries[idx] = make(map[chunks.HeadSeriesRef]*memSeries)
			localRefSeries := shardedRefSeries[idx]

			for csr := range rc {
				series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
				if err != nil {
					errChan <- err
					return
				}
				localRefSeries[csr.ref] = series
				for {
					seriesID := uint64(series.ref)
					lastSeriesID := h.lastSeriesID.Load()
					if lastSeriesID >= seriesID || h.lastSeriesID.CompareAndSwap(lastSeriesID, seriesID) {
						break
					}
				}

				if csr.mc == nil {
					continue
				}
				series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
				series.headChunk = csr.mc
				series.lastValue = csr.lastValue

				app, err := series.headChunk.chunk.Appender()
				if err != nil {
					errChan <- err
					return
				}
				series.app = app

				h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
			}
		}(i, recordChan)
	}

	r := wlog.NewReader(sr)
	var loopErr error
Outer:
	for r.Next() {
		select {
		case err := <-errChan:
			errChan <- err
			break Outer
		default:
		}

		rec := r.Record()
		switch rec[0] {
		case chunkSnapshotRecordTypeSeries:
			numSeries++
			csr, err := decodeSeriesFromChunkSnapshot(&dec, rec)
			if err != nil {
				loopErr = errors.Wrap(err, "decode series record")
				break Outer
			}
			recordChan <- csr

		case chunkSnapshotRecordTypeTombstones:
			tr, err := decodeTombstonesSnapshotRecord(rec)
			if err != nil {
				loopErr = errors.Wrap(err, "decode tombstones")
				break Outer
			}

			if err = tr.Iter(func(ref storage.SeriesRef, ivs tombstones.Intervals) error {
				h.tombstones.AddInterval(ref, ivs...)
				return nil
			}); err != nil {
				loopErr = errors.Wrap(err, "iterate tombstones")
				break Outer
			}

		case chunkSnapshotRecordTypeExemplars:
			// Exemplars are at the end of snapshot. So all series are loaded at this point.
			if len(refSeries) == 0 {
				close(recordChan)
				wg.Wait()

				refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
				for _, shard := range shardedRefSeries {
					for k, v := range shard {
						refSeries[k] = v
					}
				}
			}

			if !h.opts.EnableExemplarStorage || h.opts.MaxExemplars.Load() <= 0 {
				// Exemplar storage is disabled.
				continue Outer
			}

			decbuf := encoding.Decbuf{B: rec[1:]}

			exemplarBuf = exemplarBuf[:0]
			exemplarBuf, err = dec.ExemplarsFromBuffer(&decbuf, exemplarBuf)
			if err != nil {
				loopErr = errors.Wrap(err, "exemplars from buffer")
				break Outer
			}

			for _, e := range exemplarBuf {
				ms, ok := refSeries[e.Ref]
				if !ok {
					unknownRefs++
					continue
				}

				if err := h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{
					Labels: e.Labels,
					Value:  e.V,
					Ts:     e.T,
				}); err != nil {
					loopErr = errors.Wrap(err, "add exemplar")
					break Outer
				}
			}

		default:
			// This is a record type we don't understand. It is either and old format from earlier versions,
			// or a new format and the code was rolled back to old version.
			loopErr = errors.Errorf("unsuported snapshot record type 0b%b", rec[0])
			break Outer
		}
	}
	if len(refSeries) == 0 {
		close(recordChan)
		wg.Wait()
	}

	close(errChan)
	merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
	for err := range errChan {
		merr.Add(errors.Wrap(err, "record processing"))
	}
	if err := merr.Err(); err != nil {
		return -1, -1, nil, err
	}

	if r.Err() != nil {
		return -1, -1, nil, errors.Wrap(r.Err(), "read records")
	}

	if len(refSeries) == 0 {
		// We had no exemplar record, so we have to build the map here.
		refSeries = make(map[chunks.HeadSeriesRef]*memSeries, numSeries)
		for _, shard := range shardedRefSeries {
			for k, v := range shard {
				refSeries[k] = v
			}
		}
	}

	elapsed := time.Since(start)
	level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
	if unknownRefs > 0 {
		level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
	}

	return snapIdx, snapOffset, refSeries, nil
}