func()

in tsdb/head_wal.go [647:827]


// loadWBL replays the out-of-order WBL read from r into the head.
//
// Records are decoded on a dedicated goroutine. Sample records are sharded by
// series reference (ref % concurrency) across h.opts.WALReplayConcurrency
// wblSubsetProcessor workers in batches of at most 5000 samples. M-map marker
// records are applied on the calling goroutine: once the worker owning the
// series is idle, the series' in-memory OOO head chunk is cleared.
//
// multiRef: series references found as keys are replaced by the mapped value
// before a sample or marker is routed (NOTE(review): presumably duplicate
// series refs resolved during WAL replay — confirm against the caller).
//
// lastMmapRef: markers pointing past this chunk reference are skipped, because
// that chunk was not present when m-mapped chunks were loaded at head init.
//
// A record that fails to decode is returned as a *wlog.CorruptionErr, which the
// deferred handler wraps in *errLoadWbl after shutting down all workers. A
// reader error is returned wrapped with "read records". Samples and markers
// referencing unknown series are counted and logged as a warning.
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
	// Track number of samples, m-map markers, that referenced a series we don't know about
	// for error reporting.
	var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64

	lastSeq, lastOff := lastMmapRef.Unpack()
	// Start workers that each process samples for a partition of the series ID space.
	var (
		wg          sync.WaitGroup
		concurrency = h.opts.WALReplayConcurrency
		processors  = make([]wblSubsetProcessor, concurrency)

		dec    record.Decoder
		shards = make([][]record.RefSample, concurrency)

		decodedCh   = make(chan interface{}, 10)
		decodeErr   error
		samplesPool = sync.Pool{
			New: func() interface{} {
				return []record.RefSample{}
			},
		}
		markersPool = sync.Pool{
			New: func() interface{} {
				return []record.RefMmapMarker{}
			},
		}
	)

	defer func() {
		// For CorruptionErr ensure to terminate all workers before exiting.
		// We also wrap it to identify OOO WBL corruption.
		// The only unwrapped *wlog.CorruptionErr returned below is decodeErr,
		// which is returned before the workers are shut down at the end of
		// this function, so the shutdown here does not repeat that one.
		_, ok := err.(*wlog.CorruptionErr)
		if ok {
			err = &errLoadWbl{err: err}
			for i := 0; i < concurrency; i++ {
				processors[i].closeAndDrain()
			}
			wg.Wait()
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		processors[i].setup()

		go func(wp *wblSubsetProcessor) {
			unknown := wp.processWBLSamples(h)
			unknownRefs.Add(unknown)
			wg.Done()
		}(&processors[i])
	}

	// Decode records concurrently with the sharding/apply loop below.
	// decodedCh is closed when the reader is exhausted or a record fails to
	// decode; in the latter case decodeErr is set before the close.
	go func() {
		defer close(decodedCh)
		for r.Next() {
			// Goroutine-local error: assigning the enclosing function's named
			// result from here would be an unsynchronized write to a variable
			// the deferred handler above reads.
			var err error
			rec := r.Record()
			switch dec.Type(rec) {
			case record.Samples:
				samples := samplesPool.Get().([]record.RefSample)[:0]
				samples, err = dec.Samples(rec, samples)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode samples"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decodedCh <- samples
			case record.MmapMarkers:
				markers := markersPool.Get().([]record.RefMmapMarker)[:0]
				markers, err = dec.MmapMarkers(rec, markers)
				if err != nil {
					decodeErr = &wlog.CorruptionErr{
						Err:     errors.Wrap(err, "decode mmap markers"),
						Segment: r.Segment(),
						Offset:  r.Offset(),
					}
					return
				}
				decodedCh <- markers
			default:
				// Noop.
			}
		}
	}()

	// The records are always replayed from the oldest to the newest.
	for d := range decodedCh {
		switch v := d.(type) {
		case []record.RefSample:
			samples := v
			// We split up the samples into parts of 5000 samples or less.
			// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
			// cause thousands of very large in flight buffers occupying large amounts
			// of unused memory.
			for len(samples) > 0 {
				m := 5000
				if len(samples) < m {
					m = len(samples)
				}
				for i := 0; i < concurrency; i++ {
					shards[i] = processors[i].reuseBuf()
				}
				for _, sam := range samples[:m] {
					if ref, ok := multiRef[sam.Ref]; ok {
						sam.Ref = ref
					}
					mod := uint64(sam.Ref) % uint64(concurrency)
					shards[mod] = append(shards[mod], sam)
				}
				for i := 0; i < concurrency; i++ {
					processors[i].input <- shards[i]
				}
				samples = samples[m:]
			}
			samplesPool.Put(d)
		case []record.RefMmapMarker:
			markers := v
			for _, rm := range markers {
				seq, off := rm.MmapRef.Unpack()
				if seq > lastSeq || (seq == lastSeq && off > lastOff) {
					// This m-map chunk from markers was not present during
					// the load of mmapped chunks that happened in the head
					// initialization.
					continue
				}

				if ref, ok := multiRef[rm.Ref]; ok {
					rm.Ref = ref
				}

				ms := h.series.getByID(rm.Ref)
				if ms == nil {
					mmapMarkerUnknownRefs.Inc()
					continue
				}
				idx := uint64(ms.ref) % uint64(concurrency)
				// Some older sample for this series may still be in flight in
				// processWBLSamples, which would race with the modification below.
				// So wait for that worker to empty its input buffer and finish
				// processing everything it has received.
				processors[idx].waitUntilIdle()
				// Lock the subset so we can modify the series object
				processors[idx].mx.Lock()

				// All samples till now have been m-mapped. Hence clear out the headChunk.
				// In case some samples slipped through and went into m-map chunks because of changed
				// chunk size parameters, we are not taking care of that here.
				// TODO(codesome): see if there is a way to avoid duplicate m-map chunks if
				// the size of ooo chunk was reduced between restart.
				if ms.ooo != nil {
					ms.ooo.oooHeadChunk = nil
				}

				processors[idx].mx.Unlock()
			}
			// Hand the buffer back to the decoder goroutine for reuse, as is
			// done with samplesPool above.
			markersPool.Put(d)
		default:
			panic(fmt.Errorf("unexpected decodedCh type: %T", d))
		}
	}

	if decodeErr != nil {
		return decodeErr
	}

	// Signal termination to each worker and wait for it to close its output channel.
	for i := 0; i < concurrency; i++ {
		processors[i].closeAndDrain()
	}
	wg.Wait()

	if r.Err() != nil {
		return errors.Wrap(r.Err(), "read records")
	}

	if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
		level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
	}
	return nil
}