in v1storage/storage.go [1603:1670]
func (s *MemorySeriesStorage) maintainMemorySeries(
fp model.Fingerprint, beforeTime model.Time,
) (becameDirty bool) {
defer func(begin time.Time) {
s.maintainSeriesDuration.WithLabelValues(maintainInMemory).Observe(
time.Since(begin).Seconds(),
)
}(time.Now())
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
// Series is actually not in memory, perhaps archived or dropped in the meantime.
return false
}
defer s.seriesOps.WithLabelValues(memoryMaintenance).Inc()
closed, err := series.maybeCloseHeadChunk(s.headChunkTimeout)
if err != nil {
s.quarantineSeries(fp, series.metric, err)
s.persistErrors.Inc()
}
if closed {
s.incNumChunksToPersist(1)
s.headChunks.Dec()
}
seriesWasDirty := series.dirty
if s.writeMemorySeries(fp, series, beforeTime) {
// Series is gone now, we are done.
return false
}
iOldestNotEvicted := -1
for i, cd := range series.chunkDescs {
if !cd.IsEvicted() {
iOldestNotEvicted = i
break
}
}
// Archive if all chunks are evicted. Also make sure the last sample has
// an age of at least headChunkTimeout (which is very likely anyway).
if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > s.headChunkTimeout {
s.fpToSeries.del(fp)
s.memorySeries.Dec()
s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime)
s.seriesOps.WithLabelValues(archive).Inc()
oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark))
if oldWatermark < int64(series.lastTime) {
if !atomic.CompareAndSwapInt64(
(*int64)(&s.archiveHighWatermark),
oldWatermark, int64(series.lastTime),
) {
panic("s.archiveHighWatermark modified outside of maintainMemorySeries")
}
}
return
}
// If we are here, the series is not archived, so check for chunk.Desc
// eviction next.
series.evictChunkDescs(iOldestNotEvicted)
return series.dirty && !seriesWasDirty
}