in tsdb/head_wal.go [647:827]
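// loadWBL replays the out-of-order write-behind log (WBL): sample records are sharded
// across WALReplayConcurrency workers, and m-map marker records are used to drop OOO head
// chunks whose contents are already in m-mapped chunks on disk.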
func (h *Head) loadWBL(r *wlog.Reader, multiRef map[chunks.HeadSeriesRef]chunks.HeadSeriesRef, lastMmapRef chunks.ChunkDiskMapperRef) (err error) {
// Track the number of samples and m-map markers that referenced a series we don't know
// about, for error reporting.
var unknownRefs, mmapMarkerUnknownRefs atomic.Uint64
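// lastMmapRef points at the last m-mapped chunk that was loaded during head initialization;
// markers referring to chunks beyond it are ignored below.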
lastSeq, lastOff := lastMmapRef.Unpack()
// Start workers that each process samples for a partition of the series ID space.
var (
wg sync.WaitGroup
concurrency = h.opts.WALReplayConcurrency
processors = make([]wblSubsetProcessor, concurrency)
dec record.Decoder
shards = make([][]record.RefSample, concurrency)
decodedCh = make(chan interface{}, 10)
decodeErr error
samplesPool = sync.Pool{
New: func() interface{} {
return []record.RefSample{}
},
}
markersPool = sync.Pool{
New: func() interface{} {
return []record.RefMmapMarker{}
},
}
)
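// decodedCh carries decoded sample and m-map marker slices from the reading goroutine to
// the replay loop below; samplesPool and markersPool let those slices be reused instead of
// being reallocated for every record.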
defer func() {
// On a CorruptionErr, make sure to terminate all workers before exiting.
// We also wrap it to identify OOO WBL corruption.
_, ok := err.(*wlog.CorruptionErr)
if ok {
err = &errLoadWbl{err: err}
for i := 0; i < concurrency; i++ {
processors[i].closeAndDrain()
}
wg.Wait()
}
}()
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
processors[i].setup()
go func(wp *wblSubsetProcessor) {
unknown := wp.processWBLSamples(h)
unknownRefs.Add(unknown)
wg.Done()
}(&processors[i])
}
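// Decode the WBL records in a separate goroutine and feed the decoded samples and
// m-map markers to the replay loop below through decodedCh.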
go func() {
defer close(decodedCh)
for r.Next() {
rec := r.Record()
switch dec.Type(rec) {
case record.Samples:
samples := samplesPool.Get().([]record.RefSample)[:0]
samples, err = dec.Samples(rec, samples)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode samples"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- samples
case record.MmapMarkers:
markers := markersPool.Get().([]record.RefMmapMarker)[:0]
markers, err = dec.MmapMarkers(rec, markers)
if err != nil {
decodeErr = &wlog.CorruptionErr{
Err: errors.Wrap(err, "decode mmap markers"),
Segment: r.Segment(),
Offset: r.Offset(),
}
return
}
decodedCh <- markers
default:
// Noop.
}
}
}()
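// Consume the decoded records on this goroutine, dispatching samples to the shard workers
// and handling m-map markers inline.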
// The records are always replayed from the oldest to the newest.
for d := range decodedCh {
switch v := d.(type) {
case []record.RefSample:
samples := v
// We split up the samples into batches of 5000 samples or fewer.
// With O(300 * #cores) in-flight sample batches, large scrapes could otherwise
// cause thousands of very large in-flight buffers occupying large amounts
// of unused memory.
for len(samples) > 0 {
m := 5000
if len(samples) < m {
m = len(samples)
}
for i := 0; i < concurrency; i++ {
shards[i] = processors[i].reuseBuf()
}
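// Remap any series references recorded in multiRef to their canonical reference, then
// shard by reference so that all samples of a series go to the same worker.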
for _, sam := range samples[:m] {
if r, ok := multiRef[sam.Ref]; ok {
sam.Ref = r
}
mod := uint64(sam.Ref) % uint64(concurrency)
shards[mod] = append(shards[mod], sam)
}
for i := 0; i < concurrency; i++ {
processors[i].input <- shards[i]
}
samples = samples[m:]
}
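// Return the decoded slice to the pool so the decoding goroutine can reuse it.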
samplesPool.Put(d)
case []record.RefMmapMarker:
markers := v
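// Each m-map marker records that, at this point in the WBL, the OOO head chunk of the
// series had been flushed to an m-mapped chunk on disk.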
for _, rm := range markers {
seq, off := rm.MmapRef.Unpack()
if seq > lastSeq || (seq == lastSeq && off > lastOff) {
// The m-mapped chunk this marker refers to was not present during the load of
// m-mapped chunks at head initialization, so skip the marker.
continue
}
if r, ok := multiRef[rm.Ref]; ok {
rm.Ref = r
}
ms := h.series.getByID(rm.Ref)
if ms == nil {
mmapMarkerUnknownRefs.Inc()
continue
}
idx := uint64(ms.ref) % uint64(concurrency)
// It is possible that an old sample for this series is still being processed by
// processWBLSamples, which could cause a race below. So we wait for the worker to empty
// its input buffer and finish processing all samples queued before this marker.
processors[idx].waitUntilIdle()
// Lock the subset so we can modify the series object
processors[idx].mx.Lock()
// All samples up to this marker have been m-mapped, hence clear out the OOO head chunk.
// If some samples slipped through and went into m-mapped chunks because of changed
// chunk size parameters, we are not taking care of that here.
// TODO(codesome): see if there is a way to avoid duplicate m-mapped chunks if
// the size of the OOO chunk was reduced between restarts.
if ms.ooo != nil {
ms.ooo.oooHeadChunk = nil
}
processors[idx].mx.Unlock()
}
default:
panic(fmt.Errorf("unexpected decodedCh type: %T", d))
}
}
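// The decoding goroutine stops on the first corruption it hits; report that error once all
// records decoded so far have been consumed.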
if decodeErr != nil {
return decodeErr
}
// Signal termination to each worker and wait for it to close its output channel.
for i := 0; i < concurrency; i++ {
processors[i].closeAndDrain()
}
wg.Wait()
if r.Err() != nil {
return errors.Wrap(r.Err(), "read records")
}
if unknownRefs.Load() > 0 || mmapMarkerUnknownRefs.Load() > 0 {
level.Warn(h.logger).Log("msg", "Unknown series references for ooo WAL replay", "samples", unknownRefs.Load(), "mmap_markers", mmapMarkerUnknownRefs.Load())
}
return nil
}