in v1storage/storage.go [1462:1570]
func (s *MemorySeriesStorage) loop() {
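	// checkpointTimer fires once the regular checkpoint interval has elapsed.
	// checkpointMinTimer enforces a minimum pause between two checkpoints;
	// it is reset below to the duration the previous checkpoint took.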
checkpointTimer := time.NewTimer(s.checkpointInterval)
checkpointMinTimer := time.NewTimer(0)
var dirtySeriesCount int64
defer func() {
checkpointTimer.Stop()
checkpointMinTimer.Stop()
log.Info("Maintenance loop stopped.")
close(s.loopStopped)
}()
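	// Both methods return a channel that yields fingerprints of series due
	// for maintenance and that is closed once s.loopStopping is closed.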
memoryFingerprints := s.cycleThroughMemoryFingerprints()
archivedFingerprints := s.cycleThroughArchivedFingerprints()
checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
checkpointNow := make(chan struct{}, 1)
doCheckpoint := func() time.Duration {
start := time.Now()
		// Reset the count before checkpointing so that series marked dirty
		// while the checkpoint is in progress count towards the next one.
		// dirtySeriesCount is thus an upper bound on the number of series
		// not covered by the latest checkpoint.
atomic.StoreInt64(&dirtySeriesCount, 0)
s.dirtySeries.Set(0)
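		// Also consume a pending early-checkpoint request, if any; the
		// checkpoint we are about to write satisfies it.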
select {
case <-checkpointNow:
// Signal cleared.
default:
// No signal pending.
}
err := s.persistence.checkpointSeriesMapAndHeads(
checkpointCtx, s.fpToSeries, s.fpLocker,
)
if err == context.Canceled {
log.Info("Checkpoint canceled.")
} else if err != nil {
s.persistErrors.Inc()
log.Errorln("Error while checkpointing:", err)
}
return time.Since(start)
}
	// Checkpointing runs in its own goroutine, concurrently with the
	// maintenance loop below, so even heavy checkpointing cannot stall
	// series maintenance.
checkpointLoopStopped := make(chan struct{})
go func() {
for {
select {
case <-checkpointCtx.Done():
checkpointLoopStopped <- struct{}{}
return
case <-checkpointMinTimer.C:
var took time.Duration
select {
case <-checkpointCtx.Done():
checkpointLoopStopped <- struct{}{}
return
case <-checkpointTimer.C:
took = doCheckpoint()
case <-checkpointNow:
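					// An early checkpoint supersedes the interval-based
					// one. Stop the interval timer and, if it has already
					// fired, drain its channel so that the Reset below
					// starts from a clean state.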
if !checkpointTimer.Stop() {
<-checkpointTimer.C
}
took = doCheckpoint()
}
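				// Wait at least as long as the last checkpoint took before
				// starting the next one, so checkpointing consumes at most
				// roughly half of this goroutine's time.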
checkpointMinTimer.Reset(took)
checkpointTimer.Reset(s.checkpointInterval)
}
}
}()
loop:
for {
select {
case <-s.loopStopping:
checkpointCancel()
break loop
case fp := <-memoryFingerprints:
if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
dirty := atomic.AddInt64(&dirtySeriesCount, 1)
s.dirtySeries.Set(float64(dirty))
				// Check whether enough series are dirty to warrant an
				// early checkpoint. However, if we are already behind on
				// persisting chunks, a checkpoint would be
				// counterproductive: it would slow down chunk persisting
				// even further, and when disk maintenance clearly cannot
				// keep up, the best we can do for crash recovery is to
				// persist chunks as quickly as possible. Hence, only
				// checkpoint early if we are not in rushed mode.
if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
dirty >= int64(s.checkpointDirtySeriesLimit) {
select {
case checkpointNow <- struct{}{}:
// Signal sent.
default:
// Signal already pending.
}
}
}
case fp := <-archivedFingerprints:
s.maintainArchivedSeries(fp, model.Now().Add(-s.dropAfter))
}
}
	// Drain both fingerprint channels until they are closed so that their
	// producer goroutines can shut down instead of blocking on a send.
for range memoryFingerprints {
}
for range archivedFingerprints {
}
<-checkpointLoopStopped
}
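
// What follows is a minimal, self-contained sketch (hypothetical names, not
// Prometheus code) of the two idioms the checkpoint goroutine above combines:
// a channel with capacity 1 whose non-blocking send coalesces "checkpoint
// now" requests, and the Stop-then-drain dance a time.Timer needs before it
// may be Reset.
package main

import (
	"fmt"
	"time"
)

func main() {
	now := make(chan struct{}, 1) // at most one pending request
	timer := time.NewTimer(time.Second)
	defer timer.Stop()

	requestNow := func() {
		select {
		case now <- struct{}{}:
			// Request recorded.
		default:
			// A request is already pending; this one coalesces with it.
		}
	}

	requestNow()
	requestNow() // dropped: coalesces with the pending request

	select {
	case <-timer.C:
		fmt.Println("interval elapsed")
	case <-now:
		// The early request wins. Stop the interval timer and drain its
		// channel if it fired concurrently, so the Reset starts cleanly.
		if !timer.Stop() {
			<-timer.C
		}
		fmt.Println("early request served")
	}
	timer.Reset(time.Second)
}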