func (s *MemorySeriesStorage) loop()

in v1storage/storage.go [1462:1570]


func (s *MemorySeriesStorage) loop() {
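	// checkpointTimer triggers a checkpoint every checkpointInterval, while
	// checkpointMinTimer enforces a pause between checkpoints; it is reset
	// below to however long the previous checkpoint took.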
	checkpointTimer := time.NewTimer(s.checkpointInterval)
	checkpointMinTimer := time.NewTimer(0)

	// dirtySeriesCount is shared with the checkpoint goroutine below and is
	// therefore only accessed atomically.
	var dirtySeriesCount int64

	defer func() {
		checkpointTimer.Stop()
		checkpointMinTimer.Stop()
		log.Info("Maintenance loop stopped.")
		close(s.loopStopped)
	}()

	memoryFingerprints := s.cycleThroughMemoryFingerprints()
	archivedFingerprints := s.cycleThroughArchivedFingerprints()

	checkpointCtx, checkpointCancel := context.WithCancel(context.Background())
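	// checkpointNow has capacity 1 so an early-checkpoint request can be
	// recorded without blocking; the non-blocking sends below coalesce
	// repeated requests into a single pending signal.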
	checkpointNow := make(chan struct{}, 1)

	doCheckpoint := func() time.Duration {
		start := time.Now()
		// We clear the counter before the checkpoint so that dirtySeriesCount
		// stays an upper bound: series dirtied while the checkpoint runs count
		// towards the next one, even if they still make it into this checkpoint.
		atomic.StoreInt64(&dirtySeriesCount, 0)
		s.dirtySeries.Set(0)
		select {
		case <-checkpointNow:
			// Signal cleared.
		default:
			// No signal pending.
		}
		err := s.persistence.checkpointSeriesMapAndHeads(
			checkpointCtx, s.fpToSeries, s.fpLocker,
		)
		if err == context.Canceled {
			log.Info("Checkpoint canceled.")
		} else if err != nil {
			s.persistErrors.Inc()
			log.Errorln("Error while checkpointing:", err)
		}
		return time.Since(start)
	}

	// Checkpoints happen in their own goroutine, concurrently with maintenance,
	// so even heavy checkpointing leaves sufficient progress on maintenance.
	checkpointLoopStopped := make(chan struct{})
	go func() {
		for {
			select {
			case <-checkpointCtx.Done():
				checkpointLoopStopped <- struct{}{}
				return
			case <-checkpointMinTimer.C:
				var took time.Duration
				select {
				case <-checkpointCtx.Done():
					checkpointLoopStopped <- struct{}{}
					return
				case <-checkpointTimer.C:
					took = doCheckpoint()
				case <-checkpointNow:
					if !checkpointTimer.Stop() {
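						// The timer already fired; drain the stale tick so the
						// Reset below starts from a clean timer.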
						<-checkpointTimer.C
					}
					took = doCheckpoint()
				}
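				// Do not start the next checkpoint until at least as much time
				// as the last one took has passed, and restart the regular
				// checkpoint interval.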
				checkpointMinTimer.Reset(took)
				checkpointTimer.Reset(s.checkpointInterval)
			}
		}
	}()

loop:
	for {
		select {
		case <-s.loopStopping:
			checkpointCancel()
			break loop
		case fp := <-memoryFingerprints:
			if s.maintainMemorySeries(fp, model.Now().Add(-s.dropAfter)) {
				dirty := atomic.AddInt64(&dirtySeriesCount, 1)
				s.dirtySeries.Set(float64(dirty))
				// Check if we have enough "dirty" series to warrant an early checkpoint.
				// However, if we are already behind on persisting chunks, a checkpoint
				// would be counterproductive: it would slow down chunk persisting even
				// more, and when disk maintenance is clearly too slow, the best we can
				// do for crash recovery is to persist chunks as quickly as possible.
				// So only checkpoint if we are not in rushed mode.
				if _, rushed := s.getPersistenceUrgencyScore(); !rushed &&
					dirty >= int64(s.checkpointDirtySeriesLimit) {
					select {
					case checkpointNow <- struct{}{}:
						// Signal sent.
					default:
						// Signal already pending.
					}
				}
			}
		case fp := <-archivedFingerprints:
			s.maintainArchivedSeries(fp, model.Now().Add(-s.dropAfter))
		}
	}
	// Drain both fingerprint channels until they are closed, i.e. until the
	// goroutines feeding them have shut down, then wait for the checkpoint
	// goroutine to stop.
	for range memoryFingerprints {
	}
	for range archivedFingerprints {
	}
	<-checkpointLoopStopped
}
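
The early-checkpoint signalling above combines two Go idioms: a buffered channel of capacity 1 whose non-blocking sends coalesce repeated requests, and the Stop-then-drain step needed before Reset on a timer that may already have fired. The following is a minimal standalone sketch of the same pattern; the names, interval, and main function are illustrative and not part of the storage code.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative sketch (not from the storage code): coalesced "checkpoint now"
	// requests via a buffered channel, plus the Stop-and-drain idiom before Reset.
	interval := 500 * time.Millisecond
	timer := time.NewTimer(interval) // stands in for checkpointTimer
	now := make(chan struct{}, 1)    // capacity 1: at most one pending request

	requestNow := func() {
		select {
		case now <- struct{}{}:
			// Signal sent.
		default:
			// Signal already pending; the duplicate request is coalesced.
		}
	}

	go func() {
		for i := 0; i < 3; i++ {
			requestNow() // at most one of these lands while none is consumed
		}
	}()

	for i := 0; i < 2; i++ {
		select {
		case <-timer.C:
			fmt.Println("checkpoint: interval elapsed")
		case <-now:
			// The timer may have fired concurrently; in that case Stop reports
			// false and the stale tick must be drained before Reset.
			if !timer.Stop() {
				<-timer.C
			}
			fmt.Println("checkpoint: early request")
		}
		timer.Reset(interval)
	}
}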