// func() in v1storage/storage.go [918:975]


// Append ingests a single sample into the series that belongs to the
// sample's metric.
//
// Labels whose value is empty are deleted from sample.Metric in place before
// the metric is fingerprinted, so the caller's metric is modified. The
// fingerprint lock of the (possibly mapped) fingerprint is held until the
// method returns.
//
// Return values:
//   - nil if the sample was added, or if it carries the same timestamp and
//     the same value as the last sample recorded for the series (a "no-op
//     append", which is silently ignored).
//   - ErrDuplicateSampleForTimestamp if the timestamp equals the last
//     timestamp of the series but the sample is not such a no-op append.
//   - ErrOutOfOrderSample if the timestamp is older than the last timestamp
//     of the series.
//   - Any error reported by getOrCreateSeries or by series.add; in the latter
//     case the series is quarantined before returning.
func (s *MemorySeriesStorage) Append(sample *model.Sample) error {
	metric := sample.Metric

	// Strip labels with empty values before computing the fingerprint.
	for name, value := range metric {
		if len(value) > 0 {
			continue
		}
		delete(metric, name)
	}

	unmappedFP := metric.FastFingerprint()
	s.fpLocker.Lock(unmappedFP)
	fp := s.mapper.mapFP(unmappedFP, metric)
	// fp is final from here on, so whichever lock is held at return time
	// is the one for fp.
	defer s.fpLocker.Unlock(fp)
	if fp != unmappedFP {
		// The mapper handed out a different fingerprint: trade the lock on
		// the raw fingerprint for the lock on the mapped one.
		s.fpLocker.Unlock(unmappedFP)
		s.fpLocker.Lock(fp)
	}

	series, err := s.getOrCreateSeries(fp, metric)
	if err != nil {
		// Quarantining, if needed, was already done by getOrCreateSeries.
		return err
	}

	ts := sample.Timestamp
	switch {
	case ts == series.lastTime:
		// A repeat of the last sample (same timestamp, same value) is a
		// common occurrence with client-side timestamps (e.g. Pushgateway
		// or federation) and is therefore not reported as an error.
		if series.lastSampleValueSet && sample.Value.Equal(series.lastSampleValue) {
			return nil
		}
		s.discardedSamples.WithLabelValues(duplicateSample).Inc()
		return ErrDuplicateSampleForTimestamp // Caller's fault.
	case ts < series.lastTime:
		s.discardedSamples.WithLabelValues(outOfOrderTimestamp).Inc()
		return ErrOutOfOrderSample // Caller's fault.
	}

	// Capture the head chunk state before add, which is called next.
	hadClosedHead := series.headChunkClosed
	pair := model.SamplePair{
		Timestamp: ts,
		Value:     sample.Value,
	}
	completedChunks, err := series.add(pair)
	if err != nil {
		s.quarantineSeries(fp, metric, err)
		return err
	}

	if hadClosedHead {
		// Adding to a series whose head chunk was closed opens a fresh
		// head chunk, which has to be counted.
		s.headChunks.Inc()
	}
	s.ingestedSamples.Inc()
	s.incNumChunksToPersist(completedChunks)

	return nil
}