in tsdb/wlog/watcher.go [534:664]
func (w *Watcher) readSegment(r *LiveReader, segmentNum int, tail bool) error {
var (
dec record.Decoder
series []record.RefSeries
samples []record.RefSample
samplesToSend []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
histogramsToSend []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
floatHistogramsToSend []record.RefFloatHistogramSample
)
for r.Next() && !isClosed(w.quit) {
rec := r.Record()
w.recordsReadMetric.WithLabelValues(dec.Type(rec).String()).Inc()
switch dec.Type(rec) {
case record.Series:
series, err := dec.Series(rec, series[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.StoreSeries(series, segmentNum)
case record.Samples:
// If we're not tailing a segment we can ignore any samples records we see.
// This speeds up replay of the WAL by > 10x.
if !tail {
break
}
samples, err := dec.Samples(rec, samples[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, s := range samples {
if s.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
samplesToSend = append(samplesToSend, s)
}
}
if len(samplesToSend) > 0 {
w.writer.Append(samplesToSend)
samplesToSend = samplesToSend[:0]
}
case record.Exemplars:
// Skip if experimental "exemplars over remote write" is not enabled.
if !w.sendExemplars {
break
}
// If we're not tailing a segment we can ignore any exemplars records we see.
// This speeds up replay of the WAL significantly.
if !tail {
break
}
exemplars, err := dec.Exemplars(rec, exemplars[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
w.writer.AppendExemplars(exemplars)
case record.HistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
}
if !tail {
break
}
histograms, err := dec.HistogramSamples(rec, histograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, h := range histograms {
if h.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
histogramsToSend = append(histogramsToSend, h)
}
}
if len(histogramsToSend) > 0 {
w.writer.AppendHistograms(histogramsToSend)
histogramsToSend = histogramsToSend[:0]
}
case record.FloatHistogramSamples:
// Skip if experimental "histograms over remote write" is not enabled.
if !w.sendHistograms {
break
}
if !tail {
break
}
floatHistograms, err := dec.FloatHistogramSamples(rec, floatHistograms[:0])
if err != nil {
w.recordDecodeFailsMetric.Inc()
return err
}
for _, fh := range floatHistograms {
if fh.T > w.startTimestamp {
if !w.sendSamples {
w.sendSamples = true
duration := time.Since(w.startTime)
level.Info(w.logger).Log("msg", "Done replaying WAL", "duration", duration)
}
floatHistogramsToSend = append(floatHistogramsToSend, fh)
}
}
if len(floatHistogramsToSend) > 0 {
w.writer.AppendFloatHistograms(floatHistogramsToSend)
floatHistogramsToSend = floatHistogramsToSend[:0]
}
case record.Tombstones:
default:
// Could be corruption, or reading from a WAL from a newer Prometheus.
w.recordDecodeFailsMetric.Inc()
}
}
return errors.Wrapf(r.Err(), "segment %d: %v", segmentNum, r.Err())
}