func()

in v1storage/crashrecovery.go [38:155]


func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint]*memorySeries) error {
	// TODO(beorn): We need proper tests for the crash recovery.
	log.Warn("Starting crash recovery. Prometheus is inoperational until complete.")
	log.Warn("To avoid crash recovery in the future, shut down Prometheus with SIGTERM or a HTTP POST to /-/quit.")

	fpsSeen := map[model.Fingerprint]struct{}{}
	count := 0
	seriesDirNameFmt := fmt.Sprintf("%%0%dx", seriesDirNameLen)

	// Delete the fingerprint mapping file as it might be stale or
	// corrupt. We'll rebuild the mappings as we go.
	if err := os.RemoveAll(p.mappingsFileName()); err != nil {
		return fmt.Errorf("couldn't remove old fingerprint mapping file %s: %s", p.mappingsFileName(), err)
	}
	// The mappings to rebuild.
	fpm := fpMappings{}

	log.Info("Scanning files.")
	for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
		dirname := filepath.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
		dir, err := os.Open(dirname)
		if os.IsNotExist(err) {
			continue
		}
		if err != nil {
			return err
		}
		for fis := []os.FileInfo{}; err != io.EOF; fis, err = dir.Readdir(1024) {
			if err != nil {
				dir.Close()
				return err
			}
			for _, fi := range fis {
				fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries, fpm)
				if ok {
					fpsSeen[fp] = struct{}{}
				}
				count++
				if count%10000 == 0 {
					log.Infof("%d files scanned.", count)
				}
			}
		}
		dir.Close()
	}
	log.Infof("File scan complete. %d series found.", len(fpsSeen))

	log.Info("Checking for series without series file.")
	for fp, s := range fingerprintToSeries {
		if _, seen := fpsSeen[fp]; !seen {
			// fp exists in fingerprintToSeries, but has no representation on disk.
			if s.persistWatermark >= len(s.chunkDescs) {
				// Oops, everything including the head chunk was
				// already persisted, but nothing on disk. Or
				// the persistWatermark is plainly wrong. Thus,
				// we lost that series completely. Clean up the
				// remnants.
				delete(fingerprintToSeries, fp)
				if err := p.purgeArchivedMetric(fp); err != nil {
					// Purging the archived metric didn't work, so try
					// to unindex it, just in case it's in the indexes.
					p.unindexMetric(fp, s.metric)
				}
				log.Warnf("Lost series detected: fingerprint %v, metric %v.", fp, s.metric)
				continue
			}
			// If we are here, the only chunks we have are the chunks in the checkpoint.
			// Adjust things accordingly.
			if s.persistWatermark > 0 || s.chunkDescsOffset != 0 {
				minLostChunks := s.persistWatermark + s.chunkDescsOffset
				if minLostChunks <= 0 {
					log.Warnf(
						"Possible loss of chunks for fingerprint %v, metric %v.",
						fp, s.metric,
					)
				} else {
					log.Warnf(
						"Lost at least %d chunks for fingerprint %v, metric %v.",
						minLostChunks, fp, s.metric,
					)
				}
				s.chunkDescs = append(
					make([]*chunk.Desc, 0, len(s.chunkDescs)-s.persistWatermark),
					s.chunkDescs[s.persistWatermark:]...,
				)
				chunk.NumMemDescs.Sub(float64(s.persistWatermark))
				s.persistWatermark = 0
				s.chunkDescsOffset = 0
			}
			maybeAddMapping(fp, s.metric, fpm)
			fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete.
		}
	}
	log.Info("Check for series without series file complete.")

	if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen, fpm); err != nil {
		return err
	}
	if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil {
		return err
	}
	// Finally rewrite the mappings file if there are any mappings.
	if len(fpm) > 0 {
		if err := p.checkpointFPMappings(fpm); err != nil {
			return err
		}
	}

	p.dirtyMtx.Lock()
	// Only declare storage clean if it didn't become dirty during crash recovery.
	if !p.becameDirty {
		p.dirty = false
	}
	p.dirtyMtx.Unlock()

	log.Warn("Crash recovery complete.")
	return nil
}