func()

in tsdb/head_append.go [844:1115]


// Commit writes the appender's pending data to the WAL and then applies it to
// the in-memory head.
//
// The steps, in order, are:
//  1. Fail fast with ErrAppenderClosed if the appender was already closed.
//  2. Log the pending records via a.log(); on failure the appender is rolled
//     back and the wrapped error is returned.
//  3. Hand exemplars to the exemplar storage and collect them for export.
//  4. Append float samples (in-order or out-of-order), histograms and float
//     histograms to their series, and update series metadata.
//  5. Update head metrics and min/max times.
//  6. Log out-of-order samples and m-map markers to the WBL (best effort).
//  7. Pass metadata, samples and exemplars to the global exporter.
//
// The appender is marked closed when Commit returns, regardless of outcome.
// Once the WAL write has succeeded, Commit always returns nil.
func (a *headAppender) Commit() (err error) {
	if a.closed {
		return ErrAppenderClosed
	}
	defer func() { a.closed = true }()

	if err := a.log(); err != nil {
		_ = a.Rollback() // Most likely the same error will happen again.
		return errors.Wrap(err, "write to WAL")
	}

	if a.head.writeNotified != nil {
		a.head.writeNotified.Notify()
	}

	// No errors logging to WAL, so pass the exemplars along to the in memory storage.
	exportExemplars := make(map[storage.SeriesRef]record.RefExemplar, len(a.exemplars))
	for _, e := range a.exemplars {
		s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
		if s == nil {
			// The series lookup came back empty (presumably the series was
			// removed between AppendExemplar and Commit). There are no labels
			// to attach the exemplar to, so skip it instead of panicking on
			// the s.lset dereference below.
			continue
		}
		// We don't instrument exemplar appends here, all is instrumented by storage.
		if err := a.head.exemplars.AddExemplar(s.lset, e.exemplar); err != nil {
			if err == storage.ErrOutOfOrderExemplar {
				continue
			}
			// Any other error is only logged; the exemplar is still exported below.
			level.Debug(a.head.logger).Log("msg", "Unknown error while adding exemplar", "err", err)
		}
		exportExemplars[e.ref] = record.RefExemplar{
			Ref:    chunks.HeadSeriesRef(e.ref),
			T:      e.exemplar.Ts,
			V:      e.exemplar.Value,
			Labels: e.exemplar.Labels,
		}
	}

	// These defers are registered only after the WAL write succeeded; on the
	// failure path above, cleanup is delegated to Rollback instead.
	defer a.head.metrics.activeAppenders.Dec()
	defer a.head.putAppendBuffer(a.samples)
	defer a.head.putSeriesBuffer(a.sampleSeries)
	defer a.head.putExemplarBuffer(a.exemplars)
	defer a.head.putHistogramBuffer(a.histograms)
	defer a.head.putFloatHistogramBuffer(a.floatHistograms)
	defer a.head.putMetadataBuffer(a.metadata)
	defer a.head.iso.closeAppend(a.appendID)

	var (
		samplesAppended = len(a.samples)
		oooAccepted     int   // number of samples out of order but accepted: with ooo enabled and within time window
		oooRejected     int   // number of samples rejected due to: out of order but OOO support disabled.
		tooOldRejected  int   // number of samples rejected due to: that are out of order but too old (OOO support enabled, but outside time window)
		oobRejected     int   // number of samples rejected due to: out of bounds: with t < minValidTime (OOO support disabled)
		inOrderMint     int64 = math.MaxInt64
		inOrderMaxt     int64 = math.MinInt64
		ooomint         int64 = math.MaxInt64
		ooomaxt         int64 = math.MinInt64
		wblSamples      []record.RefSample
		oooMmapMarkers  map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef
		oooRecords      [][]byte
		oooCapMax       = a.head.opts.OutOfOrderCapMax.Load()
		chunkRange      = a.head.chunkRange.Load()
		series          *memSeries
		enc             record.Encoder
	)
	// Return every encoded WBL record buffer to the pool once Commit is done.
	defer func() {
		for i := range oooRecords {
			a.head.putBytesBuffer(oooRecords[i][:0])
		}
	}()
	// collectOOORecords encodes the m-map markers and OOO samples gathered so
	// far into oooRecords (markers first, then samples) and resets both
	// accumulators. When the WBL is disabled it only resets the accumulators.
	collectOOORecords := func() {
		if a.head.wbl == nil {
			// WBL is not enabled. So no need to collect.
			wblSamples = nil
			oooMmapMarkers = nil
			return
		}
		// The m-map happens before adding a new sample. So we collect
		// the m-map markers first, and then samples.
		// WBL Graphically:
		//   WBL Before this Commit(): [old samples before this commit for chunk 1]
		//   WBL After this Commit():  [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
		if oooMmapMarkers != nil {
			markers := make([]record.RefMmapMarker, 0, len(oooMmapMarkers))
			for ref, mmapRef := range oooMmapMarkers {
				markers = append(markers, record.RefMmapMarker{
					Ref:     ref,
					MmapRef: mmapRef,
				})
			}
			r := enc.MmapMarkers(markers, a.head.getBytesBuffer())
			oooRecords = append(oooRecords, r)
		}

		if len(wblSamples) > 0 {
			r := enc.Samples(wblSamples, a.head.getBytesBuffer())
			oooRecords = append(oooRecords, r)
		}

		wblSamples = nil
		oooMmapMarkers = nil
	}
	for i, s := range a.samples {
		series = a.sampleSeries[i]
		series.Lock()

		// Re-check appendability under the series lock and account for each
		// rejection reason separately so the metrics below stay accurate.
		oooSample, _, err := series.appendable(s.T, s.V, a.headMaxt, a.minValidTime, a.oooTimeWindow)
		switch err {
		case storage.ErrOutOfOrderSample:
			samplesAppended--
			oooRejected++
		case storage.ErrOutOfBounds:
			samplesAppended--
			oobRejected++
		case storage.ErrTooOldSample:
			samplesAppended--
			tooOldRejected++
		case nil:
			// Do nothing.
		default:
			samplesAppended--
		}

		var ok, chunkCreated bool

		switch {
		case err != nil:
			// Do nothing here.
		case oooSample:
			// Sample is OOO and OOO handling is enabled
			// and the delta is within the OOO tolerance.
			var mmapRef chunks.ChunkDiskMapperRef
			ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
			if chunkCreated {
				// Distinct names on purpose: the outer ok (result of insert)
				// must not be confused with the map-lookup result.
				prevRef, seen := oooMmapMarkers[series.ref]
				if !seen || prevRef != 0 {
					// !seen means there are no markers collected for these samples yet. So we first flush the samples
					// before setting this m-map marker.

					// prevRef != 0 means we have already m-mapped a chunk for this series in the same Commit().
					// Hence, before we m-map again, we should add the samples and m-map markers
					// seen till now to the WBL records.
					collectOOORecords()
				}

				if oooMmapMarkers == nil {
					oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
				}
				oooMmapMarkers[series.ref] = mmapRef
			}
			if ok {
				wblSamples = append(wblSamples, s)
				if s.T < ooomint {
					ooomint = s.T
				}
				if s.T > ooomaxt {
					ooomaxt = s.T
				}
				oooAccepted++
			} else {
				// Sample is an exact duplicate of the last sample.
				// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
				// not with samples in already flushed OOO chunks.
				// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
				samplesAppended--
			}
		default:
			ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
			if ok {
				if s.T < inOrderMint {
					inOrderMint = s.T
				}
				if s.T > inOrderMaxt {
					inOrderMaxt = s.T
				}
			} else {
				// The sample is an exact duplicate, and should be silently dropped.
				samplesAppended--
			}
		}

		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}

		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()
	}

	// Native histograms: a failed append is counted as an out-of-order rejection.
	histogramsTotal := len(a.histograms)
	histoOOORejected := 0
	for i, s := range a.histograms {
		series = a.histogramSeries[i]
		series.Lock()
		ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()

		if ok {
			if s.T < inOrderMint {
				inOrderMint = s.T
			}
			if s.T > inOrderMaxt {
				inOrderMaxt = s.T
			}
		} else {
			histogramsTotal--
			histoOOORejected++
		}
		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}
	}

	// Float histograms share the histogram counters with integer histograms.
	histogramsTotal += len(a.floatHistograms)
	for i, s := range a.floatHistograms {
		series = a.floatHistogramSeries[i]
		series.Lock()
		ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
		series.pendingCommit = false
		series.Unlock()

		if ok {
			if s.T < inOrderMint {
				inOrderMint = s.T
			}
			if s.T > inOrderMaxt {
				inOrderMaxt = s.T
			}
		} else {
			histogramsTotal--
			histoOOORejected++
		}
		if chunkCreated {
			a.head.metrics.chunks.Inc()
			a.head.metrics.chunksCreated.Inc()
		}
	}

	// Replace each series' metadata with the most recently appended values.
	for i, m := range a.metadata {
		series = a.metadataSeries[i]
		series.Lock()
		series.meta = &metadata.Metadata{Type: record.ToTextparseMetricType(m.Type), Unit: m.Unit, Help: m.Help}
		series.Unlock()
	}

	a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oooRejected))
	a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histoOOORejected))
	a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(oobRejected))
	a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricTypeFloat).Add(float64(tooOldRejected))
	a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(samplesAppended))
	a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsTotal))
	a.head.metrics.outOfOrderSamplesAppended.Add(float64(oooAccepted))
	a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
	a.head.updateMinOOOMaxOOOTime(ooomint, ooomaxt)

	// Flush whatever OOO samples/markers are still pending into oooRecords.
	collectOOORecords()
	if a.head.wbl != nil {
		if err := a.head.wbl.Log(oooRecords...); err != nil {
			// TODO(codesome): Currently WBL logging of ooo samples is best effort here since we cannot try logging
			// until we have found what samples become OOO. We can try having a metric for this failure.
			// Returning the error here is not correct because we have already put the samples into the memory,
			// hence the append/insert was a success.
			level.Error(a.head.logger).Log("msg", "Failed to log out of order samples into the WAL", "err", err)
		}
	}

	// Hand the full sample batch (a.samples, including any samples rejected
	// above) plus the collected exemplars to the global exporter.
	gcm_exportsetup.Global().Export(a.metadataFunc, a.samples, exportExemplars)

	return nil
}