in v1storage/storage.go [1369:1418]
// cycleThroughMemoryFingerprints starts a goroutine that repeatedly sweeps over
// the fingerprints returned by s.fpToSeries.sortedFPs() and sends them, one at
// a time, on the returned unbuffered channel. Sends are paced by
// s.waitForNextFP. The goroutine exits, closing the channel, once
// s.waitForNextFP reports false at the start of a sweep or s.loopStopping
// becomes readable while a send is pending.
func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint {
	out := make(chan model.Fingerprint)

	go func() {
		defer close(out)

		// initial is true only for the very first sweep; the post statement
		// clears it after every iteration, including those ended by continue.
		for initial := true; ; initial = false {
			// Wait before every sweep. This also paces the loop while there
			// are no fingerprints at all.
			if ok := s.waitForNextFP(s.fpToSeries.length(), 1); !ok {
				return
			}

			start := time.Now()
			sweep := s.fpToSeries.sortedFPs()
			if initial && len(sweep) != 0 {
				// Begin the very first sweep at a random offset so that the
				// whole key space still gets covered over time, even if
				// restarts are frequent.
				offset := rand.Intn(len(sweep))
				sweep = sweep[offset:]
			}

			sent := 0
			for _, fingerprint := range sweep {
				select {
				case <-s.loopStopping:
					return
				case out <- fingerprint:
				}

				// Shorten the pause before the next fingerprint according to
				// the persistence urgency score; when rushed is reported the
				// wait factor drops to zero.
				urgency, rushed := s.getPersistenceUrgencyScore()
				factor := 1 - urgency
				if rushed {
					factor = 0
				}
				// The result of this wait is not inspected; s.loopStopping is
				// checked again by the select on the next iteration.
				s.waitForNextFP(s.fpToSeries.length(), factor)
				sent++
			}

			// Nothing was handed out, so there is nothing to report.
			if sent == 0 {
				continue
			}

			kind := "full"
			if initial {
				kind = "initial partial"
			}
			log.Infof(
				"Completed %s maintenance sweep through %d in-memory fingerprints in %v.",
				kind, sent, time.Since(start),
			)
		}
	}()

	return out
}