in v1storage/crashrecovery.go [38:155]
// recoverFromCrash reconciles the series files found under p.basePath with
// the in-memory series passed in as fingerprintToSeries.
//
// It works in phases:
//
//  1. The fingerprint mappings file is removed; a fresh set of mappings is
//     collected while the remaining phases run.
//  2. Every series directory (one per possible hex name of seriesDirNameLen
//     digits) is read in batches and each entry is handed to sanitizeSeries.
//     Fingerprints for which sanitizeSeries reports ok are remembered.
//  3. Every series in fingerprintToSeries whose fingerprint was not
//     remembered in phase 2 is either dropped entirely (nothing unpersisted
//     is left in memory) or trimmed down to its unpersisted chunk descs.
//  4. cleanUpArchiveIndexes and rebuildLabelIndexes are run, and the
//     collected mappings are checkpointed if there are any.
//  5. The dirty flag is cleared, unless becameDirty was set in the meantime.
//
// fingerprintToSeries is modified in place. The first error from removing the
// mappings file, from opening or reading a directory, or from one of the
// phase 4 helpers aborts the recovery and is returned.
func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint]*memorySeries) error {
	// TODO(beorn): We need proper tests for the crash recovery.
	log.Warn("Starting crash recovery. Prometheus is inoperational until complete.")
	log.Warn("To avoid crash recovery in the future, shut down Prometheus with SIGTERM or a HTTP POST to /-/quit.")

	var (
		seenFPs      = map[model.Fingerprint]struct{}{}
		filesScanned = 0
		dirNameFmt   = fmt.Sprintf("%%0%dx", seriesDirNameLen)
	)

	// The existing fingerprint mapping file might be stale or corrupt, so
	// throw it away. The mappings are rebuilt along the way.
	if err := os.RemoveAll(p.mappingsFileName()); err != nil {
		return fmt.Errorf("couldn't remove old fingerprint mapping file %s: %s", p.mappingsFileName(), err)
	}
	// The mappings to rebuild.
	mappings := fpMappings{}

	log.Info("Scanning files.")
	// Each hex digit of the directory name covers 4 bits, hence the shift.
	numDirs := 1 << (seriesDirNameLen * 4)
	for dirIdx := 0; dirIdx < numDirs; dirIdx++ {
		dirname := filepath.Join(p.basePath, fmt.Sprintf(dirNameFmt, dirIdx))
		dir, err := os.Open(dirname)
		if err != nil {
			if os.IsNotExist(err) {
				// A missing directory is fine, just move on to the next.
				continue
			}
			return err
		}
		// Read the directory in batches of up to 1024 entries until
		// Readdir signals the end with io.EOF.
		for {
			entries, readErr := dir.Readdir(1024)
			if readErr == io.EOF {
				break
			}
			if readErr != nil {
				dir.Close()
				return readErr
			}
			for _, entry := range entries {
				if fp, ok := p.sanitizeSeries(dirname, entry, fingerprintToSeries, mappings); ok {
					seenFPs[fp] = struct{}{}
				}
				filesScanned++
				if filesScanned%10000 == 0 {
					log.Infof("%d files scanned.", filesScanned)
				}
			}
		}
		dir.Close()
	}
	log.Infof("File scan complete. %d series found.", len(seenFPs))

	log.Info("Checking for series without series file.")
	for fp, series := range fingerprintToSeries {
		if _, onDisk := seenFPs[fp]; onDisk {
			continue
		}
		// From here on, fp is known from fingerprintToSeries but has no
		// representation on disk.
		if series.persistWatermark >= len(series.chunkDescs) {
			// Everything including the head chunk is marked as
			// persisted, yet nothing is on disk (or the
			// persistWatermark is plainly wrong). The series is lost
			// completely, so clean up its remnants.
			delete(fingerprintToSeries, fp)
			if err := p.purgeArchivedMetric(fp); err != nil {
				// Purging the archived metric failed. Try to
				// unindex it instead, just in case it is in the
				// indexes.
				p.unindexMetric(fp, series.metric)
			}
			log.Warnf("Lost series detected: fingerprint %v, metric %v.", fp, series.metric)
			continue
		}
		// The only chunks left for this series are those that came with
		// fingerprintToSeries. Adjust the bookkeeping accordingly.
		if series.persistWatermark > 0 || series.chunkDescsOffset != 0 {
			if lost := series.persistWatermark + series.chunkDescsOffset; lost > 0 {
				log.Warnf(
					"Lost at least %d chunks for fingerprint %v, metric %v.",
					lost, fp, series.metric,
				)
			} else {
				log.Warnf(
					"Possible loss of chunks for fingerprint %v, metric %v.",
					fp, series.metric,
				)
			}
			// Keep only the chunk descs from the persist watermark
			// onwards, copied into a fresh, exactly sized slice.
			remaining := make([]*chunk.Desc, len(series.chunkDescs)-series.persistWatermark)
			copy(remaining, series.chunkDescs[series.persistWatermark:])
			series.chunkDescs = remaining
			chunk.NumMemDescs.Sub(float64(series.persistWatermark))
			series.persistWatermark = 0
			series.chunkDescsOffset = 0
		}
		maybeAddMapping(fp, series.metric, mappings)
		// Record the fingerprint so that seenFPs is complete.
		seenFPs[fp] = struct{}{}
	}
	log.Info("Check for series without series file complete.")

	if err := p.cleanUpArchiveIndexes(fingerprintToSeries, seenFPs, mappings); err != nil {
		return err
	}
	if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil {
		return err
	}
	// Finally, write out the mappings file, but only if there is anything
	// to write.
	if len(mappings) > 0 {
		if err := p.checkpointFPMappings(mappings); err != nil {
			return err
		}
	}

	// The storage may only be declared clean if it did not become dirty
	// while the crash recovery was running.
	p.dirtyMtx.Lock()
	if !p.becameDirty {
		p.dirty = false
	}
	p.dirtyMtx.Unlock()

	log.Warn("Crash recovery complete.")
	return nil
}