func()

in tsdb/head_wal.go [56:437]


func (h *Head) loadWAL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, mmappedChunks, oooMmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (err error) {
	// Track number of samples that referenced a series we don't know about
	// for error reporting.
	var unknownRefs atomic.Uint64
	var unknownExemplarRefs atomic.Uint64
	var unknownHistogramRefs atomic.Uint64
	var unknownMetadataRefs atomic.Uint64
	// Track number of series records that had overlapping m-map chunks.
	var mmapOverlappingChunks atomic.Uint64

	// Start workers that each process samples for a partition of the series ID space.
	var (
		wg             sync.WaitGroup
		concurrency    = h.opts.WALReplayConcurrency
		processors     = make([]walSubsetProcessor, concurrency)
		exemplarsInput chan record.RefExemplar

		dec             record.Decoder
		shards          = make([][]record.RefSample, concurrency)
		histogramShards = make([][]histogramRecord, concurrency)

		decoded                      = make(chan interface{}, 10)
		decodeErr, seriesCreationErr error

		seriesPool          zeropool.Pool[[]record.RefSeries]
		samplesPool         zeropool.Pool[[]record.RefSample]
		tstonesPool         zeropool.Pool[[]tombstones.Stone]
		exemplarsPool       zeropool.Pool[[]record.RefExemplar]
		histogramsPool      zeropool.Pool[[]record.RefHistogramSample]
		floatHistogramsPool zeropool.Pool[[]record.RefFloatHistogramSample]
		metadataPool        zeropool.Pool[[]record.RefMetadata]
	)

	defer func() {
		// For CorruptionErr ensure to terminate all workers before exiting.
		_, ok := err.(*wlog.CorruptionErr)
		if ok || seriesCreationErr != nil {
			for i := 0; i < concurrency; i++ {
				processors[i].closeAndDrain()
			}
			close(exemplarsInput)
			wg.Wait()
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		processors[i].setup()

		go func(wp *walSubsetProcessor) {
			unknown, unknownHistograms, overlapping := wp.processWALSamples(h, mmappedChunks, oooMmappedChunks)
			unknownRefs.Add(unknown)
			mmapOverlappingChunks.Add(overlapping)
			unknownHistogramRefs.Add(unknownHistograms)
			wg.Done()
		}(&processors[i])
	}

	wg.Add(1)
	exemplarsInput = make(chan record.RefExemplar, 300)
	go func(input <-chan record.RefExemplar) {
		var err error
		defer wg.Done()
		for e := range input {
			ms := h.series.getByID(e.Ref)
			if ms == nil {
				unknownExemplarRefs.Inc()
				continue
			}

			if e.T < h.minValidTime.Load() {
				continue
			}
			// At the moment the only possible error here is out of order exemplars, which we shouldn't see when
			// replaying the WAL, so lets just log the error if it's not that type.
			err = h.exemplars.AddExemplar(ms.lset, exemplar.Exemplar{Ts: e.T, Value: e.V, Labels: e.Labels})
			if err != nil && err == storage.ErrOutOfOrderExemplar {
				level.Warn(h.logger).Log("msg", "Unexpected error when replaying WAL on exemplar record", "err", err)
			}
		}
	}(exemplarsInput)

	go func() {
		defer close(decoded)
		var err error
		for r.Next() {
			rec := r.Record()
			switch dec.Type(rec) {
			case record.Series:
				series := seriesPool.Get()[:0]
				series, err = dec.Series(rec, series)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode series"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- series
			case record.Samples:
				samples := samplesPool.Get()[:0]
				samples, err = dec.Samples(rec, samples)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- samples
			case record.Tombstones:
				tstones := tstonesPool.Get()[:0]
				tstones, err = dec.Tombstones(rec, tstones)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode tombstones"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- tstones
			case record.Exemplars:
				exemplars := exemplarsPool.Get()[:0]
				exemplars, err = dec.Exemplars(rec, exemplars)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode exemplars"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- exemplars
			case record.HistogramSamples:
				hists := histogramsPool.Get()[:0]
				hists, err = dec.HistogramSamples(rec, hists)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode histograms"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- hists
			case record.FloatHistogramSamples:
				hists := floatHistogramsPool.Get()[:0]
				hists, err = dec.FloatHistogramSamples(rec, hists)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode float histograms"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- hists
			case record.Metadata:
				meta := metadataPool.Get()[:0]
				meta, err := dec.Metadata(rec, meta)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode metadata"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decoded <- meta
			default:
				// Noop.
			}
		}
	}()

	// The records are always replayed from the oldest to the newest.
Outer:
	for d := range decoded {
		switch v := d.(type) {
		case []record.RefSeries:
			for _, walSeries := range v {
				mSeries, created, err := h.getOrCreateWithID(walSeries.Ref, walSeries.Labels.Hash(), walSeries.Labels)
				if err != nil {
					seriesCreationErr = err
					break Outer
				}

				if chunks.HeadSeriesRef(h.lastSeriesID.Load()) < walSeries.Ref {
					h.lastSeriesID.Store(uint64(walSeries.Ref))
				}
				if !created {
					multiRef[walSeries.Ref] = mSeries.ref
				}

				idx := uint64(mSeries.ref) % uint64(concurrency)
				processors[idx].input <- walSubsetProcessorInputItem{walSeriesRef: walSeries.Ref, existingSeries: mSeries}
			}
			seriesPool.Put(v)
		case []record.RefSample:
			samples := v
			minValidTime := h.minValidTime.Load()
			// We split up the samples into chunks of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					if shards[i] == nil {
						shards[i] = processors[i].reuseBuf()
					}
				}
				for _, sam := range samples[:m] {
					if sam.T < minValidTime {
						continue // Before minValidTime: discard.
					}
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					shards[mod] = append(shards[mod], sam)
				}
				for i := 0; i < concurrency; i++ {
					if len(shards[i]) > 0 {
						processors[i].input <- walSubsetProcessorInputItem{samples: shards[i]}
						shards[i] = nil
					}
				}
				samples = samples[m:]
			}
			samplesPool.Put(v)
		case []tombstones.Stone:
			for _, s := range v {
				for _, itv := range s.Intervals {
					if itv.Maxt < h.minValidTime.Load() {
						continue
					}
					if m := h.series.getByID(chunks.HeadSeriesRef(s.Ref)); m == nil {
						unknownRefs.Inc()
						continue
					}
					h.tombstones.AddInterval(s.Ref, itv)
				}
			}
			tstonesPool.Put(v)
		case []record.RefExemplar:
			for _, e := range v {
				exemplarsInput <- e
			}
			exemplarsPool.Put(v)
		case []record.RefHistogramSample:
			samples := v
			minValidTime := h.minValidTime.Load()
			// We split up the samples into chunks of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					if histogramShards[i] == nil {
						histogramShards[i] = processors[i].reuseHistogramBuf()
					}
				}
				for _, sam := range samples[:m] {
					if sam.T < minValidTime {
						continue // Before minValidTime: discard.
					}
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, h: sam.H})
				}
				for i := 0; i < concurrency; i++ {
					if len(histogramShards[i]) > 0 {
						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
						histogramShards[i] = nil
					}
				}
				samples = samples[m:]
			}
			histogramsPool.Put(v)
		case []record.RefFloatHistogramSample:
			samples := v
			minValidTime := h.minValidTime.Load()
			// We split up the samples into chunks of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					if histogramShards[i] == nil {
						histogramShards[i] = processors[i].reuseHistogramBuf()
					}
				}
				for _, sam := range samples[:m] {
					if sam.T < minValidTime {
						continue // Before minValidTime: discard.
					}
					if r, ok := multiRef[sam.Ref]; ok {
						sam.Ref = r
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					histogramShards[mod] = append(histogramShards[mod], histogramRecord{ref: sam.Ref, t: sam.T, fh: sam.FH})
				}
				for i := 0; i < concurrency; i++ {
					if len(histogramShards[i]) > 0 {
						processors[i].input <- walSubsetProcessorInputItem{histogramSamples: histogramShards[i]}
						histogramShards[i] = nil
					}
				}
				samples = samples[m:]
			}
			floatHistogramsPool.Put(v)
		case []record.RefMetadata:
			for _, m := range v {
				s := h.series.getByID(m.Ref)
				if s == nil {
					unknownMetadataRefs.Inc()
					continue
				}
				s.meta = &metadata.Metadata{
					Type: record.ToTextparseMetricType(m.Type),
					Unit: m.Unit,
					Help: m.Help,
				}
			}
			metadataPool.Put(v)
		default:
			panic(fmt.Errorf("unexpected decoded type: %T", d))
		}
	}

	if decodeErr != nil {
		return decodeErr
	}
	if seriesCreationErr != nil {
		// Drain the channel to unblock the goroutine.
		for range decoded {
		}
		return seriesCreationErr
	}

	// Signal termination to each worker and wait for it to close its output channel.
	for i := 0; i < concurrency; i++ {
		processors[i].closeAndDrain()
	}
	close(exemplarsInput)
	wg.Wait()

	if r.Err() != nil {
		return errors.Wrap(r.Err(), "read records")
	}

	if unknownRefs.Load()+unknownExemplarRefs.Load()+unknownHistogramRefs.Load()+unknownMetadataRefs.Load() > 0 {
		level.Warn(h.logger).Log(
			"msg", "Unknown series references",
			"samples", unknownRefs.Load(),
			"exemplars", unknownExemplarRefs.Load(),
			"histograms", unknownHistogramRefs.Load(),
			"metadata", unknownMetadataRefs.Load(),
		)
	}
	if count := mmapOverlappingChunks.Load(); count > 0 {
		level.Info(h.logger).Log("msg", "Overlapping m-map chunks on duplicate series records", "count", count)
	}
	return nil
}