func()

in tsdb/agent/db.go [416:602]


func (db *DB) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef) (err error) {
	var (
		dec     record.Decoder
		lastRef = chunks.HeadSeriesRef(db.nextRef.Load())

		decoded    = make(chan interface{}, 10)
		errCh      = make(chan error, 1)
		seriesPool = sync.Pool{
			New: func() interface{} {
				return []record.RefSeries{}
			},
		}
		samplesPool = sync.Pool{
			New: func() interface{} {
				return []record.RefSample{}
			},
		}
		histogramsPool = sync.Pool{
			New: func() interface{} {
				return []record.RefHistogramSample{}
			},
		}
		floatHistogramsPool = sync.Pool{
			New: func() interface{} {
				return []record.RefFloatHistogramSample{}
			},
		}
	)

	go func() {
		defer close(decoded)
		var err error
		for r.Next() {
			rec := r.Record()
			switch dec.Type(rec) {
			case record.Series:
				series := seriesPool.Get().([]record.RefSeries)[:0]
				series, err = dec.Series(rec, series)
				if err != nil {
					errCh <- &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode series"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- series
			case record.Samples:
				samples := samplesPool.Get().([]record.RefSample)[:0]
				samples, err = dec.Samples(rec, samples)
				if err != nil {
					errCh <- &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- samples
			case record.HistogramSamples:
				histograms := histogramsPool.Get().([]record.RefHistogramSample)[:0]
				histograms, err = dec.HistogramSamples(rec, histograms)
				if err != nil {
					errCh <- &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode histogram samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- histograms
			case record.FloatHistogramSamples:
				floatHistograms := floatHistogramsPool.Get().([]record.RefFloatHistogramSample)[:0]
				floatHistograms, err = dec.FloatHistogramSamples(rec, floatHistograms)
				if err != nil {
					errCh <- &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode float histogram samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- floatHistograms
			case record.Tombstones, record.Exemplars:
				// We don't care about tombstones or exemplars during replay.
				// TODO: If decide to decode exemplars, we should make sure to prepopulate
				// stripeSeries.exemplars in the next block by using setLatestExemplar.
				continue
			default:
				errCh <- &wlog.CorruptionErr{
					Err:     errors.Errorf("invalid record type %v", dec.Type(rec)),
					Segment: r.Segment(),
					Offset:  r.Offset(),
				}
			}
		}
	}()

	var nonExistentSeriesRefs atomic.Uint64

	for d := range decoded {
		switch v := d.(type) {
		case []record.RefSeries:
			for _, entry := range v {
				// If this is a new series, create it in memory. If we never read in a
				// sample for this series, its timestamp will remain at 0 and it will
				// be deleted at the next GC.
				if db.series.GetByID(entry.Ref) == nil {
					series := &memSeries{ref: entry.Ref, lset: entry.Labels, lastTs: 0}
					db.series.Set(entry.Labels.Hash(), series)
					multiRef[entry.Ref] = series.ref
					db.metrics.numActiveSeries.Inc()
					if entry.Ref > lastRef {
						lastRef = entry.Ref
					}
				}
			}

			//nolint:staticcheck
			seriesPool.Put(v)
		case []record.RefSample:
			for _, entry := range v {
				// Update the lastTs for the series based
				ref, ok := multiRef[entry.Ref]
				if !ok {
					nonExistentSeriesRefs.Inc()
					continue
				}
				series := db.series.GetByID(ref)
				if entry.T > series.lastTs {
					series.lastTs = entry.T
				}
			}

			//nolint:staticcheck
			samplesPool.Put(v)
		case []record.RefHistogramSample:
			for _, entry := range v {
				// Update the lastTs for the series based
				ref, ok := multiRef[entry.Ref]
				if !ok {
					nonExistentSeriesRefs.Inc()
					continue
				}
				series := db.series.GetByID(ref)
				if entry.T > series.lastTs {
					series.lastTs = entry.T
				}
			}
			//nolint:staticcheck
			histogramsPool.Put(v)
		case []record.RefFloatHistogramSample:
			for _, entry := range v {
				// Update the lastTs for the series based
				ref, ok := multiRef[entry.Ref]
				if !ok {
					nonExistentSeriesRefs.Inc()
					continue
				}
				series := db.series.GetByID(ref)
				if entry.T > series.lastTs {
					series.lastTs = entry.T
				}
			}
			//nolint:staticcheck
			floatHistogramsPool.Put(v)
		default:
			panic(fmt.Errorf("unexpected decoded type: %T", d))
		}
	}

	if v := nonExistentSeriesRefs.Load(); v > 0 {
		level.Warn(db.logger).Log("msg", "found sample referencing non-existing series", "skipped_series", v)
	}

	db.nextRef.Store(uint64(lastRef))

	select {
	case err := <-errCh:
		return err
	default:
		if r.Err() != nil {
			return errors.Wrap(r.Err(), "read records")
		}
		return nil
	}
}